diff --git a/conn/dialer.c b/conn/dialer.c index 9d3b8ef..1d72f85 100644 --- a/conn/dialer.c +++ b/conn/dialer.c @@ -116,14 +116,14 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer if (conn_stream == NULL) return 0; // multistream - struct Stream* new_stream = libp2p_net_multistream_stream_new(conn_stream); + struct Stream* new_stream = libp2p_net_multistream_stream_new(conn_stream, 0); if (new_stream != NULL) { // secio over multistream new_stream = libp2p_secio_stream_new(new_stream, peer, dialer->peerstore, dialer->private_key); if (new_stream != NULL) { peer->sessionContext->default_stream = new_stream; // multistream over secio - new_stream = libp2p_net_multistream_stream_new(new_stream); + new_stream = libp2p_net_multistream_stream_new(new_stream, 0); if (new_stream != NULL) { peer->sessionContext->default_stream = new_stream; // yamux over multistream diff --git a/identify/identify.c b/identify/identify.c index 95feb3b..f56a2ea 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -397,7 +397,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { struct Stream* libp2p_identify_stream_new_with_multistream(struct Stream* parent_stream) { if (parent_stream == NULL) return NULL; - struct Stream* multistream = libp2p_net_multistream_stream_new(parent_stream); + struct Stream* multistream = libp2p_net_multistream_stream_new(parent_stream, 0); struct Stream* out = libp2p_stream_new(); if (out != NULL) { out->stream_type = STREAM_TYPE_IDENTIFY; diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index 4da9f87..8f95881 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -83,10 +83,11 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, * NOTE: the SessionContext should already contain the connected stream. If not, use * libp2p_net_multistream_connect instead of this method. * - * @param ctx the MultistreamContext + * @param ctx a MultistreamContext + * @param theyRequested true(1) if the multistream ID has already been received from the client * @returns true(1) on success, or false(0) */ -int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx); +int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyRequested); /** * Expect to read a message, and follow its instructions @@ -104,6 +105,12 @@ struct KademliaMessage* libp2p_net_multistream_get_message(struct Stream* stream */ struct StreamMessage* libp2p_net_multistream_prepare_to_send(struct StreamMessage* incoming); -struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream); +/** + * Create a new MultiStream structure + * @param parent_stream the stream + * @param they_requested true(1) if they requested it (i.e. protocol id has already been sent) + * @returns the new Stream + */ +struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, int theyRequested); void libp2p_net_multistream_stream_free(struct Stream* stream); diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index a550df8..3a191a1 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -24,6 +24,13 @@ struct YamuxContext { int am_server; int state; // the state of the connection struct Libp2pVector* protocol_handlers; + /** + * What is stored here is from a read_raw call. It could + * be garbage, but it could be a decent message. It has + * been "unframed" so contains the data portion of the + * last frame captured in a read_raw call, or if it was + * empty, the data from a new read call. + */ struct StreamMessage* buffered_message; long buffered_message_pos; }; @@ -36,7 +43,7 @@ struct YamuxChannelContext { // the child protocol's stream struct Stream* child_stream; // the channel number - int channel; + uint32_t channel; // the window size for this channel int window_size; // the state of the connection diff --git a/net/connectionstream.c b/net/connectionstream.c index 104398b..a6465c3 100644 --- a/net/connectionstream.c +++ b/net/connectionstream.c @@ -73,9 +73,10 @@ int libp2p_net_connection_read(void* stream_context, struct StreamMessage** msg, int current_size = 0; while (1) { int retVal = socket_read(ctx->socket_descriptor, (char*)&buffer[0], 4096, 0, timeout_secs); + libp2p_logger_debug("connectionstream", "Retrieved %d bytes from socket %d.\n", retVal, ctx->socket_descriptor); if (retVal < 1) { // get out of the loop if (retVal < 0) // error - return -1; + return 0; break; } // add what we got to the message @@ -104,13 +105,14 @@ int libp2p_net_connection_read(void* stream_context, struct StreamMessage** msg, *msg = libp2p_stream_message_new(); struct StreamMessage* result = *msg; if (result == NULL) { + libp2p_logger_error("connectionstream", "read: Attempted to allocate memory for message, but allocation failed.\n"); free(result_buffer); return 0; } result->data = result_buffer; result->data_size = current_size; result->error_number = 0; - libp2p_logger_debug("connectionstream", "libp2p_connectionstream_read: Received %d bytes", result->data_size); + libp2p_logger_debug("connectionstream", "libp2p_connectionstream_read: Received %d bytes from socket %d.\n", result->data_size, ctx->socket_descriptor); } return current_size; @@ -148,9 +150,12 @@ int libp2p_net_connection_read_raw(void* stream_context, uint8_t* buffer, int bu * @returns number of bytes written */ int libp2p_net_connection_write(void* stream_context, struct StreamMessage* msg) { - if (stream_context == NULL) + if (stream_context == NULL) { + libp2p_logger_error("connectionstream", "write called with no context.\n"); return -1; + } struct ConnectionContext* ctx = (struct ConnectionContext*) stream_context; + libp2p_logger_debug("connectionstream", "write: About to write %d bytes to socket %d.\n", msg->data_size, ctx->socket_descriptor); return socket_write(ctx->socket_descriptor, (char*)msg->data, msg->data_size, 0); } diff --git a/net/multistream.c b/net/multistream.c index b209d89..84e9f54 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -93,7 +93,8 @@ int libp2p_net_multistream_shutdown(void* protocol_context) { * @returns true(1) on success, otherwise false(0) */ int libp2p_net_multistream_context_free(struct MultistreamContext* ctx) { - int retVal = ctx->stream->close(ctx->stream); + struct Stream* parent_stream = ctx->stream->parent_stream; + int retVal = parent_stream->close(parent_stream); // regardless of retVal, free the context // TODO: Evaluate if this is the correct way to do it: free(ctx); @@ -200,6 +201,7 @@ int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** res size_t varint_length = 0; for(int i = 0; i < 12; i++) { if (parent_stream->read_raw(parent_stream->stream_context, &varint[i], 1, timeout_secs) == -1) { + libp2p_logger_debug("multistream", "read->read_raw returned false.\n"); return 0; } if (varint[i] >> 7 == 0) { @@ -217,11 +219,13 @@ int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** res rslts->data_size = num_bytes_requested; rslts->data = (uint8_t*) malloc(num_bytes_requested); if (rslts->data == NULL) { + libp2p_logger_error("multistream", "read: Attempted allocation of stream message failed.\n"); libp2p_stream_message_free(rslts); rslts = NULL; } // now get the data from the parent stream if (!parent_stream->read_raw(parent_stream->stream_context, rslts->data, rslts->data_size, timeout_secs)) { + libp2p_logger_error("multistream", "read: Was supposed to read %d bytes, but read_raw returned false.\n", num_bytes_requested); // problem reading from the parent stream libp2p_stream_message_free(*results); *results = NULL; @@ -306,9 +310,10 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, * libp2p_net_multistream_connect instead of this method. * * @param ctx a MultistreamContext + * @param theyRequested true(1) if the multistream ID has already been received from the client * @returns true(1) on success, or false(0) */ -int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx) { +int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyRequested) { const char* protocolID = "/multistream/1.0.0\n"; struct StreamMessage outgoing; struct StreamMessage* results = NULL; @@ -316,40 +321,48 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx) { int haveTheirs = 0; int peek_result = 0; - // see if they're trying to send something first - peek_result = libp2p_net_multistream_peek(ctx); - /* - if (peek_result < 0) { - libp2p_logger_error("multistream", "Attempted a peek, but received an error.\n"); - return 0; + if (!theyRequested) { + // see if they're trying to send something first + peek_result = libp2p_net_multistream_peek(ctx); + /* + if (peek_result < 0) { + libp2p_logger_error("multistream", "Attempted a peek, but received an error.\n"); + return 0; + } + */ + if (peek_result > 0) { + libp2p_logger_debug("multistream", "negotiate: There is %d bytes waiting for us. Perhaps it is the multistream header we're expecting.\n", peek_result); + // get the protocol + //ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &results, multistream_default_timeout); + libp2p_net_multistream_read(ctx, &results, multistream_default_timeout); + if (results == NULL || results->data_size == 0) + goto exit; + if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) + goto exit; + haveTheirs = 1; + } } - */ - if (peek_result > 0) { - libp2p_logger_debug("multistream", "There is %d bytes waiting for us. Perhaps it is the multistream header we're expecting.\n", peek_result); - // get the protocol - //ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &results, multistream_default_timeout); - libp2p_net_multistream_read(ctx, &results, multistream_default_timeout); - if (results == NULL || results->data_size == 0) - goto exit; - if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) - goto exit; - haveTheirs = 1; - } - // send the protocol id outgoing.data = (uint8_t*)protocolID; outgoing.data_size = strlen(protocolID); - if (!libp2p_net_multistream_write(ctx, &outgoing)) + if (!libp2p_net_multistream_write(ctx, &outgoing)) { + libp2p_logger_debug("multistream", "negotiate: Attempted to send the multistream id, but the write failed.\n"); goto exit; + } // wait for them to send the protocol id back - if (!haveTheirs) { + if (!theyRequested && !haveTheirs) { + libp2p_logger_debug("multistream", "negotiate: Wrote multistream id to network, awaiting reply...\n"); // expect the same back - libp2p_net_multistream_read(ctx, &results, multistream_default_timeout); - if (results == NULL || results->data_size == 0) + int retVal = libp2p_net_multistream_read(ctx, &results, multistream_default_timeout); + if (retVal == 0 || results == NULL || results->data_size == 0) { + libp2p_logger_debug("multistream", "negotiate: expected the multistream id back, but got nothing. RetVal: %d.\n", retVal); goto exit; - if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) + } + if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) { + libp2p_logger_debug("multistream", "negotiate: Expected the multistream id back, but did not receive it. We did receive %d bytes though.\n)", results->data_size); goto exit; + } } retVal = 1; @@ -374,12 +387,20 @@ int libp2p_net_multistream_read_raw(void* stream_context, uint8_t* buffer, int b } /** - * Create a new MultiStream structure - * @param socket_fd the file descriptor - * @param ip the IP address - * @param port the port + * We want to try and negotiate Multistream on the incoming stream */ -struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream) { +struct Stream* libp2p_net_multistream_handshake(struct Stream* stream) { + //TODO: implement this method + return NULL; +} + +/** + * Create a new MultiStream structure + * @param parent_stream the stream + * @param they_requested true(1) if they requested it (i.e. protocol id has already been sent) + * @returns the new Stream + */ +struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, int theyRequested) { struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream)); if (out != NULL) { out->stream_type = STREAM_TYPE_MULTISTREAM; @@ -389,6 +410,7 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream) { out->write = libp2p_net_multistream_write; out->peek = libp2p_net_multistream_peek; out->read_raw = libp2p_net_multistream_read_raw; + out->negotiate = libp2p_net_multistream_handshake; out->address = parent_stream->address; // build MultistreamContext struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); @@ -401,7 +423,8 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream) { ctx->handlers = NULL; ctx->session_context = NULL; // attempt to negotiate multistream protocol - if (!libp2p_net_multistream_negotiate(ctx)) { + if (!libp2p_net_multistream_negotiate(ctx, theyRequested)) { + libp2p_logger_debug("multistream", "multistream_stream_new: negotiate failed\n"); libp2p_net_multistream_stream_free(out); return NULL; } @@ -418,18 +441,17 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream) { */ int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { // attempt negotiations - struct Stream* new_stream = libp2p_net_multistream_stream_new(stream); + struct Stream* new_stream = libp2p_net_multistream_stream_new(stream, 1); if (new_stream != NULL) { // upgrade return stream->handle_upgrade(stream, new_stream); } - return -1; } /*** * The handler to handle calls to the protocol - * @param stream_context the context + * @param handler_vector a Libp2pVector of protocol handlers * @returns the protocol handler */ struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector) { diff --git a/net/server.c b/net/server.c index ccbda72..c5b9aa1 100644 --- a/net/server.c +++ b/net/server.c @@ -60,20 +60,24 @@ void libp2p_net_connection (void *ptr) { libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, connection_param->count); - //TODO: build a stream from the given information - struct Stream* clientStream = libp2p_net_connection_established(connection_param->file_descriptor, connection_param->ip, connection_param->port, NULL); + struct SessionContext sessionContext; + struct Stream* clientStream = libp2p_net_connection_established(connection_param->file_descriptor, connection_param->ip, connection_param->port, &sessionContext); + sessionContext.default_stream = clientStream; + + if (sessionContext.default_stream == NULL) + return; // try to read from the network struct StreamMessage *results = NULL; // handle the call for(;;) { // Read from the network - if (!clientStream->read(clientStream->stream_context, &results, DEFAULT_NETWORK_TIMEOUT)) { + if (!sessionContext.default_stream->read(sessionContext.default_stream->stream_context, &results, DEFAULT_NETWORK_TIMEOUT)) { // problem reading break; } if (results != NULL) { - retVal = libp2p_protocol_marshal(results, clientStream, connection_param->protocol_handlers); + retVal = libp2p_protocol_marshal(results, sessionContext.default_stream, connection_param->protocol_handlers); libp2p_stream_message_free(results); results = NULL; } diff --git a/test/test_yamux.h b/test/test_yamux.h index 9e0ea5f..c968745 100644 --- a/test/test_yamux.h +++ b/test/test_yamux.h @@ -356,6 +356,11 @@ int test_yamux_client_server_connect() { fprintf(stderr, "Was supposed to get yamux protocol id, but instead received nothing.\n"); goto exit; } + //TODO: make sure everything is negotiated and yamux is in a happy state + // hangup + yamux_stream->close(yamux_stream); + // for debugging + // sleep(30); retVal = 1; exit: libp2p_net_server_stop(); @@ -365,3 +370,132 @@ int test_yamux_client_server_connect() { return retVal; } + +int test_yamux_client_server_multistream() { + int retVal = 0; + struct Libp2pVector* protocol_handlers = NULL; + struct StreamMessage* resultMessage = NULL; + + libp2p_logger_add_class("connectionstream"); + libp2p_logger_add_class("multistream"); + libp2p_logger_add_class("yamux"); + + // setup + // build the protocol handler that can handle yamux + protocol_handlers = libp2p_utils_vector_new(1); + struct Libp2pProtocolHandler* handler = libp2p_yamux_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + handler = libp2p_net_multistream_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + // set up server + libp2p_net_server_start("127.0.0.1", 1234, protocol_handlers); + sleep(1); + // set up client (easiest to use transport dialers) + struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL); + struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234"); + struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma); + if (stream == NULL) { + fprintf(stderr, "Unable to get stream.\n"); + goto exit; + } + // have client attempt to connect to server and negotiate yamux + struct Stream* yamux_stream = libp2p_yamux_stream_new(stream, 0, protocol_handlers); + if (yamux_stream == NULL) { + fprintf(stderr, "Was supposed to get yamux protocol id, but instead received nothing.\n"); + goto exit; + } + // now attempt multistream + struct Stream* multistream = libp2p_net_multistream_stream_new(yamux_stream, 0); + if (multistream == NULL) { + fprintf(stderr, "Was supposed to get a multistream, but instead got NULL.\n"); + goto exit; + } + // shut down nicely + multistream->close(multistream); + retVal = 1; + exit: + libp2p_net_server_stop(); + if (protocol_handlers != NULL) { + + } + return retVal; + +} + +int test_yamux_multistream_server() { + int retVal = 0; + struct Libp2pVector* protocol_handlers = NULL; + struct StreamMessage* resultMessage = NULL; + + libp2p_logger_add_class("connectionstream"); + libp2p_logger_add_class("multistream"); + libp2p_logger_add_class("yamux"); + + // setup + // build the protocol handler that can handle yamux + protocol_handlers = libp2p_utils_vector_new(1); + struct Libp2pProtocolHandler* handler = libp2p_yamux_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + handler = libp2p_net_multistream_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + // set up server + libp2p_net_server_start("127.0.0.1", 1234, protocol_handlers); + // debugging + sleep(120); + retVal = 1; + exit: + libp2p_net_server_stop(); + if (protocol_handlers != NULL) { + + } + return retVal; + +} +int test_yamux_multistream_client() { + int retVal = 0; + struct Libp2pVector* protocol_handlers = NULL; + struct StreamMessage* resultMessage = NULL; + + libp2p_logger_add_class("connectionstream"); + libp2p_logger_add_class("multistream"); + libp2p_logger_add_class("yamux"); + + // setup + // build the protocol handler that can handle yamux + protocol_handlers = libp2p_utils_vector_new(1); + struct Libp2pProtocolHandler* handler = libp2p_yamux_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + handler = libp2p_net_multistream_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + // set up client (easiest to use transport dialers) + struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL); + struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234"); + struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma); + if (stream == NULL) { + fprintf(stderr, "Unable to get stream.\n"); + goto exit; + } + // have client attempt to connect to server and negotiate yamux + struct Stream* yamux_stream = libp2p_yamux_stream_new(stream, 0, protocol_handlers); + if (yamux_stream == NULL) { + fprintf(stderr, "Was supposed to get yamux protocol id, but instead received nothing.\n"); + goto exit; + } + // now attempt multistream + struct Stream* multistream = libp2p_net_multistream_stream_new(yamux_stream, 0); + if (multistream == NULL) { + fprintf(stderr, "Was supposed to get a multistream, but instead got NULL.\n"); + goto exit; + } + // shut down nicely + multistream->close(multistream); + // debugging + sleep(30); + retVal = 1; + exit: + if (protocol_handlers != NULL) { + + } + return retVal; + +} diff --git a/test/testit.c b/test/testit.c index 292795a..e144748 100644 --- a/test/testit.c +++ b/test/testit.c @@ -120,6 +120,9 @@ int build_test_collection() { add_test("test_yamux_incoming_protocol_request", test_yamux_incoming_protocol_request, 1); add_test("test_net_server_startup_shutdown", test_net_server_startup_shutdown, 1); add_test("test_yamux_client_server_connect", test_yamux_client_server_connect, 1); + add_test("test_yamux_client_server_multistream", test_yamux_client_server_multistream, 1); + add_test("test_yamux_multistream_server", test_yamux_multistream_server, 0); + add_test("test_yamux_multistream_client", test_yamux_multistream_client, 0); return 1; }; diff --git a/yamux/session.c b/yamux/session.c index 9f5e8e0..51cb537 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -12,6 +12,7 @@ #include "libp2p/yamux/session.h" #include "libp2p/yamux/stream.h" #include "libp2p/yamux/yamux.h" +#include "libp2p/utils/logger.h" static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG; @@ -283,12 +284,16 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s // if we didn't initiate it, add this new channel (odd stream id is from client, even is from server) if ( (f.streamid % 2 == 0 && yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) { struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, msg); - if (yamuxChannelStream == NULL) + if (yamuxChannelStream == NULL) { + libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux channel for stream id %d.\n", f.streamid); return -EPROTO; + } struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context; - if (yamux_session->new_stream_fn) + if (yamux_session->new_stream_fn) { + libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn.\n"); yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, msg); + } channelContext->state = yamux_stream_syn_recv; } diff --git a/yamux/stream.c b/yamux/stream.c index b8491ac..59d30d5 100644 --- a/yamux/stream.c +++ b/yamux/stream.c @@ -81,11 +81,12 @@ FOUND:; }; *y_stream = nst; + /* if (libp2p_protocol_marshal(msg, nst.stream, context->protocol_handlers) >= 0) { // success } - /* - struct Stream* channelStream = libp2p_yamux_channel_stream_new(context->stream); + */ + struct Stream* channelStream = libp2p_yamux_channel_stream_new(context->stream, id); struct YamuxChannelContext* channel = (struct YamuxChannelContext*)channelStream->stream_context; channel->channel = id; channel->child_stream = NULL; @@ -93,8 +94,6 @@ FOUND:; return channelStream; - */ - return 0; } /** @@ -280,7 +279,7 @@ ssize_t yamux_stream_write(struct YamuxChannelContext* channel_ctx, uint32_t dat char* data = (char*)data_; char* data_end = data + data_length; uint32_t ws = channel_ctx->window_size; - int id = channel_ctx->channel; + uint32_t id = channel_ctx->channel; char sendd[ws + sizeof(struct yamux_frame)]; diff --git a/yamux/yamux.c b/yamux/yamux.c index 9e8871e..f6b5117 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -9,6 +9,42 @@ #include "libp2p/conn/session.h" #include "libp2p/utils/logger.h" +// function declarations that we don't want in the header file +int libp2p_yamux_channels_free(struct YamuxContext* ctx); +struct Stream* libp2p_yamux_get_parent_stream(void* context); + +/** + * Given a context, get the YamuxChannelContext + * @param stream_context the context + * @returns the YamuxChannelContext or NULL if there was none + */ +struct YamuxChannelContext* libp2p_yamux_get_channel_context(void* stream_context) { + char proto = ((uint8_t*)stream_context)[0]; + if (proto == YAMUX_CHANNEL_CONTEXT) { + return (struct YamuxChannelContext*)stream_context; + } + return NULL; +} + +/*** + * Given a context, get the YamuxContext + * @param stream_context a YamuxChannelContext or a YamuxContext + * @returns the YamuxContext, or NULL on error + */ +struct YamuxContext* libp2p_yamux_get_context(void* stream_context) { + char proto = ((uint8_t*)stream_context)[0]; + struct YamuxChannelContext* channel = NULL; + struct YamuxContext* ctx = NULL; + if (proto == YAMUX_CHANNEL_CONTEXT) { + channel = (struct YamuxChannelContext*)stream_context; + ctx = channel->yamux_context; + } else if (proto == YAMUX_CONTEXT) { + ctx = (struct YamuxContext*)stream_context; + } + return ctx; +} + + /** * Determines if this protocol can handle the incoming message * @param incoming the incoming data @@ -135,6 +171,35 @@ struct Libp2pProtocolHandler* libp2p_yamux_build_protocol_handler(struct Libp2pV return handler; } +/** + * Close the main yamux connection + * @param stream the stream to close + * @returns true(1) on success, false(0) on error + */ +int libp2p_yamux_send_go_away(struct Stream* stream) { + struct YamuxChannelContext* channel = libp2p_yamux_get_channel_context(stream->stream_context); + struct YamuxContext* ctx = libp2p_yamux_get_context(stream->stream_context); + if (ctx != NULL) { + struct StreamMessage* msg = libp2p_stream_message_new(); + msg->data_size = sizeof(struct yamux_frame); + msg->data = malloc(msg->data_size); + struct yamux_frame* f = (struct yamux_frame*) msg->data; + f->type = yamux_frame_go_away; + f->flags = yamux_frame_fin; + f->streamid = 0; + f->version = 0; + f->length = 0; + if (channel != NULL) { + f->streamid = channel->channel; + } + encode_frame(f); + stream->parent_stream->write(stream->parent_stream->stream_context, msg); + libp2p_stream_message_free(msg); + return 1; + } + return 0; +} + /*** * Close the stream and clean up all resources * NOTE: This also goes through the channels @@ -148,7 +213,9 @@ int libp2p_yamux_close(struct Stream* stream) { return 0; struct Stream* parent_stream = stream->parent_stream; // this should close everything above yamux (i.e. the protocols that are riding on top of yamux) - libp2p_yamux_stream_free(stream); + libp2p_yamux_channels_free(stream->stream_context); + // send a FIN + libp2p_yamux_send_go_away(stream); // and this should close everything below parent_stream->close(parent_stream); return 1; @@ -163,8 +230,10 @@ int libp2p_yamux_close(struct Stream* stream) { * @returns true(1) on success, false(0) on failure */ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int timeout_secs) { - if (stream_context == NULL) + if (stream_context == NULL) { + libp2p_logger_error("yamux", "read was passed a null context.\n"); return 0; + } // look at the first byte of the context to determine if this is a YamuxContext (we're negotiating) // or a YamuxChannelContext (we're talking to an established channel) struct YamuxContext* ctx = NULL; @@ -177,26 +246,38 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int ctx = (struct YamuxContext*)stream_context; } + struct Stream* parent_stream = libp2p_yamux_get_parent_stream(stream_context); if (channel != NULL && channel->channel != 0) { // we have an established channel. Use it. - if (!channel->yamux_context->stream->parent_stream->read(channel->yamux_context->stream->parent_stream->stream_context, message, yamux_default_timeout)) + if (!parent_stream->read(parent_stream->stream_context, message, yamux_default_timeout)) { + libp2p_logger_error("yamux", "Read: Attepted to read from channel %d, but the read failed.\n", channel->channel); return 0; + } + if (message == NULL) { + libp2p_logger_error("yamux", "Read: Successfully read from channel %d, but message was NULL.\n", channel->channel); + } // TODO: This is not right. It must be sorted out. struct StreamMessage* msg = *message; - if (yamux_decode(channel, msg->data, msg->data_size, message) == 0) + libp2p_logger_debug("yamux", "Read: Received %d bytes on channel %d.\n", msg->data_size, channel->channel); + if (yamux_decode(channel, msg->data, msg->data_size, message) == 0) { return 1; + } + libp2p_logger_error("yamux", "yamux_decode returned error.\n"); } else if (ctx != NULL) { // We are still negotiating. They are probably attempting to negotiate a new protocol struct StreamMessage* incoming = NULL; - if (ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &incoming, yamux_default_timeout)) { + if (parent_stream->read(parent_stream->stream_context, &incoming, yamux_default_timeout)) { + libp2p_logger_debug("yamux", "read: successfully read %d bytes from network.\n", incoming->data_size); // parse the frame if (yamux_decode(ctx, incoming->data, incoming->data_size, message) == 0) { libp2p_stream_message_free(incoming); return 1; } + libp2p_logger_error("yamux", "yamux_decode returned error.\n"); libp2p_stream_message_free(incoming); } } + libp2p_logger_error("yamux", "Unable to do network read.\n"); return 0; } @@ -227,6 +308,23 @@ struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incomin } return out; } + +/*** + * Get the next usable ID for a channel + * NOTE: Also increments the yamux_session_.nextid counter + * NOTE: Odd = client, Even = server + * @param ctx the context + * @returns the next id + */ +uint32_t libp2p_yamux_get_next_id(struct YamuxContext* ctx) { + uint32_t next_id = ctx->session->nextid; + if ( (ctx->am_server && next_id % 2 == 1) + || (!ctx->am_server && next_id % 2 == 0)) + next_id += 1; + ctx->session->nextid = next_id + 1; + return next_id; +} + /*** * Write to the remote * @param stream_context the context. Could be a YamuxContext or YamuxChannelContext @@ -256,40 +354,29 @@ int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) { struct yamux_frame* frame = (struct yamux_frame*)outgoing_message->data; // set a few more flags frame->flags = get_flags(stream_context); - if (channel != NULL) + if (channel == NULL) { + // if we don't yet have a channel, set the id to the next available + frame->streamid = libp2p_yamux_get_next_id(ctx); + } else { frame->streamid = channel->channel; + } encode_frame(frame); int retVal = 0; if (channel != NULL && channel->channel != 0) { // we have an established channel. Use it. - retVal = channel->stream->write(channel->stream->stream_context, outgoing_message); + libp2p_logger_debug("yamux", "About to write %d bytes to yamux channel %d.\n", outgoing_message->data_size, channel->channel); + struct Stream* parent_stream = libp2p_yamux_get_parent_stream(stream_context); + retVal = parent_stream->write(parent_stream->stream_context, outgoing_message); } else if (ctx != NULL) { - retVal = ctx->stream->parent_stream->write(ctx->stream->parent_stream, outgoing_message); + libp2p_logger_debug("yamux", "About to write %d bytes to stream.\n", outgoing_message->data_size); + retVal = ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, outgoing_message); } libp2p_stream_message_free(outgoing_message); return retVal; } -/*** - * Given a context, get the YamuxContext - * @param stream_context a YamuxChannelContext or a YamuxContext - * @returns the YamuxContext, or NULL on error - */ -struct YamuxContext* libp2p_yamux_get_context(void* stream_context) { - char proto = ((uint8_t*)stream_context)[0]; - struct YamuxChannelContext* channel = NULL; - struct YamuxContext* ctx = NULL; - if (proto == YAMUX_CHANNEL_CONTEXT) { - channel = (struct YamuxChannelContext*)stream_context; - ctx = channel->yamux_context; - } else if (proto == YAMUX_CONTEXT) { - ctx = (struct YamuxContext*)stream_context; - } - return ctx; -} - /*** * Check to see if there is anything waiting on the network. * @param stream_context the YamuxContext @@ -321,15 +408,19 @@ int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size return -1; } struct YamuxContext* ctx = libp2p_yamux_get_context(stream_context); - if (ctx->buffered_message_pos == -1) { + if (ctx->buffered_message_pos == -1 || ctx->buffered_message == NULL) { // we need to get info from the network - if (!ctx->stream->read(ctx->stream->stream_context, &ctx->buffered_message, timeout_secs)) { + if (!libp2p_yamux_read(stream_context, &ctx->buffered_message, timeout_secs)) { + libp2p_logger_error("yamux", "read_raw: Unable to read from network.\n"); return -1; } ctx->buffered_message_pos = 0; + } else { + // we have some data from a previous read_raw call the code + // below should handle this. } // max_to_read is the lesser of bytes read or buffer_size - int max_to_read = (buffer_size > ctx->buffered_message->data_size ? ctx->buffered_message->data_size : buffer_size); + int max_to_read = (buffer_size > (ctx->buffered_message->data_size-ctx->buffered_message_pos) ? ctx->buffered_message->data_size-ctx->buffered_message_pos : buffer_size); memcpy(buffer, &ctx->buffered_message->data[ctx->buffered_message_pos], max_to_read); ctx->buffered_message_pos += max_to_read; if (ctx->buffered_message_pos == ctx->buffered_message->data_size) { @@ -397,6 +488,7 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx, int am_server) { // send the protocol id outgoing.data = (uint8_t*)protocolID; outgoing.data_size = strlen(protocolID); + libp2p_logger_debug("yamux", "Attempting to write the yamux protocol id.\n"); if (!ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing)) { libp2p_logger_error("yamux", "We attempted to write the yamux protocol id, but the write call failed.\n"); goto exit; @@ -436,6 +528,13 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx, int am_server) { */ int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_stream) { // put this stream in the collection, and tie it to an id + if (libp2p_logger_watching_class("yamux")) { + const char* stream_type = ""; + if (new_stream->stream_type == STREAM_TYPE_MULTISTREAM) { + stream_type = "Multistream"; + } + libp2p_logger_debug("yamux", "handle_upgrade called for stream %s.\n", stream_type); + } struct YamuxContext* yamux_context = (struct YamuxContext*)yamux_stream->stream_context; return libp2p_yamux_stream_add(yamux_context, new_stream); } @@ -451,7 +550,7 @@ void libp2p_yamux_read_from_yamux_session(struct yamux_stream* stream, uint32_t */ void libp2p_yamux_new_stream(struct YamuxContext* context, struct Stream* stream, struct StreamMessage* msg) { // ok, we have the new stream structure. We now need to read what was sent. - libp2p_protocol_marshal(msg, stream, context->protocol_handlers); + //libp2p_protocol_marshal(msg, stream, context->protocol_handlers); } /*** @@ -493,6 +592,49 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_serv return out; } +/*** + * This will retrieve the stream that yamux is riding on top of + * @param context a YamuxContext or YamuxChannelContext + * @returns the Stream that yamux is riding on top of + */ +struct Stream* libp2p_yamux_get_parent_stream(void* context) { + if (context == NULL) + return NULL; + struct YamuxContext* ctx = libp2p_yamux_get_context(context); + if (ctx == NULL) + return NULL; + return ctx->stream->parent_stream; +} + +/*** + * Sends a FIN to close a channel + * @param channel the channel to close + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_yamux_channel_send_FIN(struct YamuxChannelContext* channel) { + if (channel == NULL) + return 0; + struct YamuxContext* ctx = channel->yamux_context; + if (ctx != NULL) { + struct StreamMessage* msg = libp2p_stream_message_new(); + msg->data_size = sizeof(struct yamux_frame); + msg->data = malloc(msg->data_size); + struct yamux_frame* f = (struct yamux_frame*) msg->data; + f->type = yamux_frame_window_update; + f->flags = yamux_frame_fin; + f->streamid = channel->channel; + f->version = 0; + f->length = 0; + encode_frame(f); + struct Stream* parent_to_yamux = libp2p_yamux_get_parent_stream(channel); + if (parent_to_yamux != NULL) + parent_to_yamux->write(parent_to_yamux->stream_context, msg); + libp2p_stream_message_free(msg); + return 1; + } + return 0; + +} /** * Clean up resources from libp2p_yamux_channel_new @@ -503,6 +645,8 @@ int libp2p_yamux_channel_close(void* context) { return 0; struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context; if (ctx != NULL) { + //Send FIN + libp2p_yamux_channel_send_FIN(ctx); // close the child's stream ctx->child_stream->close(ctx->child_stream); libp2p_stream_free(ctx->stream); @@ -511,6 +655,23 @@ int libp2p_yamux_channel_close(void* context) { return 1; } +/*** + * Close all channels + * @param ctx the YamuxContext that contains a vector of channels + * @returns true(1) + */ +int libp2p_yamux_channels_free(struct YamuxContext* ctx) { + if (ctx->channels) { + for(int i = 0; i < ctx->channels->total; i++) { + struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i); + libp2p_yamux_channel_close(curr->stream_context); + } + libp2p_utils_vector_free(ctx->channels); + ctx->channels = NULL; + } + return 1; +} + /*** * Free the resources from libp2p_yamux_context_new * @param ctx the context @@ -523,14 +684,7 @@ void libp2p_yamux_context_free(struct YamuxContext* ctx) { ctx->buffered_message = NULL; } // free all the channels - if (ctx->channels) { - for(int i = 0; i < ctx->channels->total; i++) { - struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i); - //curr->close(curr->stream_context); - libp2p_yamux_channel_close(curr->stream_context); - } - libp2p_utils_vector_free(ctx->channels); - } + libp2p_yamux_channels_free(ctx); if (ctx->session != NULL) yamux_session_free(ctx->session); free(ctx); @@ -602,7 +756,7 @@ struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, i ctx->yamux_context = incoming_stream->stream_context; ctx->child_stream = NULL; } - ctx->channel = channelNumber; + ctx->channel = (uint32_t) channelNumber; ctx->closed = 0; ctx->state = 0; ctx->window_size = 0; @@ -628,7 +782,7 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { return 0; struct YamuxChannelContext* channel_context = (struct YamuxChannelContext*)channel_stream->stream_context; // the negotiation was successful. Add it to the list of channels that we have - int itemNo = libp2p_utils_vector_add(ctx->channels, channel_stream); + uint32_t itemNo = (uint32_t) libp2p_utils_vector_add(ctx->channels, channel_stream); // There are 2 streams for each protocol. A server has the even numbered streams, the // client the odd number streams. If we are the server, we need to kick off the // process to add a stream of the same type. @@ -636,7 +790,7 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { if (ctx->am_server && itemNo % 2 != 0) { // we're the server, and they have a negotiated a new protocol. // negotiate a stream for us to talk to them. - struct Stream* yamux_stream = stream->parent_stream->parent_stream; + struct Stream* yamux_stream = ctx->stream; struct Stream* server_to_client_stream = stream->negotiate(yamux_stream); libp2p_yamux_stream_add(ctx, server_to_client_stream); }