diff --git a/yamux/yamux.c b/yamux/yamux.c index 61a7db1..5bed302 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -92,22 +92,6 @@ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { } */ -/*** - * Send the yamux protocol out the default stream - * NOTE: if we initiate the connection, we should expect the same back - * @param context the SessionContext - * @returns true(1) on success, false(0) otherwise - */ -int yamux_send_protocol(struct Stream* stream) { - char* protocol = "/yamux/1.0.0\n"; - struct StreamMessage outgoing; - outgoing.data = (uint8_t*)protocol; - outgoing.data_size = strlen(protocol); - if (!stream->write(stream->stream_context, &outgoing)) - return 0; - return 1; -} - /*** * Check to see if the reply is the yamux protocol header we expect * NOTE: if we initiate the connection, we should expect the same back @@ -143,6 +127,8 @@ int yamux_receive_protocol(struct YamuxContext* context) { * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + // get latest stream (multithreaded, so could be stale) + stream = libp2p_net_connection_get_session_context(stream)->default_stream; if (stream->stream_type == STREAM_TYPE_YAMUX) { struct YamuxContext* ctx = (struct YamuxContext*) stream->stream_context; if (ctx->state == yamux_stream_est) { @@ -270,9 +256,28 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int ctx = (struct YamuxContext*)stream_context; } + if (ctx == NULL) { + libp2p_logger_error("yamux", "read: The incoming stream is not a yamux stream.\n"); + return 0; + } + if (ctx->state != yamux_stream_est) { libp2p_logger_debug("yamux", "read: Yamux still not inited, so passing to lower protocol.\n"); - return ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, timeout_secs); + // perhaps this is the yamux protocol id we've been expecting + int retVal = ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, timeout_secs); + libp2p_logger_debug("yamux", "read: we did the lower read, and received a %d.\n", retVal); + if (retVal > 0) { + struct StreamMessage* incoming_message = *message; + libp2p_logger_debug("yamux", "read: The lower read has a message of %d bytes that says: %s.\n", incoming_message->data_size, incoming_message->data); + if (strstr((char*)incoming_message->data, "/yamux/1.0.0") != NULL) { + libp2p_logger_debug("yamux", "read: We got the protocol we've been waiting for.\n"); + ctx->state = yamux_stream_est; + libp2p_stream_message_free(incoming_message); + *message = NULL; + return 0; + } + } + return retVal; } struct Stream* parent_stream = libp2p_yamux_get_parent_stream(stream_context); @@ -507,6 +512,8 @@ struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream) { * @returns true(1) on success, false(0) otherwise */ int libp2p_yamux_send_protocol(struct Stream* stream) { + // JMJ debug + libp2p_logger_debug("yamux", "Sending protocol through stream of type %d and channel %d.\n", stream->stream_type, stream->channel); const char* protocolID = "/yamux/1.0.0\n"; struct StreamMessage outgoing; outgoing.data_size = strlen(protocolID); @@ -515,70 +522,6 @@ int libp2p_yamux_send_protocol(struct Stream* stream) { return stream->write(stream->stream_context, &outgoing); } -int libp2p_yamux_negotiate(struct YamuxContext* ctx, int am_server) { - const char* protocolID = "/yamux/1.0.0\n"; - struct StreamMessage outgoing; - struct StreamMessage* results = NULL; - int retVal = 0; - int haveTheirs = 0; - int peek_result = 0; - - // see if they're trying to send something first (only if we're the client) - if (!am_server) { - peek_result = libp2p_yamux_peek(ctx); - if (peek_result > 0) { - libp2p_logger_debug("yamux", "There is %d bytes waiting for us. Perhaps it is the yamux header we're expecting.\n", peek_result); - // get the protocol - ctx->stream->parent_stream->read(ctx->stream->parent_stream, &results, yamux_default_timeout); - if (results == NULL || results->data_size == 0) { - libp2p_logger_error("yamux", "We thought we had a yamux header, but we got nothing.\n"); - goto exit; - } - if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) { - libp2p_logger_error("yamux", "We thought we had a yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data); - goto exit; - } - libp2p_stream_message_free(results); - results = NULL; - haveTheirs = 1; - } - } - - // send the protocol id - outgoing.data = (uint8_t*)protocolID; - outgoing.data_size = strlen(protocolID); - libp2p_logger_debug("yamux", "Attempting to write the yamux protocol id as %s.\n", (am_server ? "server" : "client")); - if (!ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing)) { - libp2p_logger_error("yamux", "We attempted to write the yamux protocol id, but the write call failed.\n"); - goto exit; - } - - // wait for them to send the protocol id back - if (!am_server && !haveTheirs) { - // expect the same back - ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &results, yamux_default_timeout); - if (results == NULL || results->data_size == 0) { - libp2p_logger_error("yamux", "We tried to retrieve the yamux header, but we got nothing.\n"); - goto exit; - } - if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) { - libp2p_logger_error("yamux", "We tried to retrieve the yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data); - goto exit; - } - } - - //TODO: okay, we're almost done. Let incoming stuff be marshaled to the correct handler. - // this should be somewhat automatic, as they ask, and we negotiate - //TODO: we should open some streams with them (multistream, id, kademlia, relay) - // this is not automatic, as we need to start the negotiation process - - retVal = 1; - exit: - if (results != NULL) - libp2p_stream_message_free(results); - return retVal; -} - /*** * A new protocol was asked for. Give it a "channel" * @param yamux_stream the yamux stream