diff --git a/conn/dialer.c b/conn/dialer.c index c22b5c7..efb9904 100644 --- a/conn/dialer.c +++ b/conn/dialer.c @@ -14,6 +14,7 @@ #include "libp2p/net/multistream.h" #include "libp2p/secio/secio.h" #include "libp2p/yamux/yamux.h" +#include "libp2p/identify/identify.h" struct TransportDialer* libp2p_conn_tcp_transport_dialer_new(); @@ -141,9 +142,13 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer new_stream = libp2p_yamux_stream_new(new_stream); if (new_stream != NULL) { peer->sessionContext->default_stream = new_stream; - // identity over yamux + // we have our swarm connection. Now we ask for some "channels" + // id over yamux + libp2p_yamux_stream_add(new_stream->stream_context, libp2p_identify_stream_new(new_stream)); // kademlia over yamux + //libp2p_yamux_stream_add(new_stream->stream_context, libp2p_kademlia_stream_new(new_stream)); // circuit relay over yamux + //libp2p_yamux_stream_add(new_stream->stream_context, libp2p_circuit_relay_stream_new(new_stream)); return 1; } else { libp2p_logger_error("dialer", "Unable to do yamux negotiation.\n"); diff --git a/identify/identify.c b/identify/identify.c index e625641..1aa1365 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -35,12 +35,12 @@ int libp2p_identify_can_handle(const struct StreamMessage* msg) { * @param context the context * @returns true(1) on success, false(0) otherwise */ -int libp2p_identify_send_protocol(struct SessionContext *context) { +int libp2p_identify_send_protocol(struct IdentifyContext *context) { char *protocol = "/ipfs/id/1.0.0\n"; struct StreamMessage msg; msg.data = (uint8_t*) protocol; msg.data_size = strlen(protocol); - if (!context->default_stream->write(context, &msg)) { + if (!context->parent_stream->write(context, &msg)) { libp2p_logger_error("identify", "send_protocol: Unable to send identify protocol header.\n"); return 0; } @@ -53,10 +53,10 @@ int libp2p_identify_send_protocol(struct SessionContext *context) { * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ -int libp2p_identify_receive_protocol(struct SessionContext* context) { +int libp2p_identify_receive_protocol(struct IdentifyContext* context) { const char *protocol = "/ipfs/id/1.0.0\n"; struct StreamMessage* results = NULL; - if (!context->default_stream->read(context, &results, 30)) { + if (!context->parent_stream->read(context, &results, 30)) { libp2p_logger_error("identify", "receive_protocol: Unable to read results.\n"); return 0; } @@ -92,3 +92,35 @@ struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp } return handler; } + +/*** + * Create a new stream that negotiates the identify protocol + * + * NOTE: This will be sent by our side (us asking them). + * Incoming "Identify" requests should be handled by the + * external protocol handler, not this function. + * + * @param parent_stream the parent stream + * @returns a new Stream that can talk "identify" + */ +struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { + if (parent_stream == NULL) + return NULL; + struct Stream* out = libp2p_stream_new(); + if (out != NULL) { + struct IdentifyContext* ctx = (struct IdentifyContext*) malloc(sizeof(struct IdentifyContext)); + if (ctx == NULL) { + libp2p_stream_free(out); + return NULL; + } + ctx->parent_stream = parent_stream; + out->stream_context = ctx; + if (!libp2p_identify_send_protocol(ctx) || !libp2p_identify_receive_protocol(ctx)) { + libp2p_stream_free(out); + free(ctx); + return NULL; + } + } + return out; +} + diff --git a/include/libp2p/identify/identify.h b/include/libp2p/identify/identify.h index a7f8110..940fcd4 100644 --- a/include/libp2p/identify/identify.h +++ b/include/libp2p/identify/identify.h @@ -21,9 +21,26 @@ typedef struct { char *XXX_unrecognized; } Identify; +struct IdentifyContext { + struct Stream* parent_stream; +}; + int libp2p_identify_can_handle(const struct StreamMessage* msg); -int libp2p_identify_send_protocol(struct SessionContext *context); -int libp2p_identify_receive_protocol(struct SessionContext* context); +int libp2p_identify_send_protocol(struct IdentifyContext *context); +int libp2p_identify_receive_protocol(struct IdentifyContext* context); int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context); int libp2p_identify_shutdown(void* protocol_context); struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp2pVector* handlers); + +/*** + * Create a new stream that negotiates the identify protocol + * + * NOTE: This will be sent by our side (us asking them). + * Incoming "Identify" requests should be handled by the + * external protocol handler, not this function. + * + * @param parent_stream the parent stream + * @returns a new Stream that can talk "identify" + */ +struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream); + diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index a6a920a..bf20699 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -46,6 +46,7 @@ struct Stream { struct MultiAddress* address; // helps identify who is on the other end pthread_mutex_t* socket_mutex; // only 1 transmission at a time struct Stream* parent_stream; // what stream wraps this stream + int channel; // the channel this stream uses, for multiplexing protocols such as yamux /** * A generic place to store implementation-specific context items */ diff --git a/include/libp2p/utils/vector.h b/include/libp2p/utils/vector.h index d791732..e4f58ad 100644 --- a/include/libp2p/utils/vector.h +++ b/include/libp2p/utils/vector.h @@ -30,8 +30,9 @@ int libp2p_utils_vector_total(struct Libp2pVector* in); * Add a value to the vector * @param vector the vector to add the item to. * @param value the value to be added NOTE: this only saves the pointer, it does not copy. + * @returns the index of the item in the vector */ -void libp2p_utils_vector_add(struct Libp2pVector *vector, const void * value); +int libp2p_utils_vector_add(struct Libp2pVector *vector, const void * value); void libp2p_utils_vector_set(struct Libp2pVector *vector, int pos, void *value); const void *libp2p_utils_vector_get(struct Libp2pVector *vector, int); void libp2p_utils_vector_delete(struct Libp2pVector *vector, int pos); diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index d15c8ee..1f2e37a 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -15,6 +15,7 @@ static const int yamux_default_timeout = 10; struct YamuxContext { struct Stream* stream; struct yamux_session* session; + struct Libp2pVector* channels; }; /** @@ -38,3 +39,11 @@ int yamux_send_protocol(struct SessionContext* context); int yamux_receive_protocol(struct SessionContext* context); struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream); + +/**** + * Add a stream "channel" to the yamux handler + * @param ctx the context + * @param stream the stream to add + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream); diff --git a/net/stream.c b/net/stream.c index 6bd5590..2f7948d 100644 --- a/net/stream.c +++ b/net/stream.c @@ -15,6 +15,7 @@ struct Stream* libp2p_stream_new() { stream->socket_mutex = NULL; stream->stream_context = NULL; stream->write = NULL; + stream->channel = 0; } return stream; } diff --git a/utils/vector.c b/utils/vector.c index fad625d..ad4f4f4 100644 --- a/utils/vector.c +++ b/utils/vector.c @@ -35,11 +35,12 @@ static void libp2p_utils_vector_resize(struct Libp2pVector *v, int capacity) * @param v the vector to add to * @param item the item to add */ -void libp2p_utils_vector_add(struct Libp2pVector *v, const void *item) +int libp2p_utils_vector_add(struct Libp2pVector *v, const void *item) { if (v->capacity == v->total) libp2p_utils_vector_resize(v, v->capacity * 2); v->items[v->total++] = item; + return v->total; } void libp2p_utils_vector_set(struct Libp2pVector *v, int index, void *item) diff --git a/yamux/yamux.c b/yamux/yamux.c index f594faa..e27ff19 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -228,6 +228,7 @@ struct YamuxContext* libp2p_yamux_context_new() { struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext)); if (ctx != NULL) { ctx->stream = NULL; + ctx->channels = libp2p_utils_vector_new(1); } return ctx; } @@ -283,6 +284,11 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) { } } + //TODO: okay, we're almost done. Let incoming stuff be marshaled to the correct handler. + // this should be somewhat automatic, as they ask, and we negotiate + //TODO: we should open some streams with them (multistream, id, kademlia, relay) + // this is not automatic, as we need to start the negotiation process + retVal = 1; exit: if (results != NULL) @@ -321,3 +327,16 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { } return out; } + +/**** + * Add a stream "channel" to the yamux handler + * @param ctx the context + * @param stream the stream to add + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { + int itemNo = libp2p_utils_vector_add(ctx->channels, stream); + stream->channel = itemNo; + return 1; +} +