diff --git a/identify/identify.c b/identify/identify.c index a621e7e..d12b234 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -376,6 +376,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { out->stream_context = ctx; out->close = libp2p_identify_close; out->negotiate = libp2p_identify_stream_new; + out->handle_message = libp2p_identify_handle_message; // do we expect a reply? if (!libp2p_identify_send_protocol(parent_stream) /* || !libp2p_identify_receive_protocol(parent_stream) */) { libp2p_stream_free(out); diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index 360e565..6aae6b3 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -130,6 +130,15 @@ struct Stream { * @returns a new Stream, or NULL on error */ struct Stream* (*negotiate)(struct Stream* parent_stream); + + /**** + * A message has been received, and needs to be handled + * @param message the message received + * @param stream where the message came from + * @param protocol_context the context for the protocol + * @returns < 0 on error, 0 if no further processing needs to be done, or 1 for success + */ + int (*handle_message)(const struct StreamMessage* message, struct Stream* stream, void* protocol_context); }; struct Stream* libp2p_stream_new(); diff --git a/net/multistream.c b/net/multistream.c index 28c182f..3f5f611 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -24,6 +24,10 @@ int multistream_default_timeout = 5; * An implementation of the libp2p multistream */ +// forward declarations +int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context); + + int libp2p_net_multistream_can_handle(const struct StreamMessage* msg) { if (msg == NULL || msg->data == NULL || msg->data_size == 0) return 0; @@ -482,6 +486,7 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i out->handle_upgrade = libp2p_net_multistream_handle_upgrade; out->address = parent_stream->address; out->socket_mutex = parent_stream->socket_mutex; + out->handle_message = libp2p_net_multistream_handle_message; // build MultistreamContext struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); if (ctx == NULL) { diff --git a/yamux/session.c b/yamux/session.c index 1840022..7aaf702 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -267,6 +267,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s } } } else { + libp2p_logger_debug("yamux", "yamux_decode: received something for yamux stream %d.\n", f.streamid); // we're handling a stream, not something at the yamux protocol level for (size_t i = 0; i < yamux_session->cap_streams; ++i) { @@ -278,7 +279,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s if (s->id == f.streamid) // we have a match between the stored stream and the current stream { - libp2p_logger_debug("yamux", "We found our stream id.\n"); + libp2p_logger_debug("yamux", "We found our stream id of %d.\n", f.streamid); if (f.flags & yamux_frame_rst) { libp2p_logger_debug("yamux", "They are asking that this stream be reset.\n"); @@ -316,11 +317,12 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s return -EPROTO; } - libp2p_logger_debug("yamux", "Processing frame of %d bytes.\n"); + libp2p_logger_debug("yamux", "Processing the data after the frame, which is %d bytes.\n", incoming_size - frame_size); ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); - yamux_pull_message_from_frame(incoming, incoming_size, return_message); + libp2p_logger_debug("yamux", "decode: yamux_stream_process returned %d.\n", (int)re); + //yamux_pull_message_from_frame(incoming, incoming_size, return_message); return (re < 0) ? re : (re + incoming_size); - } + } // stream id matches } // This stream is not in my list of streams. @@ -332,41 +334,47 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s // if we didn't initiate it, add this new channel (odd stream id is from client, even is from server) if ( (f.streamid % 2 == 0 && !yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) { - libp2p_logger_debug("yamux", "This is a new channel. Creating it...\n"); - struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message); - if (yamuxChannelStream == NULL) { - libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux channel for stream id %d.\n", f.streamid); - return -EPROTO; - } - struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context; + // JMJ just debugging stream 2 for now + if (f.streamid == 2) { + libp2p_logger_debug("yamux", "Stream id %d is a new stream. Creating it...\n", f.streamid); + struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message); + if (yamuxChannelStream == NULL) { + libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux stream for stream id %d.\n", f.streamid); + return -EPROTO; + } + struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context; - if (yamux_session->new_stream_fn) { - libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn.\n"); - yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message); + if (yamux_session->new_stream_fn) { + libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn for stream %d.\n", f.streamid); + yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message); + } + // handle window update (if there is one) + struct yamux_session_stream ss = yamux_session->streams[f.streamid]; + ss.alive = 1; + ss.stream = yamux_stream_new(); + ss.stream->id = f.streamid; + ss.stream->session = yamux_session; + ss.stream->state = yamux_stream_syn_recv; + ss.stream->window_size = 0; + yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size); + channelContext->state = yamux_stream_syn_recv; + if (f.type == yamux_frame_window_update) { + libp2p_logger_debug("yamux", "Received window update for stream %d. Sending one back.\n", f.streamid); + // send it back + yamux_stream_window_update(channelContext, ss.stream->window_size); + } + // TODO: Start negotiations of multistream + struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); + if (multistream != NULL) { + libp2p_logger_debug("yamux", "Successfully sent the multistream id on stream %d.\n", f.streamid); + channelContext->child_stream = multistream; + } else { + libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid); + } + } else { + libp2p_logger_error("yamux", "Temporarily not doing anthing for streams other than stream 2.\n"); + return 0; } - // handle window update (if there is one) - struct yamux_session_stream ss = yamux_session->streams[f.streamid]; - ss.alive = 1; - ss.stream = yamux_stream_new(); - ss.stream->id = f.streamid; - ss.stream->session = yamux_session; - ss.stream->state = yamux_stream_syn_recv; - ss.stream->window_size = 0; - yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size); - channelContext->state = yamux_stream_syn_recv; - if (f.type == yamux_frame_window_update) { - libp2p_logger_debug("yamux", "Received window update for stream %d. Sending one back.\n", f.streamid); - // send it back - yamux_stream_window_update(channelContext, ss.stream->window_size); - } - // TODO: Start negotiations of multistream - struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); - if (multistream != NULL) { - libp2p_logger_debug("yamux", "Successfully negotiated multistream on stream %d.\n", f.streamid); - channelContext->child_stream = multistream; - } else { - libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid); - } } else { libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)")); diff --git a/yamux/stream.c b/yamux/stream.c index 9d23074..84cabdb 100644 --- a/yamux/stream.c +++ b/yamux/stream.c @@ -10,12 +10,14 @@ #include "libp2p/yamux/frame.h" #include "libp2p/yamux/stream.h" #include "libp2p/yamux/yamux.h" +#include "libp2p/utils/logger.h" #define MIN(x,y) (y^((x^y)&-(xwindow_size + (int64_t)(int32_t)f.length ); nws &= 0xFFFFFFFFLL; stream->window_size = (uint32_t)nws; @@ -386,11 +389,32 @@ ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* fr //no break case yamux_frame_data: { - if (incoming_size != (ssize_t)f.length) - return -1; + if (incoming_size != (ssize_t)f.length) { + if (f.type == yamux_frame_data) { + libp2p_logger_debug("yamux", "stream_process: They said we should look for frame data, but sizes don't match. They said %d and we see %d.\n", f.length, incoming_size); + return -1; + } + } + if (incoming_size == 0) + return 0; + + // the old way + /* if (stream->read_fn) stream->read_fn(stream, f.length, (void*)incoming); + */ + // the new way + struct StreamMessage stream_message; + stream_message.data_size = incoming_size; + stream_message.data = (uint8_t*)incoming; + libp2p_logger_debug("yamux", "Calling handle_message for stream type %d with message of %d bytes. [%s]\n", stream->stream->stream_type, stream_message.data_size, stream_message.data); + struct YamuxChannelContext* channelContext = libp2p_yamux_get_channel_context(stream->stream->stream_context); + if (channelContext == NULL) { + libp2p_logger_error("yamux", "Unable to get channel context for stream %d.\n", frame->streamid); + return -EPROTO; + } + channelContext->child_stream->handle_message(&stream_message, channelContext->child_stream, NULL); return incoming_size; } diff --git a/yamux/yamux.c b/yamux/yamux.c index 5bed302..b368695 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -68,56 +68,6 @@ int yamux_can_handle(const struct StreamMessage* msg) { return 0; } -/** - * the yamux stream received some bytes. Process them - * @param stream the stream that the data came in on - * @param msg the message - * @param incoming the stream buffer - */ -/* -void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { - struct Libp2pVector* handlers = stream->userdata; - int retVal = libp2p_protocol_marshal(msg, stream->session->session_context, handlers); - if (retVal == -1) { - // TODO handle error condition - libp2p_logger_error("yamux", "Marshalling returned error.\n"); - } else if (retVal > 0) { - // TODO handle everything went okay - libp2p_logger_debug("yamux", "Marshalling was successful. We should continue processing.\n"); - } else { - // TODO we've been told we shouldn't do anything anymore - libp2p_logger_debug("yamux", "Marshalling was successful. We should stop processing.\n"); - } - return; -} -*/ - -/*** - * Check to see if the reply is the yamux protocol header we expect - * NOTE: if we initiate the connection, we should expect the same back - * @param context the SessionContext - * @returns true(1) on success, false(0) otherwise - */ -int yamux_receive_protocol(struct YamuxContext* context) { - char* protocol = "/yamux/1.0.0\n"; - struct StreamMessage* results = NULL; - int retVal = 0; - - if (!context->stream->parent_stream->read(context->stream->parent_stream->stream_context, &results, 30)) { - libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n"); - goto exit; - } - // the first byte is the size, so skip it - char* ptr = strstr((char*)&results->data[1], protocol); - if (ptr == NULL || ptr - (char*)results->data > 1) { - goto exit; - } - retVal = 1; - exit: - libp2p_stream_message_free(results); - return retVal; -} - /*** * The remote is attempting to negotiate yamux * @param msg the incoming message @@ -231,6 +181,32 @@ int libp2p_yamux_close(struct Stream* stream) { return 1; } +/*** + * Determine if the incoming is a data frame, but we need more data + * @param incoming the incoming message + * @returns > 0 if we need more data, 0 if not + */ +int yamux_more_to_read(struct StreamMessage* incoming) { + if (incoming == NULL) + return 0; + if (incoming->data_size < 12) { + return 0; + } + // get frame + struct yamux_frame* original_frame = (struct yamux_frame*)incoming->data; + struct yamux_frame* copy = (struct yamux_frame*) malloc(sizeof(struct yamux_frame)); + memcpy(copy, original_frame, sizeof(struct yamux_frame)); + decode_frame(copy); + if (copy->type == yamux_frame_data) { + libp2p_logger_debug("yamux", "Checking frame sizes. It says we should have %d, and I see %d.\n", copy->length, incoming->data_size - sizeof(struct yamux_frame)); + int retVal = copy->length - (incoming->data_size - sizeof(struct yamux_frame)); + free(copy); + return retVal; + } + free(copy); + return 0; +} + /** * Read from the network, expecting a yamux frame. * NOTE: This will also dispatch the frame to the correct protocol @@ -301,14 +277,31 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int libp2p_logger_error("yamux", "yamux_decode returned error.\n"); } else if (ctx != NULL) { // this is the normal situation (not dead code). - libp2p_logger_debug("yamux", "read: It looks like we're trying to negotiate a new protocol or received a yamux frame.\n"); struct StreamMessage* incoming = NULL; + // we need a lock if (parent_stream->read(parent_stream->stream_context, &incoming, yamux_default_timeout)) { libp2p_logger_debug("yamux", "read: successfully read %d bytes from network.\n", incoming->data_size); + // This could be a data frame with the actual data coming later. Yuck. + // JMJ in the case of an incomplete buffer, the next read should be the data. This must + // be true, as the next data does not have a frame. We should only read the bytes we need. + int moreToRead = yamux_more_to_read(incoming); + if (moreToRead > 0) { + uint8_t buffer[moreToRead]; + if (parent_stream->read_raw(parent_stream->stream_context, buffer, moreToRead, timeout_secs) == moreToRead) { + // we have the bytes we need + uint8_t* new_buffer = (uint8_t*) malloc(incoming->data_size + moreToRead); + memcpy(new_buffer, incoming->data, incoming->data_size); + memcpy(&new_buffer[incoming->data_size], buffer, moreToRead); + incoming->data_size += moreToRead; + free(incoming->data); + incoming->data = new_buffer; + } else { + // we didn't get the bytes we needed + return 0; + } + } // parse the frame. This is where the work happens. - // JMJ: Maybe this should come back with a message to be processed further, along with some stream number to - // know who to dispatch it to. Or perhaps it should have all the information to handle it internally (better option) - if (yamux_decode(ctx, incoming->data, incoming->data_size, message) == 0) { + if (yamux_decode(ctx, incoming->data, incoming->data_size, message) >= 0) { libp2p_stream_message_free(incoming); // The message may not have anything in it. If so, return 0, as if nothing was done. Everything has been handled if (*message != NULL && (*message)->data_size == 0) { @@ -320,6 +313,8 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int } libp2p_logger_error("yamux", "yamux_decode returned error.\n"); libp2p_stream_message_free(incoming); + } else { + // read failed } } libp2p_logger_error("yamux", "Unable to do network read.\n");