From f2e5af40585d3c138b786ac5452c4ff09489a034 Mon Sep 17 00:00:00 2001 From: John Jones Date: Wed, 8 Nov 2017 10:51:43 -0500 Subject: [PATCH] Major changes for implementation of yamux protocol This commit changes the Stream interface, so as to allow the yamux protocol to have channels. It is necessary, but touches many areas. The codebase is better for it. --- conn/session.c | 2 +- conn/tcp_transport_dialer.c | 2 +- identify/identify.c | 39 +++-- include/libp2p/identify/identify.h | 5 +- include/libp2p/net/connectionstream.h | 20 ++- include/libp2p/net/protocol.h | 17 +- include/libp2p/net/stream.h | 13 +- include/libp2p/routing/dht_protocol.h | 21 ++- include/libp2p/secio/secio.h | 7 +- include/libp2p/yamux/yamux.h | 9 +- net/connectionstream.c | 46 +++++- net/multistream.c | 114 ++++++------- net/protocol.c | 19 ++- net/stream.c | 7 + routing/dht_protocol.c | 96 ++++++----- secio/secio.c | 24 ++- test/mock_stream.h | 26 +-- test/test_conn.h | 2 +- test/test_multistream.h | 4 +- test/test_yamux.h | 54 +++--- yamux/yamux.c | 230 ++++++++++++++------------ 21 files changed, 438 insertions(+), 319 deletions(-) diff --git a/conn/session.c b/conn/session.c index 971a6e5..db8f3eb 100644 --- a/conn/session.c +++ b/conn/session.c @@ -44,7 +44,7 @@ struct SessionContext* libp2p_session_context_new() { int libp2p_session_context_free(struct SessionContext* context) { if (context != NULL) { if (context->default_stream != NULL) - context->default_stream->close(context); + context->default_stream->close(context->default_stream); context->default_stream = NULL; context->insecure_stream = NULL; context->secure_stream = NULL; diff --git a/conn/tcp_transport_dialer.c b/conn/tcp_transport_dialer.c index 5804bb9..6859333 100644 --- a/conn/tcp_transport_dialer.c +++ b/conn/tcp_transport_dialer.c @@ -24,7 +24,7 @@ struct Stream* libp2p_conn_tcp_dial(const struct TransportDialer* transport_dial if (!multiaddress_get_ip_address(addr, &ip)) return NULL; - struct Stream* stream = libp2p_net_connection_new(socket_descriptor, ip, port); + struct Stream* stream = libp2p_net_connection_new(socket_descriptor, ip, port, NULL); free(ip); return stream; diff --git a/identify/identify.c b/identify/identify.c index 079a31d..3eafbd8 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -35,12 +35,12 @@ int libp2p_identify_can_handle(const struct StreamMessage* msg) { * @param context the context * @returns true(1) on success, false(0) otherwise */ -int libp2p_identify_send_protocol(struct IdentifyContext *context) { +int libp2p_identify_send_protocol(struct Stream *stream) { char *protocol = "/ipfs/id/1.0.0\n"; struct StreamMessage msg; msg.data = (uint8_t*) protocol; msg.data_size = strlen(protocol); - if (!context->parent_stream->write(context->parent_stream->stream_context, &msg)) { + if (!stream->write(stream->stream_context, &msg)) { libp2p_logger_error("identify", "send_protocol: Unable to send identify protocol header.\n"); return 0; } @@ -53,10 +53,10 @@ int libp2p_identify_send_protocol(struct IdentifyContext *context) { * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ -int libp2p_identify_receive_protocol(struct IdentifyContext* context) { +int libp2p_identify_receive_protocol(struct Stream* stream) { const char *protocol = "/ipfs/id/1.0.0\n"; struct StreamMessage* results = NULL; - if (!context->parent_stream->read(context->parent_stream->stream_context, &results, 30)) { + if (!stream->read(stream->stream_context, &results, 30)) { libp2p_logger_error("identify", "receive_protocol: Unable to read results.\n"); return 0; } @@ -80,15 +80,14 @@ int libp2p_identify_receive_protocol(struct IdentifyContext* context) { * @param protocol_context the identify protocol context * @returns <0 on error, 0 if loop should not continue, >0 on success */ -int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) { - if (protocol_context == NULL) +int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + struct Stream* new_stream = libp2p_identify_stream_new(stream); + if (new_stream == NULL) return -1; // attempt to create a new Identify connection with them. // send the protocol id back, and set up the channel - struct IdentifyContext* ctx = (struct IdentifyContext*)protocol_context; - libp2p_identify_send_protocol(ctx); //TODO: now add this "channel" - return 1; + return stream->handle_upgrade(stream, new_stream); } /** @@ -97,13 +96,16 @@ int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Sessi * @returns true(1) */ int libp2p_identify_shutdown(void* protocol_context) { - return 0; + if (protocol_context == NULL) + return 0; + free(protocol_context); + return 1; } struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp2pVector* handlers) { struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new(); if (handler != NULL) { - handler->context = handlers; + handler->context = handler; handler->CanHandle = libp2p_identify_can_handle; handler->HandleMessage = libp2p_identify_handle_message; handler->Shutdown = libp2p_identify_shutdown; @@ -111,13 +113,14 @@ struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp return handler; } -int libp2p_identify_close(void* stream_context) { - if (stream_context == NULL) +int libp2p_identify_close(struct Stream* stream) { + if (stream == NULL) return 0; - struct IdentifyContext* ctx = (struct IdentifyContext*)stream_context; - ctx->parent_stream->close(ctx->parent_stream->stream_context); - free(ctx->stream); - free(ctx); + if (stream->parent_stream != NULL) + stream->parent_stream->close(stream->parent_stream); + if (stream->stream_context != NULL) + free(stream->stream_context); + libp2p_stream_free(stream); return 1; } @@ -146,7 +149,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { ctx->stream = out; out->stream_context = ctx; out->close = libp2p_identify_close; - if (!libp2p_identify_send_protocol(ctx) || !libp2p_identify_receive_protocol(ctx)) { + if (!libp2p_identify_send_protocol(parent_stream) || !libp2p_identify_receive_protocol(parent_stream)) { libp2p_stream_free(out); free(ctx); return NULL; diff --git a/include/libp2p/identify/identify.h b/include/libp2p/identify/identify.h index 2311d25..b2bec1a 100644 --- a/include/libp2p/identify/identify.h +++ b/include/libp2p/identify/identify.h @@ -29,9 +29,8 @@ struct IdentifyContext { }; int libp2p_identify_can_handle(const struct StreamMessage* msg); -int libp2p_identify_send_protocol(struct IdentifyContext *context); -int libp2p_identify_receive_protocol(struct IdentifyContext* context); -int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context); +int libp2p_identify_send_protocol(struct Stream* stream); +int libp2p_identify_receive_protocol(struct Stream* stream); int libp2p_identify_shutdown(void* protocol_context); struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp2pVector* handlers); diff --git a/include/libp2p/net/connectionstream.h b/include/libp2p/net/connectionstream.h index 38c6c16..d928e75 100644 --- a/include/libp2p/net/connectionstream.h +++ b/include/libp2p/net/connectionstream.h @@ -1,6 +1,7 @@ #pragma once #include "libp2p/net/stream.h" +#include "libp2p/conn/session.h" /*** * Create a new stream based on a network connection @@ -9,7 +10,24 @@ * @param port the port of the connection * @returns a Stream */ -struct Stream* libp2p_net_connection_new(int fd, char* ip, int port); +struct Stream* libp2p_net_connection_new(int fd, char* ip, int port, struct SessionContext* session_context); + +/** + * Attempt to upgrade the parent_stream to use the new stream by default + * @param parent_stream the parent stream + * @param new_stream the new stream + * @returns true(1) on success, false(0) if not + */ +int libp2p_net_connection_upgrade(struct Stream* parent_stream, struct Stream* new_stream); + +/** + * Given a stream, find the SessionContext + * NOTE: This is done by navigating to the root context, which should + * be a ConnectionContext, then grabbing the SessionContext there. + * @param stream the stream to use + * @returns the SessionContext for this stream + */ +struct SessionContext* libp2p_net_connection_get_session_context(struct Stream* stream); /*** * These are put here to allow implementations of struct Stream diff --git a/include/libp2p/net/protocol.h b/include/libp2p/net/protocol.h index a9992fd..801636f 100644 --- a/include/libp2p/net/protocol.h +++ b/include/libp2p/net/protocol.h @@ -22,11 +22,11 @@ struct Libp2pProtocolHandler { * Handles the message * @param incoming the incoming data buffer * @param incoming_size the size of the incoming data buffer - * @param session_context the information about the incoming connection + * @param stream the incoming stream * @param protocol_context the protocol-dependent context * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ - int (*HandleMessage)(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context); + int (*HandleMessage)(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context); /** * Shutting down. Clean up any memory allocations @@ -45,8 +45,15 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new(); /*** * Handle an incoming message * @param message the incoming message - * @param session the SessionContext of the incoming connection + * @param stream the incoming connection * @param handlers a Vector of protocol handlers - * @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success + * @returns -1 on error, 0 on protocol upgrade, 1 on success */ -int libp2p_protocol_marshal(struct StreamMessage* message, struct SessionContext* context, struct Libp2pVector* protocol_handlers); +int libp2p_protocol_marshal(struct StreamMessage* message, struct Stream* stream, struct Libp2pVector* protocol_handlers); + +/*** + * Shut down all protocol handlers and free vector + * @param handlers vector of Libp2pProtocolHandler + * @returns true(1) + */ +int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers); diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index a6a920a..370d372 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -46,6 +46,8 @@ struct Stream { struct MultiAddress* address; // helps identify who is on the other end pthread_mutex_t* socket_mutex; // only 1 transmission at a time struct Stream* parent_stream; // what stream wraps this stream + int channel; // the channel (for multiplexing streams) + /** * A generic place to store implementation-specific context items */ @@ -82,10 +84,10 @@ struct Stream { * Closes a stream * * NOTE: This is also responsible for deallocating the Stream struct - * @param stream the stream context + * @param stream the stream * @returns true(1) on success, otherwise false(0) */ - int (*close)(void* stream_context); + int (*close)(struct Stream* stream); /*** * Checks to see if something is waiting on the stream @@ -94,6 +96,13 @@ struct Stream { * @returns true(1) if something is waiting, false(0) if not, -1 on error */ int (*peek)(void* stream_context); + + /** + * Handle a stream upgrade + * @param stream the current stream + * @param new_stream the newly created stream + */ + int (*handle_upgrade)(struct Stream* stream, struct Stream* new_stream); }; struct Stream* libp2p_stream_new(); diff --git a/include/libp2p/routing/dht_protocol.h b/include/libp2p/routing/dht_protocol.h index 581ebf7..6915501 100644 --- a/include/libp2p/routing/dht_protocol.h +++ b/include/libp2p/routing/dht_protocol.h @@ -10,7 +10,16 @@ * This is where kademlia and dht talk to the outside world */ -struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store); + +struct DhtContext { + struct Peerstore* peer_store; + struct ProviderStore* provider_store; + struct Datastore* datastore; + struct Filestore* filestore; +}; + +struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store, + struct Datastore* datastore, struct Filestore* filestore); /** * Take existing stream and upgrade to the Kademlia / DHT protocol/codec @@ -21,20 +30,20 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context); /** * Handle a client requesting an upgrade to the DHT protocol - * @param context the context + * @param stream the stream * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handshake(struct SessionContext* context); +int libp2p_routing_dht_handshake(struct Stream* stream); /*** * Handle the incoming message. Handshake should have already * been done. We should expect that the next read contains * a protobuf'd kademlia message. - * @param session the context - * @param peerstore a list of peers + * @param stream the incoming stream + * @param protocol_context the protocol context * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore); +int libp2p_routing_dht_handle_message(struct Stream* stream, struct DhtContext* protocol_context); /*** * Send a kademlia message diff --git a/include/libp2p/secio/secio.h b/include/libp2p/secio/secio.h index e756346..500ff51 100644 --- a/include/libp2p/secio/secio.h +++ b/include/libp2p/secio/secio.h @@ -42,16 +42,17 @@ int libp2p_secio_initiate_handshake(struct SecioContext* ctx); /*** * Send the protocol string to the remote stream - * @param session the context + * @param stream stream * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_send_protocol(struct SecioContext* session); +int libp2p_secio_send_protocol(struct Stream* stream); + /*** * Attempt to read the secio protocol as a reply from the remote * @param session the context * @returns true(1) if we received what we think we should have, false(0) otherwise */ -int libp2p_secio_receive_protocol(struct SecioContext* session); +int libp2p_secio_receive_protocol(struct Stream* stream); /*** * performs initial communication over an insecure channel to share diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index 27218eb..5c67f8f 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -26,7 +26,10 @@ struct YamuxContext { struct YamuxChannelContext { char type; struct YamuxContext* yamux_context; + // this stream struct Stream* stream; + // the child protocol's stream + struct Stream* child_stream; // the channel number int channel; // the window size for this channel @@ -40,14 +43,14 @@ struct YamuxChannelContext { /** * Build a handler that can handle the yamux protocol */ -struct Libp2pProtocolHandler* yamux_build_protocol_handler(); +struct Libp2pProtocolHandler* libp2p_yamux_build_protocol_handler(); /*** * Send the yamux protocol out the default stream * NOTE: if we initiate the connection, we should expect the same back - * @param context the SessionContext + * @param stream the stream * @returns true(1) on success, false(0) otherwise */ -int yamux_send_protocol(struct YamuxContext* context); +int yamux_send_protocol(struct Stream* stream); /*** * Check to see if the reply is the yamux protocol header we expect diff --git a/net/connectionstream.c b/net/connectionstream.c index 6884f9c..639fe27 100644 --- a/net/connectionstream.c +++ b/net/connectionstream.c @@ -12,6 +12,7 @@ #include "libp2p/net/stream.h" #include "libp2p/net/p2pnet.h" #include "libp2p/utils/logger.h" +#include "libp2p/conn/session.h" #include "multiaddr/multiaddr.h" /** @@ -19,10 +20,10 @@ * @param stream_context the ConnectionContext * @returns true(1) on success, false(0) otherwise */ -int libp2p_net_connection_close(void* stream_context) { - if (stream_context == NULL) +int libp2p_net_connection_close(struct Stream* stream) { + if (stream->stream_context == NULL) return 0; - struct ConnectionContext* ctx = (struct ConnectionContext*)stream_context; + struct ConnectionContext* ctx = (struct ConnectionContext*)stream->stream_context; if (ctx != NULL) { if (ctx->socket_descriptor > 0) { close(ctx->socket_descriptor); @@ -113,7 +114,7 @@ int libp2p_net_connection_write(void* stream_context, struct StreamMessage* msg) * @param port the port of the connection * @returns a Stream */ -struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) { +struct Stream* libp2p_net_connection_new(int fd, char* ip, int port, struct SessionContext* session_context) { struct Stream* out = (struct Stream*) malloc(sizeof(struct Stream)); if (out != NULL) { out->close = libp2p_net_connection_close; @@ -135,6 +136,7 @@ struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) { if (ctx != NULL) { out->stream_context = ctx; ctx->socket_descriptor = fd; + ctx->session_context = session_context; if (!socket_connect4_with_timeout(ctx->socket_descriptor, hostname_to_ip(ip), port, 10) == 0) { // unable to connect libp2p_stream_free(out); @@ -144,3 +146,39 @@ struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) { } return out; } + +/** + * Attempt to upgrade the parent_stream to use the new stream by default + * @param parent_stream the parent stream + * @param new_stream the new stream + * @returns true(1) on success, false(0) if not + */ +int libp2p_net_connection_upgrade(struct Stream* parent_stream, struct Stream* new_stream) { + if (parent_stream == NULL) + return 0; + struct Stream* current_stream = parent_stream; + while (current_stream->parent_stream != NULL) + current_stream = current_stream->parent_stream; + // current_stream is now the root, and should have a ConnectionContext + struct ConnectionContext* ctx = (struct ConnectionContext*)current_stream->stream_context; + ctx->session_context->default_stream = new_stream; + return 1; +} + +/** + * Given a stream, find the SessionContext + * NOTE: This is done by navigating to the root context, which should + * be a ConnectionContext, then grabbing the SessionContext there. + * @param stream the stream to use + * @returns the SessionContext for this stream + */ +struct SessionContext* libp2p_net_connection_get_session_context(struct Stream* stream) { + if (stream == NULL) { + return NULL; + } + struct Stream* current_stream = stream; + while (current_stream->parent_stream != NULL) + current_stream = current_stream->parent_stream; + struct ConnectionContext* ctx = (struct ConnectionContext*)current_stream->stream_context; + return ctx->session_context; +} diff --git a/net/multistream.c b/net/multistream.c index 2facd5a..532dff5 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -9,6 +9,7 @@ #include "libp2p/os/utils.h" #include "libp2p/net/p2pnet.h" +#include "libp2p/net/connectionstream.h" #include "libp2p/record/message.h" #include "libp2p/secio/secio.h" #include "varint.h" @@ -78,45 +79,6 @@ int libp2p_net_multistream_receive_protocol(struct SessionContext* context) { return 1; } -int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) { - // try sending the protocol back - //if (!libp2p_net_multistream_send_protocol(context)) - // return -1; - - struct MultistreamContext* multistream_context = (struct MultistreamContext*) protocol_context; - - // try to read from the network - struct StreamMessage* results = NULL; - int retVal = 0; - int max_retries = 10; - int numRetries = 0; - // handle the call - for(;;) { - // try to read for 5 seconds - if (context->default_stream->read(context, &results, 5)) { - // we read something from the network. Process it. - // NOTE: If it is a multistream protocol that we are receiving, ignore it. - if (libp2p_net_multistream_can_handle(results)) - continue; - numRetries = 0; - retVal = libp2p_protocol_marshal(results, context, multistream_context->handlers); - if (results != NULL) - free(results); - // exit the loop on error (or if they ask us to no longer loop by returning 0) - if (retVal <= 0) - break; - } else { - // we were unable to read from the network. - // if it timed out, we should try again (if we're not out of retries) - if (numRetries >= max_retries) - break; - numRetries++; - } - } - - return retVal; -} - int libp2p_net_multistream_shutdown(void* protocol_context) { struct MultistreamContext* context = (struct MultistreamContext*) protocol_context; if (context != NULL) { @@ -125,37 +87,13 @@ int libp2p_net_multistream_shutdown(void* protocol_context) { return 1; } -/*** - * The handler to handle calls to the protocol - * @param stream_context the context - * @returns the protocol handler - */ -struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector) { - - // build the context - struct MultistreamContext* context = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); - if (context == NULL) - return NULL; - context->handlers = (struct Libp2pVector*) handler_vector; - - // build the handler - struct Libp2pProtocolHandler *handler = libp2p_protocol_handler_new(); - if (handler != NULL) { - handler->context = context; - handler->CanHandle = libp2p_net_multistream_can_handle; - handler->HandleMessage = libp2p_net_multistream_handle_message; - handler->Shutdown = libp2p_net_multistream_shutdown; - } - return handler; -} - /** * Close the connection and free memory * @param ctx the context * @returns true(1) on success, otherwise false(0) */ int libp2p_net_multistream_context_free(struct MultistreamContext* ctx) { - int retVal = ctx->stream->close(ctx); + int retVal = ctx->stream->close(ctx->stream); // regardless of retVal, free the context // TODO: Evaluate if this is the correct way to do it: free(ctx); @@ -168,11 +106,11 @@ int libp2p_net_multistream_context_free(struct MultistreamContext* ctx) { * @param stream_context a SessionContext * @returns true(1) on success, otherwise false(0) */ -int libp2p_net_multistream_close(void* stream_context) { - if (stream_context == NULL) { +int libp2p_net_multistream_close(struct Stream* stream) { + if (stream->stream_context == NULL) { return 0; } - struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream_context; + struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream->stream_context; return libp2p_net_multistream_context_free(multistream_context); } @@ -458,3 +396,45 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream) { return out; } +/*** + * The remote is attempting to negotiate the multistream protocol + * @param msg incoming message + * @param stream the incoming stream + * @param protocol_context the context for the Multistream protocol (not stream specific) + * @returns <0 on error, 0 for the caller to stop handling this, 1 for success + */ +int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + // attempt negotiations + struct Stream* new_stream = libp2p_net_multistream_stream_new(stream); + if (new_stream != NULL) { + // upgrade + return stream->handle_upgrade(stream, new_stream); + } + + return -1; +} + +/*** + * The handler to handle calls to the protocol + * @param stream_context the context + * @returns the protocol handler + */ +struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector) { + + // build the context + struct MultistreamContext* context = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); + if (context == NULL) + return NULL; + context->handlers = (struct Libp2pVector*) handler_vector; + + // build the handler + struct Libp2pProtocolHandler *handler = libp2p_protocol_handler_new(); + if (handler != NULL) { + handler->context = context; + handler->CanHandle = libp2p_net_multistream_can_handle; + handler->HandleMessage = libp2p_net_multistream_handle_message; + handler->Shutdown = libp2p_net_multistream_shutdown; + } + return handler; +} + diff --git a/net/protocol.c b/net/protocol.c index 148d090..4ff4956 100644 --- a/net/protocol.c +++ b/net/protocol.c @@ -46,7 +46,7 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new() { * @param handlers a Vector of protocol handlers * @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success */ -int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* session, struct Libp2pVector* handlers) { +int libp2p_protocol_marshal(struct StreamMessage* msg, struct Stream* stream, struct Libp2pVector* handlers) { const struct Libp2pProtocolHandler* handler = protocol_compare(msg, handlers); if (handler == NULL) { @@ -60,9 +60,22 @@ int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* se break; } } - libp2p_logger_error("protocol", "Session [%s]: Unable to find handler for %s.\n", session->remote_peer_id, str); return -1; } - return handler->HandleMessage(msg, session, handler->context); + return handler->HandleMessage(msg, stream, handler->context); +} + +/*** + * Shut down all protocol handlers and free vector + * @param handlers vector of Libp2pProtocolHandler + * @returns true(1) + */ +int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers) { + for(int i = 0; i < handlers->total; i++) { + struct Libp2pProtocolHandler* handler = (struct Libp2pProtocolHandler*)libp2p_utils_vector_get(handlers, i); + handler->Shutdown(handler->context); + } + libp2p_utils_vector_free(handlers); + return 1; } diff --git a/net/stream.c b/net/stream.c index 6bd5590..33bdd9b 100644 --- a/net/stream.c +++ b/net/stream.c @@ -2,6 +2,11 @@ #include "multiaddr/multiaddr.h" #include "libp2p/net/stream.h" +#include "libp2p/net/connectionstream.h" + +int libp2p_stream_default_handle_upgrade(struct Stream* parent_stream, struct Stream* new_stream) { + return libp2p_net_connection_upgrade(parent_stream, new_stream); +} struct Stream* libp2p_stream_new() { struct Stream* stream = (struct Stream*) malloc(sizeof(struct Stream)); @@ -15,6 +20,8 @@ struct Stream* libp2p_stream_new() { stream->socket_mutex = NULL; stream->stream_context = NULL; stream->write = NULL; + stream->handle_upgrade = libp2p_stream_default_handle_upgrade; + stream->channel = -1; } return stream; } diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c index e4f795c..320de79 100644 --- a/routing/dht_protocol.c +++ b/routing/dht_protocol.c @@ -15,11 +15,6 @@ * This is where kademlia and dht talk to the outside world */ -struct DhtContext { - struct Peerstore* peer_store; - struct ProviderStore* provider_store; -}; - int libp2p_routing_dht_can_handle(const struct StreamMessage* msg) { if (msg->data_size < 8) return 0; @@ -34,20 +29,22 @@ int libp2p_routing_dht_shutdown(void* context) { return 1; } -int libp2p_routing_dht_handle_msg(const struct StreamMessage* msg, struct SessionContext* session_context, void* context) { - libp2p_logger_debug("dht_protocol", "Handling incoming dht routing request from peer %s.\n", session_context->remote_peer_id); - struct DhtContext* ctx = (struct DhtContext*)context; - if (!libp2p_routing_dht_handshake(session_context)) +int libp2p_routing_dht_handle_msg(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + struct DhtContext* ctx = (struct DhtContext*)protocol_context; + if (!libp2p_routing_dht_handshake(stream)) return -1; - return (libp2p_routing_dht_handle_message(session_context, ctx->peer_store, ctx->provider_store) == 0) ? -1 : 1; + return (libp2p_routing_dht_handle_message(stream, ctx) == 0) ? -1 : 1; } -struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store) { +struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store, + struct Datastore* datastore, struct Filestore* filestore) { struct Libp2pProtocolHandler* handler = (struct Libp2pProtocolHandler*) malloc(sizeof(struct Libp2pProtocolHandler)); if (handler != NULL) { struct DhtContext* ctx = (struct DhtContext*) malloc(sizeof(struct DhtContext)); ctx->peer_store = peer_store; ctx->provider_store = provider_store; + ctx->datastore = datastore; + ctx->filestore = filestore; handler->context = ctx; handler->CanHandle = libp2p_routing_dht_can_handle; handler->HandleMessage = libp2p_routing_dht_handle_msg; @@ -111,15 +108,15 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) { /** * Handle a client requesting an upgrade to the DHT protocol - * @param context the context + * @param stream the stream to the remote * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handshake(struct SessionContext* context) { +int libp2p_routing_dht_handshake(struct Stream* stream) { char* protocol = "/ipfs/kad/1.0.0\n"; struct StreamMessage outgoing; outgoing.data = (uint8_t*) protocol; outgoing.data_size = strlen(protocol); - return context->default_stream->write(context, &outgoing); + return stream->write(stream->stream_context, &outgoing); } /** @@ -136,14 +133,15 @@ int libp2p_routing_dht_handle_ping(struct KademliaMessage* message, unsigned cha /** * See if we have information as to who can provide this item - * @param session the context + * @param stream the incoming stream * @param message the message from the caller, contains a key - * @param peerstore the list of peers - * @param providerstore the list of peers that can provide things + * @param protocol_context the context + * @param results where to put the results + * @param results_size the size of the results * @returns true(1) on success, false(0) otherwise */ -int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, struct KademliaMessage* message, struct Peerstore* peerstore, - struct ProviderStore* providerstore, unsigned char** results, size_t* results_size) { +int libp2p_routing_dht_handle_get_providers(struct Stream* stream, struct KademliaMessage* message, struct DhtContext* protocol_context, + unsigned char** results, size_t* results_size) { unsigned char* peer_id = NULL; int peer_id_size = 0; @@ -152,16 +150,16 @@ int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, stru // Can I provide it locally? struct DatastoreRecord* datastore_record = NULL; - if (session->datastore->datastore_get((unsigned char*)message->key, message->key_size, &datastore_record, session->datastore)) { + if (protocol_context->datastore->datastore_get((unsigned char*)message->key, message->key_size, &datastore_record, protocol_context->datastore)) { // we can provide this hash from our datastore libp2p_datastore_record_free(datastore_record); libp2p_logger_debug("dht_protocol", "I can provide myself as a provider for this key.\n"); message->provider_peer_head = libp2p_utils_linked_list_new(); - message->provider_peer_head->item = libp2p_peer_copy(libp2p_peerstore_get_local_peer(peerstore)); - } else if (libp2p_providerstore_get(providerstore, (unsigned char*)message->key, message->key_size, &peer_id, &peer_id_size)) { + message->provider_peer_head->item = libp2p_peer_copy(libp2p_peerstore_get_local_peer(protocol_context->peer_store)); + } else if (libp2p_providerstore_get(protocol_context->provider_store, (unsigned char*)message->key, message->key_size, &peer_id, &peer_id_size)) { // Can I provide it because someone announced it earlier? // we have a peer id, convert it to a peer object - struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, peer_id, peer_id_size); + struct Libp2pPeer* peer = libp2p_peerstore_get_peer(protocol_context->peer_store, peer_id, peer_id_size); if (peer != NULL) { libp2p_logger_debug("dht_protocol", "I can provide a provider for this key, because %s says he has it.\n", libp2p_peer_id_to_string(peer)); // add it to the message @@ -233,16 +231,15 @@ struct MultiAddress* libp2p_routing_dht_find_peer_ip_multiaddress(struct Libp2pL /*** * Remote peer has announced that he can provide a key - * @param session session context + * @param stream the incoming stream * @param message the message - * @param peerstore the peerstore - * @param providerstore the providerstore + * @param protocol_context the context * @param result_buffer where to put the result * @param result_buffer_size the size of the result buffer * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struct KademliaMessage* message, - struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t* result_buffer_size) { +int libp2p_routing_dht_handle_add_provider(struct Stream* stream, struct KademliaMessage* message, + struct DhtContext* protocol_context, unsigned char** result_buffer, size_t* result_buffer_size) { int retVal = 0; struct Libp2pPeer *peer = NULL; @@ -294,10 +291,10 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc */ // now add the peer to the peerstore libp2p_logger_debug("dht_protocol", "About to add peer %s to peerstore\n", peer_ma->string); - if (!libp2p_peerstore_add_peer(peerstore, peer)) + if (!libp2p_peerstore_add_peer(protocol_context->peer_store, peer)) goto exit; libp2p_logger_debug("dht_protocol", "About to add key to providerstore\n"); - if (!libp2p_providerstore_add(providerstore, (unsigned char*)message->key, message->key_size, (unsigned char*)peer->id, peer->id_size)) + if (!libp2p_providerstore_add(protocol_context->provider_store, (unsigned char*)message->key, message->key_size, (unsigned char*)peer->id, peer->id_size)) goto exit; } @@ -332,11 +329,11 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc * @param result_buffer_size the size of the results * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct KademliaMessage* message, - struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { +int libp2p_routing_dht_handle_get_value(struct Stream* stream, struct KademliaMessage* message, struct DhtContext* dht_context, + unsigned char** result_buffer, size_t *result_buffer_size) { - struct Datastore* datastore = session->datastore; - struct Filestore* filestore = session->filestore; + struct Datastore* datastore = dht_context->datastore; + struct Filestore* filestore = dht_context->filestore; size_t data_size = 0; unsigned char* data = NULL; @@ -374,14 +371,15 @@ int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct K /** * Put something in the dht datastore - * @param session the session context + * @param stream the incoming stream * @param message the message * @param peerstore the peerstore * @param providerstore the providerstore + * @param datastore the datastore * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct KademliaMessage* message, - struct Peerstore* peerstore, struct ProviderStore* providerstore) { +int libp2p_routing_dht_handle_put_value(struct Stream* stream, struct KademliaMessage* message, + struct DhtContext* protocol_context) { if (message->record == NULL) return 0; @@ -407,7 +405,7 @@ int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct K } memcpy(record->value, message->record->value, record->value_size); - int retVal = session->datastore->datastore_put(record, session->datastore); + int retVal = protocol_context->datastore->datastore_put(record, protocol_context->datastore); libp2p_datastore_record_free(record); return retVal; } @@ -422,10 +420,10 @@ int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct K * @param result_buffer_size the size of the results * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct KademliaMessage* message, - struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { +int libp2p_routing_dht_handle_find_node(struct Stream* stream, struct KademliaMessage* message, + struct DhtContext* protocol_context, unsigned char** result_buffer, size_t *result_buffer_size) { // look through peer store - struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, (unsigned char*)message->key, message->key_size); + struct Libp2pPeer* peer = libp2p_peerstore_get_peer(protocol_context->peer_store, (unsigned char*)message->key, message->key_size); if (peer != NULL) { message->provider_peer_head = libp2p_utils_linked_list_new(); message->provider_peer_head->item = libp2p_peer_copy(peer); @@ -445,7 +443,7 @@ int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct K * @param peerstore a list of peers * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore) { +int libp2p_routing_dht_handle_message(struct Stream* stream, struct DhtContext* protocol_context) { unsigned char *result_buffer = NULL; struct StreamMessage* buffer = NULL; size_t result_buffer_size = 0; @@ -453,7 +451,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee struct KademliaMessage* message = NULL; // read from stream - if (!session->default_stream->read(session, &buffer, 5)) + if (!stream->read(stream->stream_context, &buffer, 5)) goto exit; // unprotobuf if (!libp2p_message_protobuf_decode(buffer->data, buffer->data_size, &message)) @@ -462,19 +460,19 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee // handle message switch(message->message_type) { case(MESSAGE_TYPE_PUT_VALUE): // store a value in local storage - libp2p_routing_dht_handle_put_value(session, message, peerstore, providerstore); + libp2p_routing_dht_handle_put_value(stream, message, protocol_context); break; case(MESSAGE_TYPE_GET_VALUE): // get a value from local storage - libp2p_routing_dht_handle_get_value(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + libp2p_routing_dht_handle_get_value(stream, message, protocol_context, &result_buffer, &result_buffer_size); break; case(MESSAGE_TYPE_ADD_PROVIDER): // client wants us to know he can provide something - libp2p_routing_dht_handle_add_provider(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + libp2p_routing_dht_handle_add_provider(stream, message, protocol_context, &result_buffer, &result_buffer_size); break; case(MESSAGE_TYPE_GET_PROVIDERS): // see if we can help, and send closer peers - libp2p_routing_dht_handle_get_providers(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + libp2p_routing_dht_handle_get_providers(stream, message, protocol_context, &result_buffer, &result_buffer_size); break; case(MESSAGE_TYPE_FIND_NODE): // find peers - libp2p_routing_dht_handle_find_node(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + libp2p_routing_dht_handle_find_node(stream, message, protocol_context, &result_buffer, &result_buffer_size); break; case(MESSAGE_TYPE_PING): libp2p_routing_dht_handle_ping(message, &result_buffer, &result_buffer_size); @@ -486,7 +484,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee struct StreamMessage outgoing; outgoing.data = result_buffer; outgoing.data_size = result_buffer_size; - if (!session->default_stream->write(session, &outgoing)) + if (!stream->write(stream->stream_context, &outgoing)) goto exit; } else { libp2p_logger_debug("dht_protocol", "DhtHandleMessage: Nothing to send back. Kademlia call has been handled. Message type: %d\n", message->message_type); diff --git a/secio/secio.c b/secio/secio.c index 5bf965e..e080a45 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -53,11 +53,11 @@ int libp2p_secio_can_handle(const struct StreamMessage* msg) { * @param protocol_context a SecioContext that contains the needed information * @returns <0 on error, 0 if okay (does not allow daemon to continue looping) */ -int libp2p_secio_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { +int libp2p_secio_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { libp2p_logger_debug("secio", "Handling incoming secio message.\n"); struct SecioContext* ctx = (struct SecioContext*)protocol_context; // send them the protocol - if (!libp2p_secio_send_protocol(ctx)) + if (!libp2p_secio_send_protocol(stream)) return -1; int retVal = libp2p_secio_handshake(ctx); if (retVal) @@ -518,12 +518,12 @@ int libp2p_secio_make_mac_and_cipher(struct SessionContext* session, struct Stre * @param ctx the context * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_send_protocol(struct SecioContext* ctx) { +int libp2p_secio_send_protocol(struct Stream* stream) { char* protocol = "/secio/1.0.0\n"; struct StreamMessage outgoing; outgoing.data = (uint8_t*)protocol; outgoing.data_size = strlen(protocol); - return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing); + return stream->write(stream->stream_context, &outgoing); } /*** @@ -531,12 +531,12 @@ int libp2p_secio_send_protocol(struct SecioContext* ctx) { * @param ctx the context * @returns true(1) if we received what we think we should have, false(0) otherwise */ -int libp2p_secio_receive_protocol(struct SecioContext* ctx) { +int libp2p_secio_receive_protocol(struct Stream* stream) { char* protocol = "/secio/1.0.0\n"; int numSecs = 30; int retVal = 0; struct StreamMessage* buffer = NULL; - ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &buffer, numSecs); + stream->read(stream->stream_context, &buffer, numSecs); if (buffer == NULL) { libp2p_logger_error("secio", "Expected the secio protocol header, but received NULL.\n"); } else { @@ -1335,6 +1335,12 @@ int libp2p_secio_read_raw(void* stream_context, uint8_t* buffer, int buffer_size return max_to_read; } +int libp2p_secio_close(struct Stream* stream) { + if (stream != NULL && stream->stream_context != NULL) + free(stream->stream_context); + return 1; +} + /*** * Initiates a secio handshake. Use this method when you want to initiate a secio * session. This should not be used to respond to incoming secio requests @@ -1361,13 +1367,13 @@ struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp ctx->peer_store = peerstore; ctx->private_key = rsa_private_key; new_stream->parent_stream = parent_stream; - new_stream->close = libp2p_secio_shutdown; + new_stream->close = libp2p_secio_close; new_stream->peek = libp2p_secio_peek; new_stream->read = libp2p_secio_encrypted_read; new_stream->read_raw = libp2p_secio_read_raw; new_stream->write = libp2p_secio_encrypted_write; - if (!libp2p_secio_send_protocol(ctx) - || !libp2p_secio_receive_protocol(ctx) + if (!libp2p_secio_send_protocol(parent_stream) + || !libp2p_secio_receive_protocol(parent_stream) || !libp2p_secio_handshake(ctx)) { libp2p_stream_free(new_stream); new_stream = NULL; diff --git a/test/mock_stream.h b/test/mock_stream.h index 605cdb9..8f8d4f0 100644 --- a/test/mock_stream.h +++ b/test/mock_stream.h @@ -3,17 +3,13 @@ #include #include "libp2p/net/stream.h" -struct MockContext { - struct Stream* stream; -}; - void mock_stream_free(struct Stream* stream); -int mock_stream_close(void* context) { - if (context == NULL) - return 1; - struct MockContext* ctx = (struct MockContext*)context; - mock_stream_free(ctx->stream); +int mock_stream_close(struct Stream* stream) { + if (stream == NULL) + return 0; + struct ConnectionContext* ctx = (struct ConnectionContext*)stream->stream_context; + mock_stream_free(stream); return 1; } @@ -41,8 +37,9 @@ struct Stream* mock_stream_new() { out->read = mock_stream_read; out->read_raw = mock_stream_read_raw; out->write = mock_stream_write; - struct MockContext* ctx = malloc(sizeof(struct MockContext)); - ctx->stream = out; + struct ConnectionContext* ctx = malloc(sizeof(struct ConnectionContext)); + ctx->session_context = (struct SessionContext*)malloc(sizeof(struct SessionContext)); + ctx->session_context->default_stream = out; out->stream_context = ctx; } return out; @@ -51,7 +48,12 @@ struct Stream* mock_stream_new() { void mock_stream_free(struct Stream* stream) { if (stream == NULL) return; - if (stream->stream_context != NULL) + if (stream->stream_context != NULL) { + struct ConnectionContext* ctx = (struct ConnectionContext*)stream->stream_context; + // this will close the session, which will be a loop, so don't + //libp2p_session_context_free(ctx->session_context); + free(ctx->session_context); free(stream->stream_context); + } free(stream); } diff --git a/test/test_conn.h b/test/test_conn.h index ed98544..29a41b4 100644 --- a/test/test_conn.h +++ b/test/test_conn.h @@ -182,7 +182,7 @@ int test_dialer_dial_multistream() { if (stream != NULL) { struct SessionContext session_context; session_context.insecure_stream = stream; - stream->close(&session_context); + stream->close(stream); libp2p_net_multistream_stream_free(stream); } return retVal; diff --git a/test/test_multistream.h b/test/test_multistream.h index f8cd2ba..b2a796d 100644 --- a/test/test_multistream.h +++ b/test/test_multistream.h @@ -21,7 +21,7 @@ int test_multistream_connect() { if (stream != NULL) { struct SessionContext ctx; ctx.insecure_stream = stream; - stream->close(&ctx); + stream->close(stream); libp2p_net_multistream_stream_free(stream); } @@ -61,7 +61,7 @@ int test_multistream_get_list() { exit: if (session.insecure_stream != NULL) { - session.insecure_stream->close(&session); + session.insecure_stream->close(session.insecure_stream); libp2p_net_multistream_stream_free(session.insecure_stream); } libp2p_stream_message_free(response); diff --git a/test/test_yamux.h b/test/test_yamux.h index e0f0c9a..6237c15 100644 --- a/test/test_yamux.h +++ b/test/test_yamux.h @@ -53,8 +53,7 @@ int test_yamux_stream_new() { retVal = 1; exit: if (yamux_stream != NULL) - yamux_stream->close(yamux_stream->stream_context); - mock_stream->close(mock_stream->stream_context); + yamux_stream->close(yamux_stream); return retVal; } @@ -71,43 +70,58 @@ int test_yamux_identify() { goto exit; // Now add in another protocol mock_stream->read = mock_identify_read_protocol; - if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(libp2p_yamux_channel_new(yamux_stream)))) { + if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(yamux_stream))) { goto exit; } // tear down retVal = 1; exit: if (yamux_stream != NULL) - yamux_stream->close(yamux_stream->stream_context); - mock_stream->close(mock_stream->stream_context); + yamux_stream->close(yamux_stream); return retVal; } int test_yamux_incoming_protocol_request() { int retVal = 0; + // setup - struct Stream* mock_stream = mock_stream_new(); - mock_stream->read = mock_yamux_read_protocol; - struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream); - if (yamux_stream == NULL) - goto exit; - // build the protocol handler that can handle identify protocol + // build the protocol handler that can handle yamux and identify protocol struct Libp2pVector* protocol_handlers = libp2p_utils_vector_new(1); struct Libp2pProtocolHandler* handler = libp2p_identify_build_protocol_handler(protocol_handlers); libp2p_utils_vector_add(protocol_handlers, handler); - struct SessionContext session_context; - session_context.default_stream = yamux_stream; + handler = libp2p_yamux_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + struct Stream* mock_stream = mock_stream_new(); + struct SessionContext* session_context = ((struct ConnectionContext*)mock_stream->stream_context)->session_context; + mock_stream->read = mock_yamux_read_protocol; + struct StreamMessage* result_message = NULL; + if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) { + libp2p_logger_error("test_yamux", "Unable to read Yamux protocol from mock stream.\n"); + goto exit; + } + if (libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers) < 0) { + libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol unsuccessful.\n"); + goto exit; + } + // now we should have upgraded to the yamux protocol + libp2p_stream_message_free(result_message); + result_message = NULL; + if (session_context->default_stream->parent_stream == NULL) { + libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol appeared susccessful, but was not.\n"); + goto exit; + } // Someone is requesting the identity protocol mock_stream->read = mock_identify_read_protocol; - struct StreamMessage* result_message; - if (!yamux_stream->read(yamux_stream->stream_context, &result_message, 10)) { + if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) { libp2p_logger_error("test_yamux", "Unable to read identify protocol.\n"); goto exit; } // handle the marshaling of the protocol - libp2p_protocol_marshal(result_message, &session_context, protocol_handlers); + libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers); + libp2p_stream_message_free(result_message); + result_message = NULL; // now verify the results - struct YamuxContext* yamux_context = (struct YamuxContext*)yamux_stream->stream_context; + struct YamuxContext* yamux_context = (struct YamuxContext*)session_context->default_stream->stream_context; if (yamux_context->channels->total != 1) { libp2p_logger_error("test_yamux", "Identify protocol was not found.\n"); goto exit; @@ -116,8 +130,8 @@ int test_yamux_incoming_protocol_request() { // tear down retVal = 1; exit: - if (yamux_stream != NULL) - yamux_stream->close(yamux_stream->stream_context); - mock_stream->close(mock_stream->stream_context); + if (session_context->default_stream != NULL) + session_context->default_stream->close(session_context->default_stream); + libp2p_protocol_handlers_shutdown(protocol_handlers); return retVal; } diff --git a/yamux/yamux.c b/yamux/yamux.c index 5a322ad..451bd07 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -5,6 +5,7 @@ #include "libp2p/yamux/yamux.h" #include "libp2p/net/protocol.h" #include "libp2p/net/stream.h" +#include "libp2p/net/connectionstream.h" #include "libp2p/conn/session.h" #include "libp2p/utils/logger.h" @@ -59,12 +60,12 @@ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ -int yamux_send_protocol(struct YamuxContext* context) { +int yamux_send_protocol(struct Stream* stream) { char* protocol = "/yamux/1.0.0\n"; struct StreamMessage outgoing; outgoing.data = (uint8_t*)protocol; outgoing.data_size = strlen(protocol); - if (!context->stream->parent_stream->write(context->stream->parent_stream->stream_context, &outgoing)) + if (!stream->write(stream->stream_context, &outgoing)) return 0; return 1; } @@ -99,33 +100,16 @@ int yamux_receive_protocol(struct YamuxContext* context) { * The remote is attempting to negotiate yamux * @param msg the incoming message * @param incoming_size the size of the incoming data buffer - * @param session_context the information about the incoming connection + * @param stream the incoming stream * @param protocol_context the protocol-dependent context * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ -int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { - struct YamuxContext* ctx = (struct YamuxContext*)protocol_context; - // we should have the yamux protocol in msg. Send the protocol back. - if (!yamux_send_protocol(ctx)) { - return 0; - } - /* - struct yamux_session* yamux = yamux_session_new(NULL, session_context->default_stream, yamux_session_server, protocol_context); - uint8_t* buf = (uint8_t*) malloc(msg->data_size); - if (buf == NULL) +int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + struct Stream* new_stream = libp2p_yamux_stream_new(stream); + if (new_stream == NULL) return -1; - memcpy(buf, msg->data, msg->data_size); - for(;;) { - int retVal = yamux_decode(yamux, msg->data, msg->data_size); - free(buf); - buf = NULL; - if (!retVal) - break; - else { // try to read more from this stream - // TODO need more information as to what this loop should do - } - } - */ + // upgrade + stream->handle_upgrade(stream, new_stream); return 1; } @@ -135,13 +119,15 @@ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* * @returns true(1) */ int yamux_shutdown(void* protocol_context) { + if (protocol_context != NULL) + free(protocol_context); return 0; } -struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* handlers) { +struct Libp2pProtocolHandler* libp2p_yamux_build_protocol_handler(struct Libp2pVector* handlers) { struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new(); if (handler != NULL) { - handler->context = handlers; + handler->context = handler; handler->CanHandle = yamux_can_handle; handler->HandleMessage = yamux_handle_message; handler->Shutdown = yamux_shutdown; @@ -155,11 +141,13 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* * @param stream_context the YamuxContext * @returns true(1) on success, false(0) otherwise */ -int libp2p_yamux_close(void* stream_context) { - if (stream_context == NULL) +int libp2p_yamux_close(struct Stream* stream) { + if (stream == NULL) return 0; - struct YamuxContext* ctx = (struct YamuxContext*)stream_context; - libp2p_yamux_stream_free(ctx->stream); + if (stream->stream_context == NULL) + return 0; + if (stream->parent_stream->close(stream->parent_stream)) + libp2p_yamux_stream_free(stream); return 1; } @@ -263,25 +251,6 @@ struct YamuxContext* libp2p_yamux_context_new() { return ctx; } -/*** - * Free the resources from libp2p_yamux_context_new - * @param ctx the context - */ -void libp2p_yamux_context_free(struct YamuxContext* ctx) { - if (ctx == NULL) - return; - // free all the channels - if (ctx->channels) { - for(int i = 0; i < ctx->channels->total; i++) { - struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i); - curr->close(curr->stream_context); - } - libp2p_utils_vector_free(ctx->channels); - } - free(ctx); - return; -} - int libp2p_yamux_negotiate(struct YamuxContext* ctx) { const char* protocolID = "/yamux/1.0.0\n"; struct StreamMessage outgoing; @@ -343,6 +312,18 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) { return retVal; } +/*** + * A new protocol was asked for. Give it a "channel" + * @param yamux_stream the yamux stream + * @param new_stream the newly negotiated protocol + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_stream) { + // put this stream in the collection, and tie it to an id + struct YamuxContext* yamux_context = (struct YamuxContext*)yamux_stream->stream_context; + return libp2p_yamux_stream_add(yamux_context, new_stream); +} + /*** * Negotiate the Yamux protocol * @param parent_stream the parent stream @@ -357,6 +338,7 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { out->write = libp2p_yamux_write; out->peek = libp2p_yamux_peek; out->read_raw = libp2p_yamux_read_raw; + out->handle_upgrade = libp2p_yamux_handle_upgrade; out->address = parent_stream->address; // build YamuxContext struct YamuxContext* ctx = libp2p_yamux_context_new(); @@ -375,6 +357,44 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { return out; } + +/** + * Clean up resources from libp2p_yamux_channel_new + * @param ctx the YamuxChannelContext + */ +int libp2p_yamux_channel_close(void* context) { + if (context == NULL) + return 0; + struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context; + if (ctx != NULL) { + // close the child's stream + ctx->child_stream->close(ctx->child_stream); + libp2p_stream_free(ctx->stream); + free(ctx); + } + return 1; +} + +/*** + * Free the resources from libp2p_yamux_context_new + * @param ctx the context + */ +void libp2p_yamux_context_free(struct YamuxContext* ctx) { + if (ctx == NULL) + return; + // free all the channels + if (ctx->channels) { + for(int i = 0; i < ctx->channels->total; i++) { + struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i); + //curr->close(curr->stream_context); + libp2p_yamux_channel_close(curr->stream_context); + } + libp2p_utils_vector_free(ctx->channels); + } + free(ctx); + return; +} + /** * Frees resources held by the stream * @param yamux_stream the stream @@ -387,6 +407,50 @@ void libp2p_yamux_stream_free(struct Stream* yamux_stream) { libp2p_stream_free(yamux_stream); } +/*** + * Channels calling close on the stream should not be able + * to clean up layers below + * @param context the context + * @returns true(1); + */ +int libp2p_yamux_channel_null_close(struct Stream* stream) { + return 1; +} + +/** + * Create a stream that has a "YamuxChannelContext" related to this yamux protocol + * NOTE: This "wraps" the incoming stream, so that the returned stream is the parent + * of the incoming_stream + * @param incoming_stream the stream of the new protocol + * @returns a new Stream that has a YamuxChannelContext, and incoming_stream->parent_stream is set to this stream + */ +struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream) { + struct Stream* out = libp2p_stream_new(); + if (out != NULL) { + out->address = incoming_stream->address; + // don't allow the incoming_stream to close the channel + out->close = libp2p_yamux_channel_null_close; + out->parent_stream = incoming_stream->parent_stream; + out->peek = incoming_stream->parent_stream->peek; + out->read = incoming_stream->parent_stream->read; + out->read_raw = incoming_stream->parent_stream->read_raw; + out->socket_mutex = incoming_stream->parent_stream->socket_mutex; + struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext)); + ctx->channel = 0; + ctx->closed = 0; + ctx->state = 0; + ctx->window_size = 0; + ctx->type = YAMUX_CHANNEL_CONTEXT; + ctx->yamux_context = incoming_stream->parent_stream->stream_context; + ctx->stream = out; + ctx->child_stream = incoming_stream; + out->stream_context = ctx; + out->write = incoming_stream->parent_stream->write; + incoming_stream->parent_stream = out; + } + return out; +} + /**** * Add a stream "channel" to the yamux handler * @param ctx the context @@ -396,65 +460,13 @@ void libp2p_yamux_stream_free(struct Stream* yamux_stream) { int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { if (stream == NULL) return 0; - // the stream's parent should have a YamuxChannelContext - char proto = ((uint8_t*)stream->parent_stream->stream_context)[0]; - if (proto == YAMUX_CHANNEL_CONTEXT) { - // the negotiation was successful. Add it to the list of channels that we have - int itemNo = libp2p_utils_vector_add(ctx->channels, stream); - struct YamuxChannelContext* incoming = (struct YamuxChannelContext*)stream->parent_stream->stream_context; - if (incoming->channel != 0) { - // this is wrong. There should have not been a channel number - return 0; - } - incoming->channel = itemNo; - return 1; - } - return 0; -} - -/** - * Clean up resources from libp2p_yamux_channel_new - * @param ctx the YamuxChannelContext - */ -int libp2p_yamux_channel_close(void* context) { - if (context == NULL) + // wrap the new stream in a YamuxChannelContext + struct Stream* channel_stream = libp2p_yamux_channel_stream_new(stream); + if (channel_stream == NULL) return 0; - struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context; - if (ctx != NULL) { - if (ctx->stream != NULL) - free(ctx->stream); - free(ctx); - } + struct YamuxChannelContext* channel_context = (struct YamuxChannelContext*)channel_stream->stream_context; + // the negotiation was successful. Add it to the list of channels that we have + int itemNo = libp2p_utils_vector_add(ctx->channels, channel_stream); + channel_context->channel = itemNo; return 1; } - -/** - * Create a stream that has a "YamuxChannelContext" related to this yamux protocol - * @param parent_stream the parent yamux stream - * @returns a new Stream that is a YamuxChannelContext - */ -struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream) { - struct Stream* out = libp2p_stream_new(); - if (out != NULL) { - out->address = parent_stream->address; - out->close = libp2p_yamux_channel_close; - out->parent_stream = parent_stream; - out->peek = parent_stream->peek; - out->read = parent_stream->read; - out->read_raw = parent_stream->read_raw; - out->socket_mutex = parent_stream->socket_mutex; - struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext)); - ctx->channel = 0; - ctx->closed = 0; - ctx->state = 0; - ctx->window_size = 0; - ctx->type = YAMUX_CHANNEL_CONTEXT; - ctx->yamux_context = parent_stream->stream_context; - ctx->stream = out; - out->stream_context = ctx; - out->write = parent_stream->write; - } - return out; -} - -