diff --git a/conn/dialer.c b/conn/dialer.c index cc70180..758edca 100644 --- a/conn/dialer.c +++ b/conn/dialer.c @@ -143,13 +143,35 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer // yamux over multistream new_stream = libp2p_yamux_stream_new(peer->sessionContext->default_stream, 0, dialer->swarm->protocol_handlers); if (new_stream != NULL) { - if (!libp2p_yamux_stream_ready(peer->sessionContext, 5)) + if (!libp2p_yamux_stream_ready(peer->sessionContext, 5)) { + libp2p_logger_error("dialer", "Unable to get yamux into the ready status.\n"); return 0; + } libp2p_logger_debug("dialer", "We successfully negotiated yamux.\n"); - //peer->sessionContext->default_stream = new_stream; + // the rest should be done on another thread // we have our swarm connection. Now we ask for some "channels" - // id over yamux - //libp2p_yamux_stream_add(new_stream->stream_context, libp2p_identify_stream_new(new_stream)); + // id over multistream over yamux + const struct Libp2pProtocolHandler* handler = libp2p_protocol_get_handler(dialer->swarm->protocol_handlers, "/ipfs/id/1.0.0\n"); + if (handler != NULL) { + Identify* identify = handler->context; + if (peer->sessionContext->default_stream->stream_type == STREAM_TYPE_YAMUX) { + struct YamuxContext* ctx = libp2p_yamux_get_context(peer->sessionContext->default_stream->stream_context); + // first get a new frame. It should be ready to go + struct Stream* new_channel = yamux_channel_new(ctx, 0, NULL); + if (new_channel != NULL && new_channel->channel > 0) { + // then get a multistream + struct Stream* yamux_multistream = libp2p_net_multistream_stream_new(new_channel, 0); + if (!libp2p_net_multistream_ready(new_channel->stream_context, 10)) { + libp2p_logger_error("dialer", "Unable to get multistream over yamux into the ready status.\n"); + return 0; + } + // then get an identify + struct Stream* identify_stream = libp2p_identify_stream_new(yamux_multistream, identify, 1); + } + } else { + libp2p_logger_error("dialer", "Expected a yamux context, but got a context of type %d.\n", peer->sessionContext->default_stream->stream_type); + } + } // kademlia over yamux //libp2p_yamux_stream_add(new_stream->stream_context, libp2p_kademlia_stream_new(new_stream)); // circuit relay over yamux diff --git a/identify/identify.c b/identify/identify.c index f8a0028..207575d 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -10,6 +10,7 @@ #include "libp2p/conn/session.h" #include "libp2p/identify/identify.h" #include "libp2p/utils/logger.h" +#include "libp2p/yamux/yamux.h" @@ -41,16 +42,20 @@ int libp2p_identify_can_handle(const struct StreamMessage* msg) { * @param context the context * @returns true(1) on success, false(0) otherwise */ -int libp2p_identify_send_protocol(struct Stream *stream, Identify* identify) { +int libp2p_identify_send_protocol(struct Stream *stream, Identify* identify, int initiatedByUs) { size_t max_buffer_size = 6000; uint8_t buffer[max_buffer_size]; char *protocol = "/ipfs/id/1.0.0\n"; int protocol_len = strlen(protocol); // throw the protocol into the buffer memcpy(&buffer[0], protocol, protocol_len); - if (!libp2p_identify_protobuf_encode(identify, &buffer[protocol_len], max_buffer_size-protocol_len, &max_buffer_size)) { - libp2p_logger_error("identify", "Unable to protobuf the identity.\n"); - return 0; + if (!initiatedByUs) { + if (!libp2p_identify_protobuf_encode(identify, &buffer[protocol_len], max_buffer_size-protocol_len, &max_buffer_size)) { + libp2p_logger_error("identify", "Unable to protobuf the identity.\n"); + return 0; + } + } else { + max_buffer_size = 0; } struct StreamMessage msg; msg.data_size = protocol_len + max_buffer_size; @@ -331,13 +336,26 @@ exit: * @returns <0 on error, 0 if loop should not continue, >0 on success */ int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { - // attempt to create a new Identify connection with them. - // send the protocol id back, and set up the channel - Identify* identify = (Identify*)protocol_context; - struct Stream* new_stream = libp2p_identify_stream_new(stream, identify); - if (new_stream == NULL) - return -1; - return stream->handle_upgrade(stream, new_stream); + struct YamuxChannelContext* yamuxChannelContext = libp2p_yamux_get_parent_channel_context(stream); + struct YamuxContext* yamuxContext = libp2p_yamux_get_context(yamuxChannelContext); + struct Stream* new_stream = NULL; + if (yamuxChannelContext != NULL && yamuxChannelContext->child_stream->stream_type == STREAM_TYPE_YAMUX) { + if ( (yamuxContext->am_server && yamuxChannelContext->channel % 2 == 0) + || (!yamuxContext->am_server && yamuxChannelContext->channel % 2 == 1)) { + // we initiated it, and they are replying + return 1; + } + } else { + // they initiated it. + // attempt to create a new Identify connection with them. + // send the protocol id back, and set up the channel + Identify* identify = (Identify*)protocol_context; + new_stream = libp2p_identify_stream_new(stream, identify, 0); + if (new_stream == NULL) + return -1; + return stream->handle_upgrade(stream, new_stream); + } + return 1; } /** @@ -377,6 +395,17 @@ int libp2p_identify_close(struct Stream* stream) { return 1; } +int libp2p_identify_read(void* stream_context, struct StreamMessage** msg, int timeout_secs) { + struct IdentifyContext* ctx = (struct IdentifyContext*) stream_context; + struct StreamMessage* internal = NULL; + if (ctx->parent_stream->read(ctx->parent_stream->stream_context, &internal, timeout_secs)) { + // we got an identity, but we should not do anything with it right now + // TODO: send a fin + libp2p_stream_message_free(internal); + } + return 0; +} + /*** * Create a new stream that negotiates the identify protocol * @@ -387,7 +416,7 @@ int libp2p_identify_close(struct Stream* stream) { * @param parent_stream the parent stream * @returns a new Stream that can talk "identify" */ -struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify* identify) { +struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify* identify, int initiatedByUs) { if (parent_stream == NULL) return NULL; struct Stream* out = libp2p_stream_new(); @@ -404,8 +433,9 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify out->close = libp2p_identify_close; out->negotiate = NULL; out->bytes_waiting = NULL; + out->read = libp2p_identify_read; // do we expect a reply? - if (!libp2p_identify_send_protocol(parent_stream, identify) /* || !libp2p_identify_receive_protocol(parent_stream) */) { + if (!libp2p_identify_send_protocol(parent_stream, identify, initiatedByUs) /* || !libp2p_identify_receive_protocol(parent_stream) */) { libp2p_stream_free(out); free(ctx); return NULL; @@ -425,6 +455,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify * @param parent_stream the parent stream * @returns a new Stream that is a multistream, but with "identify" already negotiated */ +/* struct Stream* libp2p_identify_stream_new_with_multistream(struct Stream* parent_stream) { if (parent_stream == NULL) return NULL; @@ -451,3 +482,4 @@ struct Stream* libp2p_identify_stream_new_with_multistream(struct Stream* parent } return out; } +*/ diff --git a/include/libp2p/identify/identify.h b/include/libp2p/identify/identify.h index 209b01d..1dc435e 100644 --- a/include/libp2p/identify/identify.h +++ b/include/libp2p/identify/identify.h @@ -33,7 +33,7 @@ struct IdentifyContext { }; int libp2p_identify_can_handle(const struct StreamMessage* msg); -int libp2p_identify_send_protocol(struct Stream* stream, Identify* identify); +int libp2p_identify_send_protocol(struct Stream* stream, Identify* identify, int initiatedByUs); int libp2p_identify_receive_protocol(struct Stream* stream); Identify* libp2p_identify_new(); void libp2p_identify_free(Identify* in); @@ -55,5 +55,5 @@ struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(char* publi * @param parent_stream the parent stream * @returns a new Stream that can talk "identify" */ -struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify* identify); +struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify* identify, int initiatedByUs); diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index 9188c98..30cb279 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -123,9 +123,9 @@ void libp2p_net_multistream_stream_free(struct Stream* stream); /*** * Wait for multistream stream to become ready - * @param session_context the session context to check + * @param context the session context to check, can also be a YamuxChannelContext * @param timeout_secs the number of seconds to wait for things to become ready * @returns true(1) if it becomes ready, false(0) otherwise */ -int libp2p_net_multistream_ready(struct SessionContext* session_context, int timeout_secs); +int libp2p_net_multistream_ready(void* context, int timeout_secs); diff --git a/include/libp2p/net/protocol.h b/include/libp2p/net/protocol.h index a85df4c..0e5f820 100644 --- a/include/libp2p/net/protocol.h +++ b/include/libp2p/net/protocol.h @@ -70,3 +70,11 @@ int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers); * @param handlers the vector of handlers */ int libp2p_protocol_is_valid_protocol(struct StreamMessage* msg, struct Libp2pVector* handlers); + +/*** + * Retrieve the correct protocol handlder for a particular protocol id + * @param protocol_handlers the collection of protocol handlers + * @param id the protocol id + * @returns a protocol handler that can handle id (or NULL if none found) + */ +const struct Libp2pProtocolHandler* libp2p_protocol_get_handler(struct Libp2pVector* protocol_handlers, const char* id); diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index 1f182ff..2ca7e77 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -129,3 +129,24 @@ struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incomin * @returns true(1) if it becomes ready, false(0) otherwise */ int libp2p_yamux_stream_ready(struct SessionContext* session_context, int timeout_secs); + +/** + * Given a context, get the YamuxChannelContext + * @param stream_context the context + * @returns the YamuxChannelContext or NULL if there was none + */ +struct YamuxChannelContext* libp2p_yamux_get_channel_context(void* stream_context); + +/*** + * Given a context, get the YamuxContext + * @param stream_context a YamuxChannelContext or a YamuxContext + * @returns the YamuxContext, or NULL on error + */ +struct YamuxContext* libp2p_yamux_get_context(void* stream_context); + +/*** + * Walks down the tree, looking for the nearest YamuxChannelContext + * @param in the stream + * @returns the YamuxChannelContext or NULL + */ +struct YamuxChannelContext* libp2p_yamux_get_parent_channel_context(struct Stream* in); diff --git a/net/multistream.c b/net/multistream.c index a9f5198..715a7d1 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -16,6 +16,7 @@ #include "libp2p/net/multistream.h" #include "libp2p/utils/logger.h" #include "multiaddr/multiaddr.h" +#include "libp2p/yamux/yamux.h" // NOTE: this is normally set to 5 seconds, but you may want to increase this during debugging int multistream_default_timeout = 5; @@ -189,26 +190,43 @@ int libp2p_net_multistream_write_without_check(void* stream_context, struct Stre return num_bytes; } +int multistream_wait(struct Stream* stream, int timeout_secs) { + int counter = 0; + struct MultistreamContext* ctx = (struct MultistreamContext*)stream->stream_context; + while (ctx->status != multistream_status_ack && counter <= timeout_secs) { + counter++; + sleep(1); + } + if (ctx->status == multistream_status_ack) + return 1; + return 0; +} + /*** * Wait for multistream stream to become ready * @param session_context the session context to check * @param timeout_secs the number of seconds to wait for things to become ready * @returns true(1) if it becomes ready, false(0) otherwise */ -int libp2p_net_multistream_ready(struct SessionContext* session_context, int timeout_secs) { +int libp2p_net_multistream_ready(void* context, int timeout_secs) { int counter = 0; - while (session_context->default_stream->stream_type != STREAM_TYPE_MULTISTREAM && counter <= timeout_secs) { - counter++; - sleep(1); - } - if (session_context->default_stream->stream_type == STREAM_TYPE_MULTISTREAM && counter < 5) { - struct MultistreamContext* ctx = (struct MultistreamContext*)session_context->default_stream->stream_context; - while (ctx->status != multistream_status_ack && counter <= timeout_secs) { + struct YamuxChannelContext* yamuxChannelContext = libp2p_yamux_get_channel_context(context); + if (yamuxChannelContext != NULL) { + while (yamuxChannelContext->child_stream->stream_type != STREAM_TYPE_MULTISTREAM && counter <= timeout_secs) { counter++; sleep(1); } - if (ctx->status == multistream_status_ack) - return 1; + if (counter <= 5) + return multistream_wait(yamuxChannelContext->child_stream, timeout_secs - counter); + } else { + struct SessionContext* session_context = (struct SessionContext*)context; + while (session_context->default_stream->stream_type != STREAM_TYPE_MULTISTREAM && counter <= timeout_secs) { + counter++; + sleep(1); + } + if (counter <= 5) { + return multistream_wait(session_context->default_stream, timeout_secs - counter); + } } return 0; } diff --git a/net/protocol.c b/net/protocol.c index e75ba2e..63f4574 100644 --- a/net/protocol.c +++ b/net/protocol.c @@ -7,6 +7,7 @@ * Handle the different protocols */ + /*** * Compare incoming to see if they are requesting a protocol upgrade * @param incoming the incoming string @@ -26,6 +27,19 @@ const struct Libp2pProtocolHandler* protocol_compare(struct StreamMessage* msg, return NULL; } +/*** + * Retrieve the correct protocol handlder for a particular protocol id + * @param protocol_handlers the collection of protocol handlers + * @param id the protocol id + * @returns a protocol handler that can handle id (or NULL if none found) + */ +const struct Libp2pProtocolHandler* libp2p_protocol_get_handler(struct Libp2pVector* protocol_handlers, const char* id) { + struct StreamMessage message; + message.data_size = strlen(id); + message.data = (uint8_t*)id; + return protocol_compare(&message, protocol_handlers); +} + /** * Allocate resources for a new Libp2pProtocolHandler * @returns an allocated struct @@ -50,6 +64,16 @@ void libp2p_protocol_handler_free(struct Libp2pProtocolHandler* handler) { free(handler); } +int appears_to_be_a_protocol(struct StreamMessage* msg) { + if (msg == NULL) + return 0; + if (msg->data_size < 2) + return 0; + if (memchr(&msg->data[1], '\n', msg->data_size-1) != NULL) + return 1; + return 0; +} + /*** * Handle an incoming message * @param message the incoming message @@ -61,6 +85,12 @@ int libp2p_protocol_marshal(struct StreamMessage* msg, struct Stream* stream, st const struct Libp2pProtocolHandler* handler = protocol_compare(msg, handlers); if (handler == NULL) { + if (appears_to_be_a_protocol(msg)) { + struct StreamMessage na_message; + na_message.data = (uint8_t*)"na\n"; + na_message.data_size = 3; + stream->write(stream->stream_context, &na_message); + } // set the msg->error code msg->error_number = 100; return -1; diff --git a/swarm/swarm.c b/swarm/swarm.c index 2f1ab2e..24c9740 100644 --- a/swarm/swarm.c +++ b/swarm/swarm.c @@ -20,6 +20,7 @@ struct SwarmSession { int DEFAULT_NETWORK_TIMEOUT = 5; + /*** * Listens on a particular stream, and marshals the request * @param stream the stream to listen to @@ -29,6 +30,8 @@ int DEFAULT_NETWORK_TIMEOUT = 5; int libp2p_swarm_listen_and_handle(struct Stream* stream, struct Libp2pVector* protocol_handlers) { struct StreamMessage* results = NULL; int retVal = 0; + if (stream == NULL) + return -1; // Read from the network libp2p_logger_debug("swarm", "Attempting to get read lock.\n"); pthread_mutex_lock(stream->socket_mutex); diff --git a/test/test_yamux.h b/test/test_yamux.h index 00947a9..3a4db1b 100644 --- a/test/test_yamux.h +++ b/test/test_yamux.h @@ -140,7 +140,7 @@ int test_yamux_identify() { goto exit; // Now add in another protocol mock_stream->read = mock_identify_read_protocol; - if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(yamux_stream, handler->context))) { + if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(yamux_stream, handler->context, 1))) { goto exit; } // tear down diff --git a/yamux/session.c b/yamux/session.c index 5e16f97..2ede61b 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -282,7 +282,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s libp2p_logger_debug("yamux", "We found our stream id of %d.\n", f.streamid); if (f.flags & yamux_frame_rst) { - libp2p_logger_debug("yamux", "They are asking that this stream be reset.\n"); + libp2p_logger_debug("yamux", "They are asking that stream %d be reset.\n", f.streamid); // close the stream s->state = yamux_stream_closed; @@ -291,10 +291,10 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s } else if (f.flags & yamux_frame_fin) { - libp2p_logger_debug("yamux", "They are asking that this stream be closed.\n"); + libp2p_logger_debug("yamux", "They are asking that stream %d be closed.\n", f.streamid); // local stream didn't initiate FIN if (s->state != yamux_stream_closing) - yamux_stream_close(context); + yamux_stream_close(libp2p_yamux_get_parent_channel_context(s->stream)); s->state = yamux_stream_closed; @@ -303,7 +303,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s } else if (f.flags & yamux_frame_ack) { - libp2p_logger_debug("yamux", "They sent an ack.\n"); + libp2p_logger_debug("yamux", "They sent an ack for stream %d.\n", f.streamid); // acknowldegement if (s->state != yamux_stream_syn_sent) { libp2p_logger_debug("yamux", "We received an ack, but it seems we never sent anything!\n"); @@ -317,15 +317,10 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s return -EPROTO; } - libp2p_logger_debug("yamux", "Processing the data after the frame, which is %d bytes.\n", incoming_size - frame_size); - if (f.streamid == 2) { - ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); - libp2p_logger_debug("yamux", "decode: yamux_stream_process returned %d.\n", (int)re); - return (re < 0) ? re : (re + incoming_size); - } else { - libp2p_logger_debug("yamux", "Only handling stream 2 for now.\n"); - return 0; - } + libp2p_logger_debug("yamux", "Processing the data after the frame for stream %d, which is %d bytes.\n", f.streamid, incoming_size - frame_size); + ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); + libp2p_logger_debug("yamux", "decode: yamux_stream_process for stream %d returned %d.\n", f.streamid, (int)re); + return (re < 0) ? re : (re + incoming_size); //yamux_pull_message_from_frame(incoming, incoming_size, return_message); } // stream id matches } @@ -339,52 +334,44 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s // if we didn't initiate it, add this new channel (odd stream id is from client, even is from server) if ( (f.streamid % 2 == 0 && !yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) { - // JMJ just debugging stream 2 for now - if (f.streamid == 2) { - libp2p_logger_debug("yamux", "Stream id %d is a new stream. Creating it...\n", f.streamid); - struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message); - if (yamuxChannelStream == NULL) { - libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux stream for stream id %d.\n", f.streamid); - return -EPROTO; - } - struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context; - - if (yamux_session->new_stream_fn) { - libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn for stream %d.\n", f.streamid); - yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message); - } - // handle window update (if there is one) - struct yamux_session_stream ss = yamux_session->streams[f.streamid]; - ss.alive = 1; - ss.stream = yamux_stream_new(); - ss.stream->id = f.streamid; - ss.stream->session = yamux_session; - ss.stream->state = yamux_stream_syn_recv; - ss.stream->window_size = 0; - yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size); - channelContext->state = yamux_stream_syn_recv; - if (f.type == yamux_frame_window_update) { - libp2p_logger_debug("yamux", "Received window update for stream %d. Sending one back.\n", f.streamid); - // send it back - yamux_stream_window_update(channelContext, ss.stream->window_size); - } - // TODO: Start negotiations of multistream - struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); - if (multistream != NULL) { - libp2p_logger_debug("yamux", "Successfully sent the multistream id on stream %d.\n", f.streamid); - // this should already be done - // channelContext->child_stream = multistream; - } else { - libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid); - } - } else { - libp2p_logger_error("yamux", "Temporarily not doing anthing for streams other than stream 2.\n"); - return 0; - } - + libp2p_logger_debug("yamux", "Stream id %d is a new stream. Creating it...\n", f.streamid); + struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message); + if (yamuxChannelStream == NULL) { + libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux stream for stream id %d.\n", f.streamid); + return -EPROTO; + } + struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context; + if (yamux_session->new_stream_fn) { + libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn for stream %d.\n", f.streamid); + yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message); + } + // handle window update (if there is one) + struct yamux_session_stream ss = yamux_session->streams[f.streamid]; + ss.alive = 1; + ss.stream = yamux_stream_new(); + ss.stream->id = f.streamid; + ss.stream->session = yamux_session; + ss.stream->state = yamux_stream_syn_recv; + ss.stream->window_size = 0; + yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size); + channelContext->state = yamux_stream_syn_recv; + if (f.type == yamux_frame_window_update) { + libp2p_logger_debug("yamux", "Received window update for stream %d. Sending one back.\n", f.streamid); + // send it back + yamux_stream_window_update(channelContext, ss.stream->window_size); + } + // TODO: Start negotiations of multistream + struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); + if (multistream != NULL) { + libp2p_logger_debug("yamux", "Successfully sent the multistream id on stream %d.\n", f.streamid); + // this should already be done + // channelContext->child_stream = multistream; + } else { + libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid); + } } else { - libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)")); - } + libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)")); + } } else { libp2p_logger_error("yamux", "We had a (probably) new frame, but the flags didn't seem right."); @@ -404,7 +391,7 @@ struct yamux_session_stream* yamux_get_session_stream(struct yamux_session* sess for (size_t i = 0; i < session->cap_streams; ++i) { struct yamux_session_stream* ss = &session->streams[i]; - if (ss->stream->stream->channel == channel) + if (ss != NULL && ss->stream != NULL && ss->stream->stream != NULL && ss->stream->stream->channel == channel) return ss; } return NULL; diff --git a/yamux/stream.c b/yamux/stream.c index 1df15fb..973de4a 100644 --- a/yamux/stream.c +++ b/yamux/stream.c @@ -92,7 +92,7 @@ FOUND:; // success } */ - struct Stream* channelStream = libp2p_yamux_channel_stream_new(context->stream, id); + struct Stream* channelStream = nst.stream; struct YamuxChannelContext* channel = (struct YamuxChannelContext*)channelStream->stream_context; channel->channel = id; channel->child_stream = NULL; @@ -152,6 +152,9 @@ ssize_t yamux_stream_init(struct YamuxChannelContext* channel_ctx) */ ssize_t yamux_stream_close(void* context) { + if (context == NULL) + return 0; + if ( ((char*)context)[0] == YAMUX_CHANNEL_CONTEXT) { struct YamuxChannelContext* channel_ctx = (struct YamuxChannelContext*) context; if (!channel_ctx || channel_ctx->state != yamux_stream_est || channel_ctx->closed) @@ -373,23 +376,31 @@ struct yamux_stream* yamux_stream_new() { */ void* yamux_read_method(void* args) { struct YamuxChannelContext* context = (struct YamuxChannelContext*) args; + context->read_running = 1; struct StreamMessage* message = NULL; // continue to read until the buffer is empty while (context->buffer->buffer_size > 0) { - if (!context->read_running) { - context->read_running = 1; - if (context->child_stream->read(context->child_stream->stream_context, &message, 5) && message != NULL) { - context->read_running = 0; - libp2p_logger_debug("yamux", "read_method: read returned a message of %d bytes. [%s]\n", message->data_size, message->data); - int retVal = libp2p_protocol_marshal(message, context->child_stream, context->yamux_context->protocol_handlers); - libp2p_logger_debug("yamux", "read_method: protocol_marshal returned %d.\n", retVal); - libp2p_stream_message_free(message); - } else { - context->read_running = 0; - libp2p_logger_debug("yamux", "read_method: read returned false.\n"); - } + if (context->child_stream == NULL || context->child_stream->stream_context == NULL || context->child_stream->read == NULL) { + libp2p_logger_error("yamux", "read_method: Child stream not set up properly for channel %d.\n", context->channel); + context->read_running = 0; + return NULL; + } + struct Stream* child_stream = context->child_stream; + if (child_stream == NULL || child_stream->read == NULL) { + libp2p_logger_error("yamux", "read_method: Child stream not set up properly for channel %d.\n", context->channel); + context->read_running = 0; + return NULL; + } + if (context->child_stream->read(context->child_stream->stream_context, &message, 5) && message != NULL) { + libp2p_logger_debug("yamux", "read_method: read returned a message of %d bytes. [%s]\n", message->data_size, message->data); + int retVal = libp2p_protocol_marshal(message, context->child_stream, context->yamux_context->protocol_handlers); + libp2p_logger_debug("yamux", "read_method: protocol_marshal returned %d.\n", retVal); + libp2p_stream_message_free(message); + } else { + libp2p_logger_debug("yamux", "read_method: read returned false.\n"); } } + context->read_running = 0; return NULL; } @@ -398,10 +409,12 @@ void* yamux_read_method(void* args) { * @param context the YamuxChannelContext */ int libp2p_yamux_notify_child_stream_has_data(struct YamuxChannelContext* context) { - pthread_t new_thread; + if (!context->read_running) { + pthread_t new_thread; - if (pthread_create(&new_thread, NULL, yamux_read_method, context) == 0) - return 1; + if (pthread_create(&new_thread, NULL, yamux_read_method, context) == 0) + return 1; + } return 0; } diff --git a/yamux/yamux.c b/yamux/yamux.c index 098a884..141c757 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -13,6 +13,20 @@ int libp2p_yamux_channels_free(struct YamuxContext* ctx); struct Stream* libp2p_yamux_get_parent_stream(void* context); +/*** + * Walks down the tree, looking for the nearest YamuxChannelContext + * @param in the stream + * @returns the YamuxChannelContext or NULL + */ +struct YamuxChannelContext* libp2p_yamux_get_parent_channel_context(struct Stream* in) { + if (in == NULL) + return NULL; + struct YamuxChannelContext* retVal = libp2p_yamux_get_channel_context(in->stream_context); + if (retVal == NULL) { + return libp2p_yamux_get_parent_channel_context(in->parent_stream); + } + return NULL; +} /** * Given a context, get the YamuxChannelContext * @param stream_context the context @@ -32,6 +46,8 @@ struct YamuxChannelContext* libp2p_yamux_get_channel_context(void* stream_contex * @returns the YamuxContext, or NULL on error */ struct YamuxContext* libp2p_yamux_get_context(void* stream_context) { + if (stream_context == NULL) + return NULL; char proto = ((uint8_t*)stream_context)[0]; struct YamuxChannelContext* channel = NULL; struct YamuxContext* ctx = NULL; @@ -527,13 +543,13 @@ int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size * @param stream the parent stream * @returns a YamuxContext */ -struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream) { +struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream, int amServer) { struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext)); if (ctx != NULL) { ctx->type = YAMUX_CONTEXT; ctx->stream = NULL; ctx->channels = libp2p_utils_vector_new(1); - ctx->session = yamux_session_new(NULL, stream, yamux_session_server, NULL); + ctx->session = yamux_session_new(NULL, stream, amServer ? yamux_session_server : yamux_session_client, NULL); ctx->am_server = 0; ctx->state = 0; ctx->buffered_message = NULL; @@ -568,11 +584,7 @@ int libp2p_yamux_send_protocol(struct Stream* stream) { int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_stream) { // put this stream in the collection, and tie it to an id if (libp2p_logger_watching_class("yamux")) { - const char* stream_type = ""; - if (new_stream->stream_type == STREAM_TYPE_MULTISTREAM) { - stream_type = "Multistream"; - } - libp2p_logger_debug("yamux", "handle_upgrade called for stream %s.\n", stream_type); + libp2p_logger_debug("yamux", "handle_upgrade called for stream type %d.\n", new_stream->stream_type); } struct YamuxContext* yamux_context = libp2p_yamux_get_context(yamux_stream->stream_context); struct YamuxChannelContext* yamux_channel_context = libp2p_yamux_get_channel_context(yamux_stream->stream_context); @@ -624,7 +636,7 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_serv out->address = parent_stream->address; out->socket_mutex = parent_stream->socket_mutex; // build YamuxContext - struct YamuxContext* ctx = libp2p_yamux_context_new(out); + struct YamuxContext* ctx = libp2p_yamux_context_new(out, am_server); if (ctx == NULL) { libp2p_yamux_stream_free(out); return NULL; @@ -778,18 +790,15 @@ int libp2p_yamux_channel_null_close(struct Stream* stream) { * @returns a new Stream that has a YamuxChannelContext */ struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, int channelNumber) { + struct YamuxContext* yamuxContext = libp2p_yamux_get_context(incoming_stream->stream_context); struct Stream* out = libp2p_stream_new(); if (out != NULL) { - int isYamux = 0; - char first_char = ((uint8_t*)incoming_stream->stream_context)[0]; - if (first_char == YAMUX_CONTEXT) - isYamux = 1; out->stream_type = STREAM_TYPE_YAMUX; out->address = incoming_stream->address; // don't allow the incoming_stream to close the channel out->close = libp2p_yamux_channel_null_close; struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext)); - if (!isYamux) { + if (yamuxContext == NULL) { out->parent_stream = incoming_stream->parent_stream; out->peek = incoming_stream->parent_stream->peek; out->read = incoming_stream->parent_stream->read; @@ -801,14 +810,14 @@ struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, i // this does the wrap incoming_stream->parent_stream = out; } else { - out->parent_stream = incoming_stream; - out->peek = incoming_stream->peek; - out->read = incoming_stream->read; - out->read_raw = incoming_stream->read_raw; - out->write = incoming_stream->write; - out->socket_mutex = incoming_stream->socket_mutex; - ctx->yamux_context = incoming_stream->stream_context; - ctx->child_stream = NULL; + out->parent_stream = incoming_stream; + out->peek = incoming_stream->peek; + out->read = incoming_stream->read; + out->read_raw = incoming_stream->read_raw; + out->write = incoming_stream->write; + out->socket_mutex = incoming_stream->socket_mutex; + ctx->yamux_context = incoming_stream->stream_context; + ctx->child_stream = NULL; } ctx->channel = (uint32_t) channelNumber; ctx->closed = 0;