From 7c62bdfbb74a53f6151d4236093e9fa6842e29e8 Mon Sep 17 00:00:00 2001 From: John Jones Date: Mon, 27 Nov 2017 09:06:33 -0500 Subject: [PATCH] debugging yamux and go --- conn/dialer.c | 2 +- identify/identify.c | 5 +++- net/multistream.c | 17 +++++++++++++ peer/peer.c | 24 ------------------- routing/dht_protocol.c | 2 ++ secio/secio.c | 2 ++ yamux/session.c | 54 ++++++++++++++++++++++++++++++++---------- yamux/yamux.c | 14 ++++++++++- 8 files changed, 81 insertions(+), 39 deletions(-) diff --git a/conn/dialer.c b/conn/dialer.c index 1d72f85..f99f95b 100644 --- a/conn/dialer.c +++ b/conn/dialer.c @@ -132,7 +132,7 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer peer->sessionContext->default_stream = new_stream; // we have our swarm connection. Now we ask for some "channels" // id over yamux - libp2p_yamux_stream_add(new_stream->stream_context, libp2p_identify_stream_new(new_stream)); + //libp2p_yamux_stream_add(new_stream->stream_context, libp2p_identify_stream_new(new_stream)); // kademlia over yamux //libp2p_yamux_stream_add(new_stream->stream_context, libp2p_kademlia_stream_new(new_stream)); // circuit relay over yamux diff --git a/identify/identify.c b/identify/identify.c index afe68fc..a621e7e 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -18,6 +18,8 @@ * @returns true(1) if it can handle this message, false(0) if not */ int libp2p_identify_can_handle(const struct StreamMessage* msg) { + if (msg == NULL || msg->data_size == 0 || msg->data == 0) + return 0; const char *protocol = "/ipfs/id/1.0.0\n"; int protocol_size = strlen(protocol); // is there a varint in front? @@ -374,7 +376,8 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { out->stream_context = ctx; out->close = libp2p_identify_close; out->negotiate = libp2p_identify_stream_new; - if (!libp2p_identify_send_protocol(parent_stream) || !libp2p_identify_receive_protocol(parent_stream)) { + // do we expect a reply? + if (!libp2p_identify_send_protocol(parent_stream) /* || !libp2p_identify_receive_protocol(parent_stream) */) { libp2p_stream_free(out); free(ctx); return NULL; diff --git a/net/multistream.c b/net/multistream.c index 84e9f54..6dec817 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -25,6 +25,8 @@ int multistream_default_timeout = 5; */ int libp2p_net_multistream_can_handle(const struct StreamMessage* msg) { + if (msg == NULL || msg->data == NULL || msg->data_size == 0) + return 0; char *protocol = "/multistream/1.0.0\n"; int protocol_size = strlen(protocol); unsigned char* incoming = msg->data; @@ -394,6 +396,20 @@ struct Stream* libp2p_net_multistream_handshake(struct Stream* stream) { return NULL; } +/*** + * The protocol above is asking for an upgrade + * @param multistream this stream (a multistream) + * @param new_stream the protocol above + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_net_multistream_handle_upgrade(struct Stream* multistream, struct Stream* new_stream) { + // take multistream out of the picture + if (new_stream->parent_stream == multistream) { + new_stream->parent_stream = multistream->parent_stream; + } + return 1; +} + /** * Create a new MultiStream structure * @param parent_stream the stream @@ -411,6 +427,7 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i out->peek = libp2p_net_multistream_peek; out->read_raw = libp2p_net_multistream_read_raw; out->negotiate = libp2p_net_multistream_handshake; + out->handle_upgrade = libp2p_net_multistream_handle_upgrade; out->address = parent_stream->address; // build MultistreamContext struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); diff --git a/peer/peer.c b/peer/peer.c index cc69531..903acdc 100644 --- a/peer/peer.c +++ b/peer/peer.c @@ -104,30 +104,6 @@ int libp2p_peer_connect(const struct Dialer* dialer, struct Libp2pPeer* peer, st peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED; libp2p_logger_debug("peer", "Attemping to connect to %s.\n", libp2p_peer_id_to_string(peer)); return libp2p_conn_dialer_join_swarm(dialer, peer, timeout); - /* - time_t now, prev = time(NULL); - // find an appropriate address - struct Libp2pLinkedList* current_address = peer->addr_head; - while (current_address != NULL && peer->connection_type != CONNECTION_TYPE_CONNECTED) { - struct MultiAddress *ma = (struct MultiAddress*)current_address->item; - // use the dialer to attempt to dial this MultiAddress and join the swarm - struct Stream* yamux_stream = libp2p_conn_dialer_get_stream(dialer, ma, "yamux"); - if (yamux_stream != NULL) { - // we're okay, get out - peer->connection_type = CONNECTION_TYPE_CONNECTED; - break; - } - now = time(NULL); - if (now >= (prev + timeout)) - break; - current_address = current_address->next; - } // trying to connect - int retVal = (peer->connection_type == CONNECTION_TYPE_CONNECTED); - if (!retVal) { - libp2p_logger_debug("peer", "Attempted connect to %s but failed.\n", libp2p_peer_id_to_string(peer)); - } - return retVal; - */ } /** diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c index 320de79..e34383b 100644 --- a/routing/dht_protocol.c +++ b/routing/dht_protocol.c @@ -16,6 +16,8 @@ */ int libp2p_routing_dht_can_handle(const struct StreamMessage* msg) { + if (msg == NULL || msg->data_size == 0 || msg->data == NULL) + return 0; if (msg->data_size < 8) return 0; char* result = strnstr((char*)msg->data, "/ipfs/kad", msg->data_size); diff --git a/secio/secio.c b/secio/secio.c index cffef89..1903dde 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -36,6 +36,8 @@ const char* SupportedCiphers = "AES-256,AES-128,Blowfish"; const char* SupportedHashes = "SHA256,SHA512"; int libp2p_secio_can_handle(const struct StreamMessage* msg) { + if (msg == NULL || msg->data_size == 0 || msg->data == NULL) + return 0; const char* protocol = "/secio/1.0.0"; // sanity checks if (msg->data_size < 12) diff --git a/yamux/session.c b/yamux/session.c index 51cb537..fdddcc1 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -116,7 +116,7 @@ ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err) } /*** - * Ping + * Respond to a Ping * @param session the session to ping * @param value the value to send * @param pong true(1) if we should send the ack, false(0) if we should send the syn (who's side are we on?) @@ -141,7 +141,7 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po struct StreamMessage outgoing; outgoing.data = (uint8_t*)&f; outgoing.data_size = sizeof(struct yamux_frame); - if (!session->parent_stream->write(session->parent_stream->stream_context, &outgoing)) + if (!session->parent_stream->parent_stream->write(session->parent_stream->parent_stream->stream_context, &outgoing)) return 0; return outgoing.data_size; } @@ -180,13 +180,18 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s decode_frame(&f); // check yamux version - if (f.version != YAMUX_VERSION) + if (f.version != YAMUX_VERSION) { + libp2p_logger_error("yamux", "Incorrect Yamux version. Expected %d but received %d.\n", YAMUX_VERSION, f.version); return 0; + } - if (!f.streamid) // we're not dealing with a stream, we're dealing with something at the yamux protocol level + if (!f.streamid) {// we're not dealing with a stream, we're dealing with something at the yamux protocol level + libp2p_logger_debug("yamux", "Received frame with no stream id. We must need to do something at the protocol level.\n"); switch (f.type) { - case yamux_frame_ping: // ping + case yamux_frame_ping: { + // ping + libp2p_logger_debug("yamux", "Received a ping.\n"); if (f.flags & yamux_frame_syn) { yamux_session_ping(yamux_session, f.length, 1); @@ -214,15 +219,22 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s else return -EPROTO; break; - case yamux_frame_go_away: // go away (hanging up) + } + case yamux_frame_go_away: { + // go away (hanging up) + libp2p_logger_debug("yamux", "Received a \"go away\".\n"); yamux_session->closed = 1; if (yamux_session->go_away_fn) yamux_session->go_away_fn(yamux_session, (enum yamux_error)f.length); break; - default: + } + default: { + libp2p_logger_debug("yamux", "We thought we needed to do something at the yamux protocol level, but the flags didn't match up.\n"); return -EPROTO; + } } - else { // we're handling a stream, not something at the yamux protocol level + } else { + // we're handling a stream, not something at the yamux protocol level for (size_t i = 0; i < yamux_session->cap_streams; ++i) { struct yamux_session_stream* ss = &yamux_session->streams[i]; @@ -233,8 +245,10 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s if (s->id == f.streamid) // we have a match between the stored stream and the current stream { + libp2p_logger_debug("yamux", "We found our stream id.\n"); if (f.flags & yamux_frame_rst) { + libp2p_logger_debug("yamux", "They are asking that this stream be reset.\n"); // close the stream s->state = yamux_stream_closed; @@ -243,6 +257,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s } else if (f.flags & yamux_frame_fin) { + libp2p_logger_debug("yamux", "They are asking that this stream be closed.\n"); // local stream didn't initiate FIN if (s->state != yamux_stream_closing) yamux_stream_close(context); @@ -254,16 +269,22 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s } else if (f.flags & yamux_frame_ack) { + libp2p_logger_debug("yamux", "They sent an ack.\n"); // acknowldegement - if (s->state != yamux_stream_syn_sent) + if (s->state != yamux_stream_syn_sent) { + libp2p_logger_debug("yamux", "We received an ack, but it seems we never sent anything!\n"); return -EPROTO; + } s->state = yamux_stream_est; } - else if (f.flags) + else if (f.flags) { + libp2p_logger_debug("yamux", "They sent no flags. I don't know what to do. Erroring out.\n"); return -EPROTO; + } int sz = sizeof(struct yamux_frame); + libp2p_logger_debug("yamux", "Processing frame of %d bytes.\n"); ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz); return (re < 0) ? re : (re + incoming_size); } @@ -273,16 +294,21 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s // It must not exist yet, so let's try to make it if (f.flags & yamux_frame_syn) { + libp2p_logger_debug("yamux", "Looks like we have a new stream coming in. Stream %d.\n", f.streamid); struct StreamMessage* msg = libp2p_stream_message_new(); if (incoming_size > sizeof(struct yamux_frame)) { msg->data_size = incoming_size - sizeof(struct yamux_frame); + libp2p_logger_debug("yamux", "Stream %d has data after the frame, with a length of %d.\n", f.streamid, msg->data_size); msg->data = malloc(msg->data_size); memcpy(msg->data, &incoming[sizeof(struct yamux_frame)], msg->data_size); + } else { + libp2p_logger_debug("yamux", "Stream %d has no extra data after the frame.\n", f.streamid); } // if we didn't initiate it, add this new channel (odd stream id is from client, even is from server) - if ( (f.streamid % 2 == 0 && yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) { + if ( (f.streamid % 2 == 0 && !yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) { + libp2p_logger_debug("yamux", "This is a new channel. Creating it...\n"); struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, msg); if (yamuxChannelStream == NULL) { libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux channel for stream id %d.\n", f.streamid); @@ -296,11 +322,15 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s } channelContext->state = yamux_stream_syn_recv; + } else { + libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)")); } *return_message = msg; } - else + else { + libp2p_logger_error("yamux", "We had a (probably) new frame, but the flags didn't seem right."); return -EPROTO; + } } return 0; } diff --git a/yamux/yamux.c b/yamux/yamux.c index 4e838bc..093a28c 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -52,6 +52,8 @@ struct YamuxContext* libp2p_yamux_get_context(void* stream_context) { * @returns true(1) if it can handle this message, false(0) if not */ int yamux_can_handle(const struct StreamMessage* msg) { + if (msg == NULL || msg->data_size == 0 || msg->data == NULL) + return 0; char *protocol = "/yamux/1.0.0\n"; int protocol_size = strlen(protocol); // is there a varint in front? @@ -254,6 +256,7 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int struct Stream* parent_stream = libp2p_yamux_get_parent_stream(stream_context); if (channel != NULL && channel->channel != 0) { + libp2p_logger_debug("yamux", "Data received on yamux stream %d.\n", channel->channel); // we have an established channel. Use it. if (!parent_stream->read(parent_stream->stream_context, message, yamux_default_timeout)) { libp2p_logger_error("yamux", "Read: Attepted to read from channel %d, but the read failed.\n", channel->channel); @@ -270,6 +273,7 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int } libp2p_logger_error("yamux", "yamux_decode returned error.\n"); } else if (ctx != NULL) { + libp2p_logger_debug("yamux", "read: It looks like we're trying to negotiate a new protocol.\n"); // We are still negotiating. They are probably attempting to negotiate a new protocol struct StreamMessage* incoming = NULL; if (parent_stream->read(parent_stream->stream_context, &incoming, yamux_default_timeout)) { @@ -277,6 +281,12 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int // parse the frame if (yamux_decode(ctx, incoming->data, incoming->data_size, message) == 0) { libp2p_stream_message_free(incoming); + // The message may not have anything in it. If so, return 0, as if nothing was done. Everything has been handled + if (*message != NULL && (*message)->data_size == 0) { + libp2p_stream_message_free(*message); + *message = NULL; + return 0; + } return 1; } libp2p_logger_error("yamux", "yamux_decode returned error.\n"); @@ -494,7 +504,7 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx, int am_server) { // send the protocol id outgoing.data = (uint8_t*)protocolID; outgoing.data_size = strlen(protocolID); - libp2p_logger_debug("yamux", "Attempting to write the yamux protocol id.\n"); + libp2p_logger_debug("yamux", "Attempting to write the yamux protocol id as %s.\n", (am_server ? "server" : "client")); if (!ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing)) { libp2p_logger_error("yamux", "We attempted to write the yamux protocol id, but the write call failed.\n"); goto exit; @@ -594,6 +604,8 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_serv libp2p_yamux_stream_free(out); return NULL; } + // tell protocol below that we want to upgrade + parent_stream->handle_upgrade(parent_stream, out); } return out; }