From 8551121bf8da2c8369f077e0975672a274672692 Mon Sep 17 00:00:00 2001 From: John Jones Date: Tue, 28 Nov 2017 23:41:46 -0500 Subject: [PATCH] More yamux fixes --- conn/dialer.c | 23 +++++----- include/libp2p/yamux/yamux.h | 8 ++++ net/multistream.c | 2 + swarm/swarm.c | 2 +- yamux/yamux.c | 85 ++++++++++++++++++++++++++++++++---- 5 files changed, 100 insertions(+), 20 deletions(-) diff --git a/conn/dialer.c b/conn/dialer.c index faa49e8..89fbc05 100644 --- a/conn/dialer.c +++ b/conn/dialer.c @@ -121,7 +121,6 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer // we're connected. start listening for responses libp2p_swarm_add_peer(dialer->swarm, peer); // wait for multistream - int counter = 0; if (!libp2p_net_multistream_ready(peer->sessionContext, 5)) { return 0; } @@ -129,21 +128,25 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer if (new_stream != NULL) { // secio over multistream new_stream = libp2p_secio_stream_new(new_stream, dialer->peerstore, dialer->private_key); - counter = 0; - if (!libp2p_secio_ready(peer->sessionContext, 10) ) { - return 0; - } - counter = 0; if (new_stream != NULL) { - peer->sessionContext->default_stream = new_stream; + if (!libp2p_secio_ready(peer->sessionContext, 10) ) { + return 0; + } + libp2p_logger_debug("dialer", "We successfully negotiated secio.\n"); // multistream over secio - new_stream = libp2p_net_multistream_stream_new(new_stream, 0); + // Don't bother, as the other side requests multistream + //new_stream = libp2p_net_multistream_stream_new(new_stream, 0); if (new_stream != NULL) { - peer->sessionContext->default_stream = new_stream; + if (!libp2p_net_multistream_ready(peer->sessionContext, 5)) + return 0; + libp2p_logger_debug("dialer", "We successfully negotiated multistream over secio.\n"); // yamux over multistream new_stream = libp2p_yamux_stream_new(new_stream, 0, NULL); if (new_stream != NULL) { - peer->sessionContext->default_stream = new_stream; + if (!libp2p_yamux_stream_ready(peer->sessionContext, 5)) + return 0; + libp2p_logger_debug("dialer", "We successfully negotiated yamux.\n"); + //peer->sessionContext->default_stream = new_stream; // we have our swarm connection. Now we ask for some "channels" // id over yamux //libp2p_yamux_stream_add(new_stream->stream_context, libp2p_identify_stream_new(new_stream)); diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index f57c20f..83f4162 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -116,3 +116,11 @@ void libp2p_yamux_channel_free(struct YamuxChannelContext* ctx); * @returns a new StreamMessage that has a yamux_frame */ struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incoming); + +/*** + * Wait for yamux stream to become ready + * @param session_context the session context to check + * @param timeout_secs the number of seconds to wait for things to become ready + * @returns true(1) if it becomes ready, false(0) otherwise + */ +int libp2p_yamux_stream_ready(struct SessionContext* session_context, int timeout_secs); diff --git a/net/multistream.c b/net/multistream.c index 9795afb..28c182f 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -534,6 +534,8 @@ int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struc // the incoming stream is not a multistream. They are attempting to upgrade to multistream struct Stream* new_stream = libp2p_net_multistream_stream_new(stream, 1); if (new_stream != NULL) { + struct MultistreamContext* ctx = (struct MultistreamContext*)stream->stream_context; + ctx->status = multistream_status_ack; // upgrade return stream->handle_upgrade(stream, new_stream); } diff --git a/swarm/swarm.c b/swarm/swarm.c index c9ae58d..4059c21 100644 --- a/swarm/swarm.c +++ b/swarm/swarm.c @@ -36,7 +36,7 @@ int libp2p_swarm_listen_and_handle(struct Stream* stream, struct Libp2pVector* p if (!stream->read(stream->stream_context, &results, 1)) { libp2p_logger_debug("swarm", "Releasing read lock\n"); pthread_mutex_unlock(stream->socket_mutex); - libp2p_logger_error("swarm", "Unable to read from network. Exiting.\n"); + libp2p_logger_error("swarm", "Unable to read from network (could just be a timeout). Exiting the read.\n"); return retVal; } libp2p_logger_debug("swarm", "Releasing read lock.\n"); diff --git a/yamux/yamux.c b/yamux/yamux.c index 093a28c..12e96ad 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -143,12 +143,28 @@ int yamux_receive_protocol(struct YamuxContext* context) { * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { - struct Stream* new_stream = libp2p_yamux_stream_new(stream, 1, protocol_context); - if (new_stream == NULL) - return -1; - // upgrade - stream->handle_upgrade(stream, new_stream); - return 1; + if (stream->stream_type == STREAM_TYPE_YAMUX) { + struct YamuxContext* ctx = (struct YamuxContext*) stream->stream_context; + if (ctx->state == yamux_stream_est) { + // TODO: This is probably a frame. we need to handle this. + return -1; + } else { + //TODO: check to make sure they sent the yamux protocol id + // we sent a protocol ID, and this is them responding + ctx->state = yamux_stream_est; + } + return 1; + } + // the incoming stream is not yamux. They are attempting to upgrade to yamux + struct YamuxProtocolContext* yamuxProtocolContext = (struct YamuxProtocolContext*)protocol_context; + struct Stream* new_stream = libp2p_yamux_stream_new(stream, 0, yamuxProtocolContext->protocol_handlers); + if (new_stream != NULL) { + struct YamuxContext* ctx = (struct YamuxContext*) new_stream->stream_context; + ctx->state = yamux_stream_est; + // upgrade + return stream->handle_upgrade(stream, new_stream); + } + return -1; } /** @@ -365,6 +381,11 @@ int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) { if (ctx == NULL && channel == NULL) return 0; + if (ctx->state != yamux_stream_est) { + struct Stream* parent_stream = ctx->stream->parent_stream; + return parent_stream->write(parent_stream->stream_context, message); + } + struct StreamMessage* outgoing_message = libp2p_yamux_prepare_to_send(message); // now convert fame for network use struct yamux_frame* frame = (struct yamux_frame*)outgoing_message->data; @@ -472,6 +493,20 @@ struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream) { return ctx; } +/*** + * Write the protocol id for yamux to the stream + * @param stream the stream to write to + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_yamux_send_protocol(struct Stream* stream) { + const char* protocolID = "/yamux/1.0.0\n"; + struct StreamMessage outgoing; + outgoing.data_size = strlen(protocolID); + outgoing.data = (uint8_t*)protocolID; + outgoing.error_number = 0; + return stream->write(stream->stream_context, &outgoing); +} + int libp2p_yamux_negotiate(struct YamuxContext* ctx, int am_server) { const char* protocolID = "/yamux/1.0.0\n"; struct StreamMessage outgoing; @@ -599,13 +634,14 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_serv ctx->stream = out; ctx->am_server = am_server; ctx->protocol_handlers = protocol_handlers; + ctx->state = yamux_stream_inited; + // tell protocol below that we want to upgrade + parent_stream->handle_upgrade(parent_stream, out); // attempt to negotiate yamux protocol - if (!libp2p_yamux_negotiate(ctx, am_server)) { + if (!libp2p_yamux_send_protocol(parent_stream)) { libp2p_yamux_stream_free(out); return NULL; } - // tell protocol below that we want to upgrade - parent_stream->handle_upgrade(parent_stream, out); } return out; } @@ -814,3 +850,34 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { } return 1; } + +/*** + * Wait for yamux stream to become ready + * @param session_context the session context to check + * @param timeout_secs the number of seconds to wait for things to become ready + * @returns true(1) if it becomes ready, false(0) otherwise + */ +int libp2p_yamux_stream_ready(struct SessionContext* session_context, int timeout_secs) { + int counter = 0; + while (session_context != NULL + && session_context->default_stream != NULL + && session_context->default_stream->stream_type != STREAM_TYPE_YAMUX + && counter <= timeout_secs) { + counter++; + sleep(1); + } + if (session_context != NULL + && session_context->default_stream != NULL + && session_context->default_stream->stream_type == STREAM_TYPE_YAMUX + && counter < 5) { + struct YamuxContext* ctx = (struct YamuxContext*)session_context->default_stream->stream_context; + while (ctx->state != yamux_stream_est && counter <= timeout_secs) { + counter++; + sleep(1); + } + if (ctx->state == yamux_stream_est) + return 1; + } + return 0; +} +