From e67d62600031a41654c0678334f5a691e94927ad Mon Sep 17 00:00:00 2001 From: John Jones Date: Thu, 30 Nov 2017 14:32:36 -0500 Subject: [PATCH] Added buffering to yamux --- conn/dialer.c | 2 +- identify/identify.c | 2 +- include/libp2p/net/protocol.h | 7 + include/libp2p/net/stream.h | 23 ++- include/libp2p/utils/threadsafe_buffer.h | 58 +++++++ include/libp2p/yamux/session.h | 8 + include/libp2p/yamux/yamux.h | 5 + net/multistream.c | 80 +++++---- net/protocol.c | 11 ++ net/stream.c | 39 +++++ secio/secio.c | 25 ++- swarm/swarm.c | 4 + utils/Makefile | 2 +- utils/threadsafe_buffer.c | 123 ++++++++++++++ yamux/session.c | 30 +++- yamux/stream.c | 85 +++++++++- yamux/yamux.c | 199 +++++++++++++++-------- 17 files changed, 568 insertions(+), 135 deletions(-) create mode 100644 include/libp2p/utils/threadsafe_buffer.h create mode 100644 utils/threadsafe_buffer.c diff --git a/conn/dialer.c b/conn/dialer.c index a3d7da0..cc70180 100644 --- a/conn/dialer.c +++ b/conn/dialer.c @@ -141,7 +141,7 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer return 0; libp2p_logger_debug("dialer", "We successfully negotiated multistream over secio.\n"); // yamux over multistream - new_stream = libp2p_yamux_stream_new(peer->sessionContext->default_stream, 0, NULL); + new_stream = libp2p_yamux_stream_new(peer->sessionContext->default_stream, 0, dialer->swarm->protocol_handlers); if (new_stream != NULL) { if (!libp2p_yamux_stream_ready(peer->sessionContext, 5)) return 0; diff --git a/identify/identify.c b/identify/identify.c index d12b234..2c3ab17 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -376,7 +376,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { out->stream_context = ctx; out->close = libp2p_identify_close; out->negotiate = libp2p_identify_stream_new; - out->handle_message = libp2p_identify_handle_message; + out->bytes_waiting = NULL; // do we expect a reply? if (!libp2p_identify_send_protocol(parent_stream) /* || !libp2p_identify_receive_protocol(parent_stream) */) { libp2p_stream_free(out); diff --git a/include/libp2p/net/protocol.h b/include/libp2p/net/protocol.h index ab1b222..a85df4c 100644 --- a/include/libp2p/net/protocol.h +++ b/include/libp2p/net/protocol.h @@ -63,3 +63,10 @@ int libp2p_protocol_marshal(struct StreamMessage* message, struct Stream* stream * @returns true(1) */ int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers); + +/*** + * Check to see if this is a valid protocol + * @param msg the message + * @param handlers the vector of handlers + */ +int libp2p_protocol_is_valid_protocol(struct StreamMessage* msg, struct Libp2pVector* handlers); diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index 6aae6b3..0f94b27 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -133,12 +133,10 @@ struct Stream { /**** * A message has been received, and needs to be handled - * @param message the message received - * @param stream where the message came from - * @param protocol_context the context for the protocol - * @returns < 0 on error, 0 if no further processing needs to be done, or 1 for success + * @param stream the stream that has the message waiting + * @returns number of bytes processed */ - int (*handle_message)(const struct StreamMessage* message, struct Stream* stream, void* protocol_context); + int (*bytes_waiting)(struct Stream* stream); }; struct Stream* libp2p_stream_new(); @@ -165,3 +163,18 @@ int libp2p_stream_lock(struct Stream* stream); * @returns true(1) on success, false(0) otherwise */ int libp2p_stream_unlock(struct Stream* stream); + +/*** + * Determine if this stream is open + * @param stream the stream to check + * @returns true(1) if the stream is open, false otherwise + */ +int libp2p_stream_is_open(struct Stream* stream); + +/** + * Look for the latest stream + * (properly handles both raw streams and yamux streams) + * @param in the incoming stream + * @returns the latest child stream + */ +struct Stream* libp2p_stream_get_latest_stream(struct Stream* in); diff --git a/include/libp2p/utils/threadsafe_buffer.h b/include/libp2p/utils/threadsafe_buffer.h new file mode 100644 index 0000000..972cc5d --- /dev/null +++ b/include/libp2p/utils/threadsafe_buffer.h @@ -0,0 +1,58 @@ +#pragma once + +/** + * A thredsafe buffer + */ + +#include +#include +#include + +/*** + * Holds the information about the buffer + */ +struct ThreadsafeBufferContext { + size_t buffer_size; + uint8_t* buffer; + pthread_mutex_t lock; +}; + +/*** + * Allocate a new context + * @returns a newly allocated context, or NULL on error (out of memory?) + */ +struct ThreadsafeBufferContext* threadsafe_buffer_context_new(); + +/*** + * Free resources of a buffer context + * @param context the context + */ +void threadsafe_buffer_context_free(struct ThreadsafeBufferContext* context); + +/*** + * Read from the buffer without destroying its contents or moving its read pointer + * @param context the context + * @param results where to put the results + * @param results_size the size of the results + * @returns number of bytes read + */ +size_t threadsafe_buffer_peek(struct ThreadsafeBufferContext* context, uint8_t* results, size_t results_size); + +/*** + * Read from the buffer. + * NOTE: If results_size is more than what is left in the buffer, this will read everything. + * @param context the context + * @param results where to put the results + * @param results_size the size of the buffer + * @returns number of bytes read + */ +size_t threadsafe_buffer_read(struct ThreadsafeBufferContext* context, uint8_t* results, size_t results_size); + +/**** + * Add bytes to the end of the buffer + * @param context the context + * @param bytes the bytes to add + * @param bytes_size the size of bytes + * @returns the size added to the buffer (0 on error) + */ +size_t threadsafe_buffer_write(struct ThreadsafeBufferContext* context, const uint8_t* bytes, size_t bytes_size); diff --git a/include/libp2p/yamux/session.h b/include/libp2p/yamux/session.h index 79ad0ec..bf818c6 100644 --- a/include/libp2p/yamux/session.h +++ b/include/libp2p/yamux/session.h @@ -145,3 +145,11 @@ ssize_t yamux_session_read(struct yamux_session* session); * @returns true(1) on success, false(0) otherwise */ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message); + +/*** + * Find the correct yamux session stream + * @param streams the collection + * @param channel the id + * @returns the correce yamux_session_stream + */ +struct yamux_session_stream* yamux_get_session_stream(struct yamux_session* session, int channel); diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index 83f4162..1f182ff 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -2,6 +2,7 @@ #include "libp2p/net/protocol.h" #include "libp2p/net/stream.h" +#include "libp2p/utils/threadsafe_buffer.h" #include "libp2p/yamux/stream.h" /*** @@ -54,6 +55,10 @@ struct YamuxChannelContext { int state; // whether or not the connection is closed int closed; + // a buffer for data coming in from the network + struct ThreadsafeBufferContext* buffer; + // true if read is already running + int read_running; }; /** diff --git a/net/multistream.c b/net/multistream.c index 3f5f611..a9f5198 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -26,6 +26,7 @@ int multistream_default_timeout = 5; // forward declarations int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context); +int libp2p_net_multistream_bytes_waiting(struct Stream* stream); int libp2p_net_multistream_can_handle(const struct StreamMessage* msg) { @@ -271,8 +272,9 @@ int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** res rslts = NULL; } // now get the data from the parent stream - if (!parent_stream->read_raw(parent_stream->stream_context, rslts->data, rslts->data_size, timeout_secs)) { - libp2p_logger_error("multistream", "read: Was supposed to read %d bytes, but read_raw returned false.\n", num_bytes_requested); + int bytes_read = parent_stream->read_raw(parent_stream->stream_context, rslts->data, rslts->data_size, timeout_secs); + if (bytes_read != num_bytes_requested) { + libp2p_logger_error("multistream", "read: Was supposed to read %d bytes, but read_raw returned %d.\n", num_bytes_requested, bytes_read); // problem reading from the parent stream libp2p_stream_message_free(*results); *results = NULL; @@ -366,30 +368,7 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyReq struct StreamMessage outgoing; struct StreamMessage* results = NULL; int retVal = 0; - //int haveTheirs = 0; - //int peek_result = 0; - /* - if (!theyRequested) { - // see if they're trying to send something first - peek_result = libp2p_net_multistream_peek(ctx); - if (peek_result > 0) { - libp2p_logger_debug("multistream", "negotiate: There is %d bytes waiting for us. Perhaps it is the multistream header we're expecting.\n", peek_result); - // get the protocol - libp2p_net_multistream_read(ctx, &results, multistream_default_timeout); - if (results == NULL || results->data_size == 0) { - libp2p_logger_debug("multistream", "negotiate: We tried to read the %d bytes, but got nothing.\n", peek_result); - goto exit; - } - if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) { - libp2p_logger_debug("multistream", "negotiate: We expected the multistream id, but got %s.\n", results->data); - goto exit; - } - libp2p_logger_debug("multistream", "negotiate: We read %d bytes from the network, and received the multistream id.\n", results->data_size); - haveTheirs = 1; - } - } - */ // send the protocol id outgoing.data = (uint8_t*)protocolID; outgoing.data_size = strlen(protocolID); @@ -405,23 +384,6 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyReq ctx->status = multistream_status_syn; } - /* - // wait for them to send the protocol id back - if (!theyRequested && !haveTheirs) { - libp2p_logger_debug("multistream", "negotiate: Wrote multistream id to network, awaiting reply...\n"); - // expect the same back - int retVal = libp2p_net_multistream_read(ctx, &results, multistream_default_timeout); - if (retVal == 0 || results == NULL || results->data_size == 0) { - libp2p_logger_debug("multistream", "negotiate: expected the multistream id back, but got nothing. RetVal: %d.\n", retVal); - goto exit; - } - if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) { - libp2p_logger_debug("multistream", "negotiate: Expected the multistream id back, but did not receive it. We did receive %d bytes though.\n)", results->data_size); - goto exit; - } - } - */ - retVal = 1; exit: if (results != NULL) @@ -459,9 +421,15 @@ struct Stream* libp2p_net_multistream_handshake(struct Stream* stream) { */ int libp2p_net_multistream_handle_upgrade(struct Stream* multistream, struct Stream* new_stream) { // take multistream out of the picture + if (multistream->stream_type != STREAM_TYPE_MULTISTREAM) { + libp2p_logger_error("multistream", "Attempt to upgrade from multistream to %d, but the first parameter is not multistream, it is %d.\n", new_stream->stream_type, multistream->stream_type); + return 0; + } if (new_stream->parent_stream == multistream) { new_stream->parent_stream = multistream->parent_stream; multistream->parent_stream->handle_upgrade(multistream->parent_stream, new_stream); + } else { + libp2p_logger_error("multistream", "Attempt to upgrade from multistream ti %d, but the parent stream is not multistream.\n", new_stream->stream_type); } return 1; } @@ -486,7 +454,7 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i out->handle_upgrade = libp2p_net_multistream_handle_upgrade; out->address = parent_stream->address; out->socket_mutex = parent_stream->socket_mutex; - out->handle_message = libp2p_net_multistream_handle_message; + out->bytes_waiting = libp2p_net_multistream_bytes_waiting; // build MultistreamContext struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); if (ctx == NULL) { @@ -498,6 +466,7 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i ctx->stream = out; ctx->handlers = NULL; ctx->session_context = NULL; + parent_stream->handle_upgrade(parent_stream, out); // attempt to negotiate multistream protocol if (!libp2p_net_multistream_negotiate(ctx, theyRequested)) { libp2p_logger_debug("multistream", "multistream_stream_new: negotiate failed\n"); @@ -525,6 +494,10 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i * @returns <0 on error, 0 for the caller to stop handling this, 1 for success */ int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + // get the latest stream, as this stuff is multithreaded and may be stale + struct Stream* latest_stream = libp2p_stream_get_latest_stream(stream); + if (latest_stream != NULL) + stream = latest_stream; if (stream->stream_type == STREAM_TYPE_MULTISTREAM) { // we sent a multistream, and this is them responding struct MultistreamContext* ctx = (struct MultistreamContext*) stream->stream_context; @@ -547,6 +520,27 @@ int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struc return -1; } +/*** + * We have bytes waiting on the network. + * Normally, multistream is just a step towards another protocol. + * @param stream this multistream + * @returns number of bytes processed + */ +int libp2p_net_multistream_bytes_waiting(struct Stream* stream) { + libp2p_logger_debug("multistream", "bytes_waiting called\n"); + struct StreamMessage* message = NULL; + if (libp2p_net_multistream_read(stream->stream_context, &message, 5)) { + libp2p_logger_debug("multistream", "bytes_waiting: Read %d bytes from stream. [%s]\n", message->data_size, message->data); + struct MultistreamContext* context = (struct MultistreamContext*) stream->stream_context; + int retVal = libp2p_protocol_marshal(message, stream, context->handlers); + if (retVal >= 0) + return message->data_size; + } else { + libp2p_logger_error("multistream", "bytes_waiting said there were bytes waiting, but read was false.\n"); + } + return 0; +} + /*** * The handler to handle calls to the protocol * @param handler_vector a Libp2pVector of protocol handlers diff --git a/net/protocol.c b/net/protocol.c index 82fc6b8..e75ba2e 100644 --- a/net/protocol.c +++ b/net/protocol.c @@ -69,6 +69,17 @@ int libp2p_protocol_marshal(struct StreamMessage* msg, struct Stream* stream, st return handler->HandleMessage(msg, stream, handler->context); } +/*** + * Check to see if this is a valid protocol + * @param msg the message + * @param handlers the vector of handlers + */ +int libp2p_protocol_is_valid_protocol(struct StreamMessage* msg, struct Libp2pVector* handlers) { + if (protocol_compare(msg, handlers) == NULL) + return 0; + return 1; +} + /*** * Shut down all protocol handlers and free vector * @param handlers vector of Libp2pProtocolHandler diff --git a/net/stream.c b/net/stream.c index 33bdd9b..81b06d0 100644 --- a/net/stream.c +++ b/net/stream.c @@ -3,6 +3,7 @@ #include "multiaddr/multiaddr.h" #include "libp2p/net/stream.h" #include "libp2p/net/connectionstream.h" +#include "libp2p/yamux/yamux.h" int libp2p_stream_default_handle_upgrade(struct Stream* parent_stream, struct Stream* new_stream) { return libp2p_net_connection_upgrade(parent_stream, new_stream); @@ -39,3 +40,41 @@ void libp2p_stream_free(struct Stream* stream) { free(stream); } } + +int libp2p_stream_is_open(struct Stream* stream) { + if (stream == NULL) + return 0; + + struct Stream* base_stream = stream; + while (base_stream->parent_stream != NULL) + base_stream = base_stream->parent_stream; + if (base_stream->stream_type == STREAM_TYPE_RAW) { + struct ConnectionContext* ctx = (struct ConnectionContext*)base_stream->stream_context; + if (ctx->socket_descriptor > 0) + return 1; + } + return 0; +} + +// forward declaration +struct YamuxChannelContext* libp2p_yamux_get_channel_context(void* stream_context); + +/** + * Look for the latest stream + * (properly handles both raw streams and yamux streams) + * @param in the incoming stream + * @returns the latest child stream + */ +struct Stream* libp2p_stream_get_latest_stream(struct Stream* in) { + if (in == NULL) + return NULL; + if (in->stream_type == STREAM_TYPE_RAW) { + struct ConnectionContext* ctx = (struct ConnectionContext*)in->stream_context; + return ctx->session_context->default_stream; + } else if (in->stream_type == STREAM_TYPE_YAMUX) { + struct YamuxChannelContext* ctx = libp2p_yamux_get_channel_context(in->stream_context); + if (ctx != NULL) + return ctx->child_stream; + } + return libp2p_stream_get_latest_stream(in->parent_stream); +} diff --git a/secio/secio.c b/secio/secio.c index 94447ee..27820dc 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -584,6 +584,21 @@ int libp2p_secio_get_socket_descriptor(struct Stream* stream) { return ctx->socket_descriptor; } +/*** + * Navigate down the tree of streams, and set the raw socket descriptor to 0, + * as it appears the connection has been closed. Cleanup will happen later. + * @param stream the stream + * @returns true(1) + */ +int libp2p_secio_set_socket_descriptor(struct Stream* stream) { + struct Stream* current = stream; + while (current->parent_stream != NULL) + current = current->parent_stream; + struct ConnectionContext* ctx = current->stream_context; + ctx->socket_descriptor = 0; + return 1; +} + /*** * Write bytes to an unencrypted stream * @param session the session information @@ -654,6 +669,9 @@ int libp2p_secio_unencrypted_read(struct Stream* secio_stream, struct StreamMess int socket_descriptor = libp2p_secio_get_socket_descriptor(secio_stream); + if (socket_descriptor <= 0) + return 0; + // first read the 4 byte integer char* size = (char*)&buffer_size; int left = 4; @@ -680,8 +698,11 @@ int libp2p_secio_unencrypted_read(struct Stream* secio_stream, struct StreamMess libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: %s\n", strerror(errno)); return 0; } - else - libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: 0 bytes read, but errno shows no error. Trying again.\n"); + else { + libp2p_logger_error("secio", "Stream has been shut down from other end.\n"); + libp2p_secio_set_socket_descriptor(secio_stream); + return 0; + } } } else { left = left - read_this_time; diff --git a/swarm/swarm.c b/swarm/swarm.c index 4059c21..2f1ab2e 100644 --- a/swarm/swarm.c +++ b/swarm/swarm.c @@ -36,6 +36,10 @@ int libp2p_swarm_listen_and_handle(struct Stream* stream, struct Libp2pVector* p if (!stream->read(stream->stream_context, &results, 1)) { libp2p_logger_debug("swarm", "Releasing read lock\n"); pthread_mutex_unlock(stream->socket_mutex); + if (!libp2p_stream_is_open(stream)) { + libp2p_logger_error("swarm", "Attempted read on stream, but has been closed.\n"); + return -1; + } libp2p_logger_error("swarm", "Unable to read from network (could just be a timeout). Exiting the read.\n"); return retVal; } diff --git a/utils/Makefile b/utils/Makefile index e8bd434..653f98c 100644 --- a/utils/Makefile +++ b/utils/Makefile @@ -7,7 +7,7 @@ endif LFLAGS = DEPS = -OBJS = string_list.o vector.o linked_list.o logger.o urlencode.o thread_pool.o +OBJS = string_list.o vector.o linked_list.o logger.o urlencode.o thread_pool.o threadsafe_buffer.o %.o: %.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) diff --git a/utils/threadsafe_buffer.c b/utils/threadsafe_buffer.c new file mode 100644 index 0000000..d2f7efa --- /dev/null +++ b/utils/threadsafe_buffer.c @@ -0,0 +1,123 @@ +/** + * A thredsafe buffer + */ + +#include + +#include "libp2p/utils/threadsafe_buffer.h" +#include "libp2p/utils/logger.h" + +/*** + * Allocate a new context + * @returns a newly allocated context, or NULL on error (out of memory?) + */ +struct ThreadsafeBufferContext* threadsafe_buffer_context_new() { + struct ThreadsafeBufferContext* context = (struct ThreadsafeBufferContext*) malloc(sizeof(struct ThreadsafeBufferContext)); + if (context != NULL) { + context->buffer_size = 0; + context->buffer = NULL; + pthread_mutex_init(&context->lock, NULL); + } + return context; +} + +/*** + * Free resources of a buffer context + * @param context the context + */ +void threadsafe_buffer_context_free(struct ThreadsafeBufferContext* context) { + if (context != NULL) { + if (context->buffer != NULL) + free(context->buffer); + free(context); + } +} + +/*** + * Read from the buffer without destroying its contents or moving its read pointer + * @param context the context + * @param results where to put the results + * @param results_size the size of the results + * @returns number of bytes read + */ +size_t threadsafe_buffer_peek(struct ThreadsafeBufferContext* context, uint8_t* results, size_t results_size) { + size_t bytes_read = 0; + if (context == NULL) + return 0; + pthread_mutex_lock(&context->lock); + // do the read + if (context->buffer != NULL && context->buffer_size > 0) { + bytes_read = results_size < context->buffer_size ? results_size : context->buffer_size; + memcpy(results, context->buffer, bytes_read); + } + pthread_mutex_unlock(&context->lock); + return bytes_read; +} + +/*** + * Read from the buffer. + * NOTE: If results_size is more than what is left in the buffer, this will read everything. + * @param context the context + * @param results where to put the results + * @param results_size the size of the buffer + * @returns number of bytes read + */ +size_t threadsafe_buffer_read(struct ThreadsafeBufferContext* context, uint8_t* results, size_t results_size) { + size_t bytes_read = 0; + if (context == NULL) + return 0; + pthread_mutex_lock(&context->lock); + // do the read + if (context->buffer != NULL && context->buffer_size > 0) { + bytes_read = results_size < context->buffer_size ? results_size : context->buffer_size; + libp2p_logger_debug("threadsafe_buffer", "read: We want to read %d bytes, and have %d in the buffer. Therefore, we will read %d.\n", results_size, context->buffer_size, bytes_read); + memcpy(results, context->buffer, bytes_read); + } + // adjust the size + if (bytes_read > 0) { + if (context->buffer_size - bytes_read > 0) { + // more remains + size_t new_buffer_size = context->buffer_size - bytes_read; + uint8_t* new_buffer = (uint8_t*) malloc(new_buffer_size); + memcpy(new_buffer, &context->buffer[bytes_read], new_buffer_size); + free(context->buffer); + context->buffer = new_buffer; + context->buffer_size = new_buffer_size; + } else { + // everything has been read + free(context->buffer); + context->buffer = NULL; + context->buffer_size = 0; + } + } + pthread_mutex_unlock(&context->lock); + return bytes_read; +} + +/**** + * Add bytes to the end of the buffer + * @param context the context + * @param bytes the bytes to add + * @param bytes_size the size of bytes + * @returns the size added to the buffer (0 on error) + */ +size_t threadsafe_buffer_write(struct ThreadsafeBufferContext* context, const uint8_t* bytes, size_t bytes_size) { + if (context == NULL) + return 0; + if (bytes_size == 0) + return 0; + size_t bytes_copied = 0; + pthread_mutex_lock(&context->lock); + // allocate memory + uint8_t* new_buffer = (uint8_t*) realloc(context->buffer, context->buffer_size + bytes_size); + if (new_buffer != NULL) { + // copy data + memcpy(&new_buffer[context->buffer_size], bytes, bytes_size); + context->buffer_size += bytes_size; + context->buffer = new_buffer; + bytes_copied = bytes_size; + libp2p_logger_debug("threadsafe_buffer", "write: Added %d bytes. Buffer now contains %d bytes.\n", bytes_size, context->buffer_size); + } + pthread_mutex_unlock(&context->lock); + return bytes_copied; +} diff --git a/yamux/session.c b/yamux/session.c index 7aaf702..5e16f97 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -318,10 +318,15 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s } libp2p_logger_debug("yamux", "Processing the data after the frame, which is %d bytes.\n", incoming_size - frame_size); - ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); - libp2p_logger_debug("yamux", "decode: yamux_stream_process returned %d.\n", (int)re); + if (f.streamid == 2) { + ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); + libp2p_logger_debug("yamux", "decode: yamux_stream_process returned %d.\n", (int)re); + return (re < 0) ? re : (re + incoming_size); + } else { + libp2p_logger_debug("yamux", "Only handling stream 2 for now.\n"); + return 0; + } //yamux_pull_message_from_frame(incoming, incoming_size, return_message); - return (re < 0) ? re : (re + incoming_size); } // stream id matches } @@ -367,7 +372,8 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); if (multistream != NULL) { libp2p_logger_debug("yamux", "Successfully sent the multistream id on stream %d.\n", f.streamid); - channelContext->child_stream = multistream; + // this should already be done + // channelContext->child_stream = multistream; } else { libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid); } @@ -388,3 +394,19 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s return 0; } +/*** + * Find the correct yamux session stream + * @param streams the collection + * @param channel the id + * @returns the correce yamux_session_stream + */ +struct yamux_session_stream* yamux_get_session_stream(struct yamux_session* session, int channel) { + for (size_t i = 0; i < session->cap_streams; ++i) + { + struct yamux_session_stream* ss = &session->streams[i]; + if (ss->stream->stream->channel == channel) + return ss; + } + return NULL; +} + diff --git a/yamux/stream.c b/yamux/stream.c index 84cabdb..1df15fb 100644 --- a/yamux/stream.c +++ b/yamux/stream.c @@ -11,6 +11,7 @@ #include "libp2p/yamux/stream.h" #include "libp2p/yamux/yamux.h" #include "libp2p/utils/logger.h" +#include "libp2p/utils/threadsafe_buffer.h" #define MIN(x,y) (y^((x^y)&-(xbuffer->buffer_size > 0) { + if (!context->read_running) { + context->read_running = 1; + if (context->child_stream->read(context->child_stream->stream_context, &message, 5) && message != NULL) { + context->read_running = 0; + libp2p_logger_debug("yamux", "read_method: read returned a message of %d bytes. [%s]\n", message->data_size, message->data); + int retVal = libp2p_protocol_marshal(message, context->child_stream, context->yamux_context->protocol_handlers); + libp2p_logger_debug("yamux", "read_method: protocol_marshal returned %d.\n", retVal); + libp2p_stream_message_free(message); + } else { + context->read_running = 0; + libp2p_logger_debug("yamux", "read_method: read returned false.\n"); + } + } + } + return NULL; +} + +/** + * spin off a new thread to handle a child's reading of data + * @param context the YamuxChannelContext + */ +int libp2p_yamux_notify_child_stream_has_data(struct YamuxChannelContext* context) { + pthread_t new_thread; + + if (pthread_create(&new_thread, NULL, yamux_read_method, context) == 0) + return 1; + return 0; +} + /*** * A frame came in. This looks at the data after the frame and does the right thing. * @param stream the stream @@ -405,17 +445,48 @@ ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* fr stream->read_fn(stream, f.length, (void*)incoming); */ // the new way - struct StreamMessage stream_message; - stream_message.data_size = incoming_size; - stream_message.data = (uint8_t*)incoming; - libp2p_logger_debug("yamux", "Calling handle_message for stream type %d with message of %d bytes. [%s]\n", stream->stream->stream_type, stream_message.data_size, stream_message.data); - struct YamuxChannelContext* channelContext = libp2p_yamux_get_channel_context(stream->stream->stream_context); + // get the yamux channel stream + struct Stream* channel_stream = stream->stream; + while(channel_stream != NULL && channel_stream->stream_type != STREAM_TYPE_YAMUX) + channel_stream = channel_stream->parent_stream; + struct YamuxChannelContext* channelContext = libp2p_yamux_get_channel_context(channel_stream->stream_context); if (channelContext == NULL) { libp2p_logger_error("yamux", "Unable to get channel context for stream %d.\n", frame->streamid); return -EPROTO; } - channelContext->child_stream->handle_message(&stream_message, channelContext->child_stream, NULL); - + libp2p_logger_debug("yamux", "writing %d bytes to channel context %d.\n", incoming_size, channelContext->channel); + threadsafe_buffer_write(channelContext->buffer, incoming, incoming_size); + if(channelContext->child_stream == NULL) { + // we have to handle this ourselves + // see if we have the entire message + int buffer_size = channelContext->buffer->buffer_size; + uint8_t buffer[buffer_size]; + buffer_size = threadsafe_buffer_peek(channelContext->buffer, buffer, buffer_size); + struct StreamMessage message; + message.data_size = buffer_size; + message.data = buffer; + if (libp2p_protocol_is_valid_protocol(&message, channelContext->yamux_context->protocol_handlers)) { + // marshal the call + buffer_size = threadsafe_buffer_read(channelContext->buffer, buffer, buffer_size); + message.data_size = buffer_size; + message.data = buffer; + libp2p_protocol_marshal(&message, stream->stream, channelContext->yamux_context->protocol_handlers); + } + } else { + // Alert the child protocol that these bytes came in. + // NOTE: We're doing the work in a separate thread + // we tell them, and they need to be smart enough to see if this is a complete message or not + libp2p_yamux_notify_child_stream_has_data(channelContext); + /* + struct StreamMessage* message = NULL; + if (channelContext->child_stream->read(channelContext->child_stream->stream_context, &message, 5) && message != NULL) { + int retVal = libp2p_protocol_marshal(message, channelContext->child_stream, channelContext->yamux_context->protocol_handlers); + libp2p_stream_message_free(message); + if (retVal < 0) + return 0; + } + */ + } return incoming_size; } default: diff --git a/yamux/yamux.c b/yamux/yamux.c index b368695..098a884 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -207,6 +207,43 @@ int yamux_more_to_read(struct StreamMessage* incoming) { return 0; } +int libp2p_yamux_channel_read(void* stream_context, struct StreamMessage** message, int timeout_secs) { + if (stream_context == NULL) { + libp2p_logger_error("yamux", "channel_read: stream context null.\n"); + return 0; + } + struct YamuxChannelContext* context = libp2p_yamux_get_channel_context(stream_context); + if (context == NULL) { + libp2p_logger_error("yamux", "channel_read: stream_context not a channel context\n"); + return 0; + } + // reserve the necessary memory + *message = libp2p_stream_message_new(); + struct StreamMessage* msg = *message; + if (msg == NULL) { + libp2p_logger_error("yamux", "channel_read: Unable to allocate memory for message struct.\n"); + return 0; + } + msg->data_size = context->buffer->buffer_size; + if (msg->data_size == 0) { + libp2p_logger_debug("yamux", "channel_read: Nothing to read.\n"); + libp2p_stream_message_free(msg); + *message = NULL; + return 0; + } + msg->data = (uint8_t*) malloc(msg->data_size); + if (msg->data == NULL) { + libp2p_logger_error("yamux", "chanel_read: Unable to allocate memory for message data.\n"); + libp2p_stream_message_free(msg); + *message = NULL; + return 0; + } + // ok, we have our struct. Now fill it + msg->data_size = threadsafe_buffer_read(context->buffer, msg->data, msg->data_size); + libp2p_logger_debug("yamux", "channel_read: Read %d bytes from buffer.\n", msg->data_size); + return msg->data_size; +} + /** * Read from the network, expecting a yamux frame. * NOTE: This will also dispatch the frame to the correct protocol @@ -220,18 +257,14 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int libp2p_logger_error("yamux", "read was passed a null context.\n"); return 0; } - // look at the first byte of the context to determine if this is a YamuxContext (we're negotiating) - // or a YamuxChannelContext (we're talking to an established channel) - struct YamuxContext* ctx = NULL; - struct YamuxChannelContext* channel = NULL; - char proto = ((uint8_t*)stream_context)[0]; - if (proto == YAMUX_CHANNEL_CONTEXT) { - channel = (struct YamuxChannelContext*)stream_context; - ctx = channel->yamux_context; - } else if (proto == YAMUX_CONTEXT) { - ctx = (struct YamuxContext*)stream_context; + struct YamuxChannelContext* channel = libp2p_yamux_get_channel_context(stream_context); + if (channel != NULL) { + // do a channel read instead + return libp2p_yamux_channel_read(stream_context, message, timeout_secs); } + struct YamuxContext* ctx = libp2p_yamux_get_context(stream_context); + if (ctx == NULL) { libp2p_logger_error("yamux", "read: The incoming stream is not a yamux stream.\n"); return 0; @@ -257,66 +290,47 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int } struct Stream* parent_stream = libp2p_yamux_get_parent_stream(stream_context); - if (channel != NULL && channel->channel != 0) { - // I don't think this will ever be the case. This I believe to be dead code - libp2p_logger_debug("yamux", "Data received on yamux stream %d.\n", channel->channel); - // we have an established channel. Use it. - if (!parent_stream->read(parent_stream->stream_context, message, yamux_default_timeout)) { - libp2p_logger_error("yamux", "Read: Attepted to read from channel %d, but the read failed.\n", channel->channel); - return 0; + // this is the normal situation (not dead code). + struct StreamMessage* incoming = NULL; + // we need a lock + if (parent_stream->read(parent_stream->stream_context, &incoming, yamux_default_timeout)) { + libp2p_logger_debug("yamux", "read: successfully read %d bytes from network.\n", incoming->data_size); + // This could be a data frame with the actual data coming later. Yuck. + // JMJ in the case of an incomplete buffer, the next read should be the data. This must + // be true, as the next data does not have a frame. We should only read the bytes we need. + int moreToRead = yamux_more_to_read(incoming); + if (moreToRead > 0) { + uint8_t buffer[moreToRead]; + if (parent_stream->read_raw(parent_stream->stream_context, buffer, moreToRead, timeout_secs) == moreToRead) { + // we have the bytes we need + uint8_t* new_buffer = (uint8_t*) malloc(incoming->data_size + moreToRead); + memcpy(new_buffer, incoming->data, incoming->data_size); + memcpy(&new_buffer[incoming->data_size], buffer, moreToRead); + incoming->data_size += moreToRead; + free(incoming->data); + incoming->data = new_buffer; + } else { + // we didn't get the bytes we needed + return 0; + } } - if (message == NULL) { - libp2p_logger_error("yamux", "Read: Successfully read from channel %d, but message was NULL.\n", channel->channel); - } - // TODO: This is not right. It must be sorted out. - struct StreamMessage* msg = *message; - libp2p_logger_debug("yamux", "Read: Received %d bytes on channel %d.\n", msg->data_size, channel->channel); - if (yamux_decode(channel, msg->data, msg->data_size, message) == 0) { + // parse the frame. This is where the work happens. + if (yamux_decode(ctx, incoming->data, incoming->data_size, message) >= 0) { + libp2p_stream_message_free(incoming); + // The message may not have anything in it. If so, return 0, as if nothing was done. Everything has been handled + if (*message != NULL && (*message)->data_size == 0) { + libp2p_stream_message_free(*message); + *message = NULL; + return 0; + } return 1; } libp2p_logger_error("yamux", "yamux_decode returned error.\n"); - } else if (ctx != NULL) { - // this is the normal situation (not dead code). - struct StreamMessage* incoming = NULL; - // we need a lock - if (parent_stream->read(parent_stream->stream_context, &incoming, yamux_default_timeout)) { - libp2p_logger_debug("yamux", "read: successfully read %d bytes from network.\n", incoming->data_size); - // This could be a data frame with the actual data coming later. Yuck. - // JMJ in the case of an incomplete buffer, the next read should be the data. This must - // be true, as the next data does not have a frame. We should only read the bytes we need. - int moreToRead = yamux_more_to_read(incoming); - if (moreToRead > 0) { - uint8_t buffer[moreToRead]; - if (parent_stream->read_raw(parent_stream->stream_context, buffer, moreToRead, timeout_secs) == moreToRead) { - // we have the bytes we need - uint8_t* new_buffer = (uint8_t*) malloc(incoming->data_size + moreToRead); - memcpy(new_buffer, incoming->data, incoming->data_size); - memcpy(&new_buffer[incoming->data_size], buffer, moreToRead); - incoming->data_size += moreToRead; - free(incoming->data); - incoming->data = new_buffer; - } else { - // we didn't get the bytes we needed - return 0; - } - } - // parse the frame. This is where the work happens. - if (yamux_decode(ctx, incoming->data, incoming->data_size, message) >= 0) { - libp2p_stream_message_free(incoming); - // The message may not have anything in it. If so, return 0, as if nothing was done. Everything has been handled - if (*message != NULL && (*message)->data_size == 0) { - libp2p_stream_message_free(*message); - *message = NULL; - return 0; - } - return 1; - } - libp2p_logger_error("yamux", "yamux_decode returned error.\n"); - libp2p_stream_message_free(incoming); - } else { - // read failed - } + libp2p_stream_message_free(incoming); + } else { + // read failed } + libp2p_logger_error("yamux", "Unable to do network read.\n"); return 0; } @@ -439,6 +453,29 @@ int libp2p_yamux_peek(void* stream_context) { return parent_stream->peek(parent_stream->stream_context); } +/*** + * Read from the yamux channel buffer, mimics a network read + * @param stream_context a YamuxChannelContext + * @param buffer where to put the results + * @param buffer_size the size of the buffer + * @param timeout_secs how long to wait (currently unused) + * @returns the number of bytes placed into the buffer + */ +int libp2p_yamux_channel_read_raw(void* stream_context, uint8_t* buffer, int buffer_size, int timeout_secs) { + if (stream_context == NULL) + return 0; + struct YamuxChannelContext* channelContext = libp2p_yamux_get_channel_context(stream_context); + if (channelContext == NULL) + return 0; + // wait to see if we get the bytes we need + int counter = 0; + while (channelContext->buffer->buffer_size < buffer_size && counter < timeout_secs) { + sleep(1); + counter++; + } + return threadsafe_buffer_read(channelContext->buffer, buffer, buffer_size); +} + /*** * Read from the network, and place it in the buffer * NOTE: This may put something in the internal read buffer (i.e. buffer_size is too small) @@ -452,6 +489,11 @@ int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size if (stream_context == NULL) { return -1; } + struct YamuxChannelContext* channel_context = libp2p_yamux_get_channel_context(stream_context); + if (channel_context != NULL) { + // do a read_raw on a channel + return libp2p_yamux_channel_read_raw(stream_context, buffer, buffer_size, timeout_secs); + } struct YamuxContext* ctx = libp2p_yamux_get_context(stream_context); if (ctx->buffered_message_pos == -1 || ctx->buffered_message == NULL) { // we need to get info from the network @@ -532,12 +574,23 @@ int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_ } libp2p_logger_debug("yamux", "handle_upgrade called for stream %s.\n", stream_type); } - struct YamuxContext* yamux_context = (struct YamuxContext*)yamux_stream->stream_context; - return libp2p_yamux_stream_add(yamux_context, new_stream); -} - -void libp2p_yamux_read_from_yamux_session(struct yamux_stream* stream, uint32_t data_len, void* data) { - + struct YamuxContext* yamux_context = libp2p_yamux_get_context(yamux_stream->stream_context); + struct YamuxChannelContext* yamux_channel_context = libp2p_yamux_get_channel_context(yamux_stream->stream_context); + if (yamux_channel_context != NULL) { + // they've asked to upgrade on a channel. Make them the new default stream for this channel + yamux_channel_context->child_stream = new_stream; + struct yamux_session_stream* yamux_session_stream = yamux_get_session_stream(yamux_channel_context->yamux_context->session, yamux_channel_context->channel); + if (yamux_session_stream == NULL) { + libp2p_logger_error("yamux", "Unable to get correct session stream.\n"); + return 0; + } + yamux_session_stream->stream->stream = new_stream; + return 1; + } else { + // they've asked to upgrade on the main channel. I don't think this should never happen. + libp2p_logger_debug("yamux", "handle_upgrade: Attempt to upgrade on the main yamux channel"); + return libp2p_yamux_stream_add(yamux_context, new_stream); + } } /*** @@ -763,7 +816,11 @@ struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, i ctx->window_size = 0; ctx->type = YAMUX_CHANNEL_CONTEXT; ctx->stream = out; + ctx->buffer = threadsafe_buffer_context_new(); + ctx->read_running = 0; out->stream_context = ctx; + out->handle_upgrade = libp2p_yamux_handle_upgrade; + out->channel = channelNumber; } return out; }