diff --git a/conn/session.c b/conn/session.c index 971a6e5..db8f3eb 100644 --- a/conn/session.c +++ b/conn/session.c @@ -44,7 +44,7 @@ struct SessionContext* libp2p_session_context_new() { int libp2p_session_context_free(struct SessionContext* context) { if (context != NULL) { if (context->default_stream != NULL) - context->default_stream->close(context); + context->default_stream->close(context->default_stream); context->default_stream = NULL; context->insecure_stream = NULL; context->secure_stream = NULL; diff --git a/conn/tcp_transport_dialer.c b/conn/tcp_transport_dialer.c index 5804bb9..6859333 100644 --- a/conn/tcp_transport_dialer.c +++ b/conn/tcp_transport_dialer.c @@ -24,7 +24,7 @@ struct Stream* libp2p_conn_tcp_dial(const struct TransportDialer* transport_dial if (!multiaddress_get_ip_address(addr, &ip)) return NULL; - struct Stream* stream = libp2p_net_connection_new(socket_descriptor, ip, port); + struct Stream* stream = libp2p_net_connection_new(socket_descriptor, ip, port, NULL); free(ip); return stream; diff --git a/identify/identify.c b/identify/identify.c index 079a31d..3eafbd8 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -35,12 +35,12 @@ int libp2p_identify_can_handle(const struct StreamMessage* msg) { * @param context the context * @returns true(1) on success, false(0) otherwise */ -int libp2p_identify_send_protocol(struct IdentifyContext *context) { +int libp2p_identify_send_protocol(struct Stream *stream) { char *protocol = "/ipfs/id/1.0.0\n"; struct StreamMessage msg; msg.data = (uint8_t*) protocol; msg.data_size = strlen(protocol); - if (!context->parent_stream->write(context->parent_stream->stream_context, &msg)) { + if (!stream->write(stream->stream_context, &msg)) { libp2p_logger_error("identify", "send_protocol: Unable to send identify protocol header.\n"); return 0; } @@ -53,10 +53,10 @@ int libp2p_identify_send_protocol(struct IdentifyContext *context) { * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ -int libp2p_identify_receive_protocol(struct IdentifyContext* context) { +int libp2p_identify_receive_protocol(struct Stream* stream) { const char *protocol = "/ipfs/id/1.0.0\n"; struct StreamMessage* results = NULL; - if (!context->parent_stream->read(context->parent_stream->stream_context, &results, 30)) { + if (!stream->read(stream->stream_context, &results, 30)) { libp2p_logger_error("identify", "receive_protocol: Unable to read results.\n"); return 0; } @@ -80,15 +80,14 @@ int libp2p_identify_receive_protocol(struct IdentifyContext* context) { * @param protocol_context the identify protocol context * @returns <0 on error, 0 if loop should not continue, >0 on success */ -int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) { - if (protocol_context == NULL) +int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + struct Stream* new_stream = libp2p_identify_stream_new(stream); + if (new_stream == NULL) return -1; // attempt to create a new Identify connection with them. // send the protocol id back, and set up the channel - struct IdentifyContext* ctx = (struct IdentifyContext*)protocol_context; - libp2p_identify_send_protocol(ctx); //TODO: now add this "channel" - return 1; + return stream->handle_upgrade(stream, new_stream); } /** @@ -97,13 +96,16 @@ int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Sessi * @returns true(1) */ int libp2p_identify_shutdown(void* protocol_context) { - return 0; + if (protocol_context == NULL) + return 0; + free(protocol_context); + return 1; } struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp2pVector* handlers) { struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new(); if (handler != NULL) { - handler->context = handlers; + handler->context = handler; handler->CanHandle = libp2p_identify_can_handle; handler->HandleMessage = libp2p_identify_handle_message; handler->Shutdown = libp2p_identify_shutdown; @@ -111,13 +113,14 @@ struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp return handler; } -int libp2p_identify_close(void* stream_context) { - if (stream_context == NULL) +int libp2p_identify_close(struct Stream* stream) { + if (stream == NULL) return 0; - struct IdentifyContext* ctx = (struct IdentifyContext*)stream_context; - ctx->parent_stream->close(ctx->parent_stream->stream_context); - free(ctx->stream); - free(ctx); + if (stream->parent_stream != NULL) + stream->parent_stream->close(stream->parent_stream); + if (stream->stream_context != NULL) + free(stream->stream_context); + libp2p_stream_free(stream); return 1; } @@ -146,7 +149,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { ctx->stream = out; out->stream_context = ctx; out->close = libp2p_identify_close; - if (!libp2p_identify_send_protocol(ctx) || !libp2p_identify_receive_protocol(ctx)) { + if (!libp2p_identify_send_protocol(parent_stream) || !libp2p_identify_receive_protocol(parent_stream)) { libp2p_stream_free(out); free(ctx); return NULL; diff --git a/include/libp2p/identify/identify.h b/include/libp2p/identify/identify.h index 2311d25..b2bec1a 100644 --- a/include/libp2p/identify/identify.h +++ b/include/libp2p/identify/identify.h @@ -29,9 +29,8 @@ struct IdentifyContext { }; int libp2p_identify_can_handle(const struct StreamMessage* msg); -int libp2p_identify_send_protocol(struct IdentifyContext *context); -int libp2p_identify_receive_protocol(struct IdentifyContext* context); -int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context); +int libp2p_identify_send_protocol(struct Stream* stream); +int libp2p_identify_receive_protocol(struct Stream* stream); int libp2p_identify_shutdown(void* protocol_context); struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp2pVector* handlers); diff --git a/include/libp2p/net/connectionstream.h b/include/libp2p/net/connectionstream.h index 38c6c16..d928e75 100644 --- a/include/libp2p/net/connectionstream.h +++ b/include/libp2p/net/connectionstream.h @@ -1,6 +1,7 @@ #pragma once #include "libp2p/net/stream.h" +#include "libp2p/conn/session.h" /*** * Create a new stream based on a network connection @@ -9,7 +10,24 @@ * @param port the port of the connection * @returns a Stream */ -struct Stream* libp2p_net_connection_new(int fd, char* ip, int port); +struct Stream* libp2p_net_connection_new(int fd, char* ip, int port, struct SessionContext* session_context); + +/** + * Attempt to upgrade the parent_stream to use the new stream by default + * @param parent_stream the parent stream + * @param new_stream the new stream + * @returns true(1) on success, false(0) if not + */ +int libp2p_net_connection_upgrade(struct Stream* parent_stream, struct Stream* new_stream); + +/** + * Given a stream, find the SessionContext + * NOTE: This is done by navigating to the root context, which should + * be a ConnectionContext, then grabbing the SessionContext there. + * @param stream the stream to use + * @returns the SessionContext for this stream + */ +struct SessionContext* libp2p_net_connection_get_session_context(struct Stream* stream); /*** * These are put here to allow implementations of struct Stream diff --git a/include/libp2p/net/protocol.h b/include/libp2p/net/protocol.h index a9992fd..801636f 100644 --- a/include/libp2p/net/protocol.h +++ b/include/libp2p/net/protocol.h @@ -22,11 +22,11 @@ struct Libp2pProtocolHandler { * Handles the message * @param incoming the incoming data buffer * @param incoming_size the size of the incoming data buffer - * @param session_context the information about the incoming connection + * @param stream the incoming stream * @param protocol_context the protocol-dependent context * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ - int (*HandleMessage)(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context); + int (*HandleMessage)(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context); /** * Shutting down. Clean up any memory allocations @@ -45,8 +45,15 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new(); /*** * Handle an incoming message * @param message the incoming message - * @param session the SessionContext of the incoming connection + * @param stream the incoming connection * @param handlers a Vector of protocol handlers - * @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success + * @returns -1 on error, 0 on protocol upgrade, 1 on success */ -int libp2p_protocol_marshal(struct StreamMessage* message, struct SessionContext* context, struct Libp2pVector* protocol_handlers); +int libp2p_protocol_marshal(struct StreamMessage* message, struct Stream* stream, struct Libp2pVector* protocol_handlers); + +/*** + * Shut down all protocol handlers and free vector + * @param handlers vector of Libp2pProtocolHandler + * @returns true(1) + */ +int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers); diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index a6a920a..370d372 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -46,6 +46,8 @@ struct Stream { struct MultiAddress* address; // helps identify who is on the other end pthread_mutex_t* socket_mutex; // only 1 transmission at a time struct Stream* parent_stream; // what stream wraps this stream + int channel; // the channel (for multiplexing streams) + /** * A generic place to store implementation-specific context items */ @@ -82,10 +84,10 @@ struct Stream { * Closes a stream * * NOTE: This is also responsible for deallocating the Stream struct - * @param stream the stream context + * @param stream the stream * @returns true(1) on success, otherwise false(0) */ - int (*close)(void* stream_context); + int (*close)(struct Stream* stream); /*** * Checks to see if something is waiting on the stream @@ -94,6 +96,13 @@ struct Stream { * @returns true(1) if something is waiting, false(0) if not, -1 on error */ int (*peek)(void* stream_context); + + /** + * Handle a stream upgrade + * @param stream the current stream + * @param new_stream the newly created stream + */ + int (*handle_upgrade)(struct Stream* stream, struct Stream* new_stream); }; struct Stream* libp2p_stream_new(); diff --git a/include/libp2p/routing/dht_protocol.h b/include/libp2p/routing/dht_protocol.h index 581ebf7..6915501 100644 --- a/include/libp2p/routing/dht_protocol.h +++ b/include/libp2p/routing/dht_protocol.h @@ -10,7 +10,16 @@ * This is where kademlia and dht talk to the outside world */ -struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store); + +struct DhtContext { + struct Peerstore* peer_store; + struct ProviderStore* provider_store; + struct Datastore* datastore; + struct Filestore* filestore; +}; + +struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store, + struct Datastore* datastore, struct Filestore* filestore); /** * Take existing stream and upgrade to the Kademlia / DHT protocol/codec @@ -21,20 +30,20 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context); /** * Handle a client requesting an upgrade to the DHT protocol - * @param context the context + * @param stream the stream * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handshake(struct SessionContext* context); +int libp2p_routing_dht_handshake(struct Stream* stream); /*** * Handle the incoming message. Handshake should have already * been done. We should expect that the next read contains * a protobuf'd kademlia message. - * @param session the context - * @param peerstore a list of peers + * @param stream the incoming stream + * @param protocol_context the protocol context * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore); +int libp2p_routing_dht_handle_message(struct Stream* stream, struct DhtContext* protocol_context); /*** * Send a kademlia message diff --git a/include/libp2p/secio/secio.h b/include/libp2p/secio/secio.h index e756346..500ff51 100644 --- a/include/libp2p/secio/secio.h +++ b/include/libp2p/secio/secio.h @@ -42,16 +42,17 @@ int libp2p_secio_initiate_handshake(struct SecioContext* ctx); /*** * Send the protocol string to the remote stream - * @param session the context + * @param stream stream * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_send_protocol(struct SecioContext* session); +int libp2p_secio_send_protocol(struct Stream* stream); + /*** * Attempt to read the secio protocol as a reply from the remote * @param session the context * @returns true(1) if we received what we think we should have, false(0) otherwise */ -int libp2p_secio_receive_protocol(struct SecioContext* session); +int libp2p_secio_receive_protocol(struct Stream* stream); /*** * performs initial communication over an insecure channel to share diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index 27218eb..5c67f8f 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -26,7 +26,10 @@ struct YamuxContext { struct YamuxChannelContext { char type; struct YamuxContext* yamux_context; + // this stream struct Stream* stream; + // the child protocol's stream + struct Stream* child_stream; // the channel number int channel; // the window size for this channel @@ -40,14 +43,14 @@ struct YamuxChannelContext { /** * Build a handler that can handle the yamux protocol */ -struct Libp2pProtocolHandler* yamux_build_protocol_handler(); +struct Libp2pProtocolHandler* libp2p_yamux_build_protocol_handler(); /*** * Send the yamux protocol out the default stream * NOTE: if we initiate the connection, we should expect the same back - * @param context the SessionContext + * @param stream the stream * @returns true(1) on success, false(0) otherwise */ -int yamux_send_protocol(struct YamuxContext* context); +int yamux_send_protocol(struct Stream* stream); /*** * Check to see if the reply is the yamux protocol header we expect diff --git a/net/connectionstream.c b/net/connectionstream.c index 6884f9c..639fe27 100644 --- a/net/connectionstream.c +++ b/net/connectionstream.c @@ -12,6 +12,7 @@ #include "libp2p/net/stream.h" #include "libp2p/net/p2pnet.h" #include "libp2p/utils/logger.h" +#include "libp2p/conn/session.h" #include "multiaddr/multiaddr.h" /** @@ -19,10 +20,10 @@ * @param stream_context the ConnectionContext * @returns true(1) on success, false(0) otherwise */ -int libp2p_net_connection_close(void* stream_context) { - if (stream_context == NULL) +int libp2p_net_connection_close(struct Stream* stream) { + if (stream->stream_context == NULL) return 0; - struct ConnectionContext* ctx = (struct ConnectionContext*)stream_context; + struct ConnectionContext* ctx = (struct ConnectionContext*)stream->stream_context; if (ctx != NULL) { if (ctx->socket_descriptor > 0) { close(ctx->socket_descriptor); @@ -113,7 +114,7 @@ int libp2p_net_connection_write(void* stream_context, struct StreamMessage* msg) * @param port the port of the connection * @returns a Stream */ -struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) { +struct Stream* libp2p_net_connection_new(int fd, char* ip, int port, struct SessionContext* session_context) { struct Stream* out = (struct Stream*) malloc(sizeof(struct Stream)); if (out != NULL) { out->close = libp2p_net_connection_close; @@ -135,6 +136,7 @@ struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) { if (ctx != NULL) { out->stream_context = ctx; ctx->socket_descriptor = fd; + ctx->session_context = session_context; if (!socket_connect4_with_timeout(ctx->socket_descriptor, hostname_to_ip(ip), port, 10) == 0) { // unable to connect libp2p_stream_free(out); @@ -144,3 +146,39 @@ struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) { } return out; } + +/** + * Attempt to upgrade the parent_stream to use the new stream by default + * @param parent_stream the parent stream + * @param new_stream the new stream + * @returns true(1) on success, false(0) if not + */ +int libp2p_net_connection_upgrade(struct Stream* parent_stream, struct Stream* new_stream) { + if (parent_stream == NULL) + return 0; + struct Stream* current_stream = parent_stream; + while (current_stream->parent_stream != NULL) + current_stream = current_stream->parent_stream; + // current_stream is now the root, and should have a ConnectionContext + struct ConnectionContext* ctx = (struct ConnectionContext*)current_stream->stream_context; + ctx->session_context->default_stream = new_stream; + return 1; +} + +/** + * Given a stream, find the SessionContext + * NOTE: This is done by navigating to the root context, which should + * be a ConnectionContext, then grabbing the SessionContext there. + * @param stream the stream to use + * @returns the SessionContext for this stream + */ +struct SessionContext* libp2p_net_connection_get_session_context(struct Stream* stream) { + if (stream == NULL) { + return NULL; + } + struct Stream* current_stream = stream; + while (current_stream->parent_stream != NULL) + current_stream = current_stream->parent_stream; + struct ConnectionContext* ctx = (struct ConnectionContext*)current_stream->stream_context; + return ctx->session_context; +} diff --git a/net/multistream.c b/net/multistream.c index 2facd5a..532dff5 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -9,6 +9,7 @@ #include "libp2p/os/utils.h" #include "libp2p/net/p2pnet.h" +#include "libp2p/net/connectionstream.h" #include "libp2p/record/message.h" #include "libp2p/secio/secio.h" #include "varint.h" @@ -78,45 +79,6 @@ int libp2p_net_multistream_receive_protocol(struct SessionContext* context) { return 1; } -int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) { - // try sending the protocol back - //if (!libp2p_net_multistream_send_protocol(context)) - // return -1; - - struct MultistreamContext* multistream_context = (struct MultistreamContext*) protocol_context; - - // try to read from the network - struct StreamMessage* results = NULL; - int retVal = 0; - int max_retries = 10; - int numRetries = 0; - // handle the call - for(;;) { - // try to read for 5 seconds - if (context->default_stream->read(context, &results, 5)) { - // we read something from the network. Process it. - // NOTE: If it is a multistream protocol that we are receiving, ignore it. - if (libp2p_net_multistream_can_handle(results)) - continue; - numRetries = 0; - retVal = libp2p_protocol_marshal(results, context, multistream_context->handlers); - if (results != NULL) - free(results); - // exit the loop on error (or if they ask us to no longer loop by returning 0) - if (retVal <= 0) - break; - } else { - // we were unable to read from the network. - // if it timed out, we should try again (if we're not out of retries) - if (numRetries >= max_retries) - break; - numRetries++; - } - } - - return retVal; -} - int libp2p_net_multistream_shutdown(void* protocol_context) { struct MultistreamContext* context = (struct MultistreamContext*) protocol_context; if (context != NULL) { @@ -125,37 +87,13 @@ int libp2p_net_multistream_shutdown(void* protocol_context) { return 1; } -/*** - * The handler to handle calls to the protocol - * @param stream_context the context - * @returns the protocol handler - */ -struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector) { - - // build the context - struct MultistreamContext* context = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); - if (context == NULL) - return NULL; - context->handlers = (struct Libp2pVector*) handler_vector; - - // build the handler - struct Libp2pProtocolHandler *handler = libp2p_protocol_handler_new(); - if (handler != NULL) { - handler->context = context; - handler->CanHandle = libp2p_net_multistream_can_handle; - handler->HandleMessage = libp2p_net_multistream_handle_message; - handler->Shutdown = libp2p_net_multistream_shutdown; - } - return handler; -} - /** * Close the connection and free memory * @param ctx the context * @returns true(1) on success, otherwise false(0) */ int libp2p_net_multistream_context_free(struct MultistreamContext* ctx) { - int retVal = ctx->stream->close(ctx); + int retVal = ctx->stream->close(ctx->stream); // regardless of retVal, free the context // TODO: Evaluate if this is the correct way to do it: free(ctx); @@ -168,11 +106,11 @@ int libp2p_net_multistream_context_free(struct MultistreamContext* ctx) { * @param stream_context a SessionContext * @returns true(1) on success, otherwise false(0) */ -int libp2p_net_multistream_close(void* stream_context) { - if (stream_context == NULL) { +int libp2p_net_multistream_close(struct Stream* stream) { + if (stream->stream_context == NULL) { return 0; } - struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream_context; + struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream->stream_context; return libp2p_net_multistream_context_free(multistream_context); } @@ -458,3 +396,45 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream) { return out; } +/*** + * The remote is attempting to negotiate the multistream protocol + * @param msg incoming message + * @param stream the incoming stream + * @param protocol_context the context for the Multistream protocol (not stream specific) + * @returns <0 on error, 0 for the caller to stop handling this, 1 for success + */ +int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + // attempt negotiations + struct Stream* new_stream = libp2p_net_multistream_stream_new(stream); + if (new_stream != NULL) { + // upgrade + return stream->handle_upgrade(stream, new_stream); + } + + return -1; +} + +/*** + * The handler to handle calls to the protocol + * @param stream_context the context + * @returns the protocol handler + */ +struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector) { + + // build the context + struct MultistreamContext* context = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); + if (context == NULL) + return NULL; + context->handlers = (struct Libp2pVector*) handler_vector; + + // build the handler + struct Libp2pProtocolHandler *handler = libp2p_protocol_handler_new(); + if (handler != NULL) { + handler->context = context; + handler->CanHandle = libp2p_net_multistream_can_handle; + handler->HandleMessage = libp2p_net_multistream_handle_message; + handler->Shutdown = libp2p_net_multistream_shutdown; + } + return handler; +} + diff --git a/net/protocol.c b/net/protocol.c index 148d090..4ff4956 100644 --- a/net/protocol.c +++ b/net/protocol.c @@ -46,7 +46,7 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new() { * @param handlers a Vector of protocol handlers * @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success */ -int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* session, struct Libp2pVector* handlers) { +int libp2p_protocol_marshal(struct StreamMessage* msg, struct Stream* stream, struct Libp2pVector* handlers) { const struct Libp2pProtocolHandler* handler = protocol_compare(msg, handlers); if (handler == NULL) { @@ -60,9 +60,22 @@ int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* se break; } } - libp2p_logger_error("protocol", "Session [%s]: Unable to find handler for %s.\n", session->remote_peer_id, str); return -1; } - return handler->HandleMessage(msg, session, handler->context); + return handler->HandleMessage(msg, stream, handler->context); +} + +/*** + * Shut down all protocol handlers and free vector + * @param handlers vector of Libp2pProtocolHandler + * @returns true(1) + */ +int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers) { + for(int i = 0; i < handlers->total; i++) { + struct Libp2pProtocolHandler* handler = (struct Libp2pProtocolHandler*)libp2p_utils_vector_get(handlers, i); + handler->Shutdown(handler->context); + } + libp2p_utils_vector_free(handlers); + return 1; } diff --git a/net/stream.c b/net/stream.c index 6bd5590..33bdd9b 100644 --- a/net/stream.c +++ b/net/stream.c @@ -2,6 +2,11 @@ #include "multiaddr/multiaddr.h" #include "libp2p/net/stream.h" +#include "libp2p/net/connectionstream.h" + +int libp2p_stream_default_handle_upgrade(struct Stream* parent_stream, struct Stream* new_stream) { + return libp2p_net_connection_upgrade(parent_stream, new_stream); +} struct Stream* libp2p_stream_new() { struct Stream* stream = (struct Stream*) malloc(sizeof(struct Stream)); @@ -15,6 +20,8 @@ struct Stream* libp2p_stream_new() { stream->socket_mutex = NULL; stream->stream_context = NULL; stream->write = NULL; + stream->handle_upgrade = libp2p_stream_default_handle_upgrade; + stream->channel = -1; } return stream; } diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c index e4f795c..320de79 100644 --- a/routing/dht_protocol.c +++ b/routing/dht_protocol.c @@ -15,11 +15,6 @@ * This is where kademlia and dht talk to the outside world */ -struct DhtContext { - struct Peerstore* peer_store; - struct ProviderStore* provider_store; -}; - int libp2p_routing_dht_can_handle(const struct StreamMessage* msg) { if (msg->data_size < 8) return 0; @@ -34,20 +29,22 @@ int libp2p_routing_dht_shutdown(void* context) { return 1; } -int libp2p_routing_dht_handle_msg(const struct StreamMessage* msg, struct SessionContext* session_context, void* context) { - libp2p_logger_debug("dht_protocol", "Handling incoming dht routing request from peer %s.\n", session_context->remote_peer_id); - struct DhtContext* ctx = (struct DhtContext*)context; - if (!libp2p_routing_dht_handshake(session_context)) +int libp2p_routing_dht_handle_msg(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + struct DhtContext* ctx = (struct DhtContext*)protocol_context; + if (!libp2p_routing_dht_handshake(stream)) return -1; - return (libp2p_routing_dht_handle_message(session_context, ctx->peer_store, ctx->provider_store) == 0) ? -1 : 1; + return (libp2p_routing_dht_handle_message(stream, ctx) == 0) ? -1 : 1; } -struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store) { +struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store, + struct Datastore* datastore, struct Filestore* filestore) { struct Libp2pProtocolHandler* handler = (struct Libp2pProtocolHandler*) malloc(sizeof(struct Libp2pProtocolHandler)); if (handler != NULL) { struct DhtContext* ctx = (struct DhtContext*) malloc(sizeof(struct DhtContext)); ctx->peer_store = peer_store; ctx->provider_store = provider_store; + ctx->datastore = datastore; + ctx->filestore = filestore; handler->context = ctx; handler->CanHandle = libp2p_routing_dht_can_handle; handler->HandleMessage = libp2p_routing_dht_handle_msg; @@ -111,15 +108,15 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) { /** * Handle a client requesting an upgrade to the DHT protocol - * @param context the context + * @param stream the stream to the remote * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handshake(struct SessionContext* context) { +int libp2p_routing_dht_handshake(struct Stream* stream) { char* protocol = "/ipfs/kad/1.0.0\n"; struct StreamMessage outgoing; outgoing.data = (uint8_t*) protocol; outgoing.data_size = strlen(protocol); - return context->default_stream->write(context, &outgoing); + return stream->write(stream->stream_context, &outgoing); } /** @@ -136,14 +133,15 @@ int libp2p_routing_dht_handle_ping(struct KademliaMessage* message, unsigned cha /** * See if we have information as to who can provide this item - * @param session the context + * @param stream the incoming stream * @param message the message from the caller, contains a key - * @param peerstore the list of peers - * @param providerstore the list of peers that can provide things + * @param protocol_context the context + * @param results where to put the results + * @param results_size the size of the results * @returns true(1) on success, false(0) otherwise */ -int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, struct KademliaMessage* message, struct Peerstore* peerstore, - struct ProviderStore* providerstore, unsigned char** results, size_t* results_size) { +int libp2p_routing_dht_handle_get_providers(struct Stream* stream, struct KademliaMessage* message, struct DhtContext* protocol_context, + unsigned char** results, size_t* results_size) { unsigned char* peer_id = NULL; int peer_id_size = 0; @@ -152,16 +150,16 @@ int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, stru // Can I provide it locally? struct DatastoreRecord* datastore_record = NULL; - if (session->datastore->datastore_get((unsigned char*)message->key, message->key_size, &datastore_record, session->datastore)) { + if (protocol_context->datastore->datastore_get((unsigned char*)message->key, message->key_size, &datastore_record, protocol_context->datastore)) { // we can provide this hash from our datastore libp2p_datastore_record_free(datastore_record); libp2p_logger_debug("dht_protocol", "I can provide myself as a provider for this key.\n"); message->provider_peer_head = libp2p_utils_linked_list_new(); - message->provider_peer_head->item = libp2p_peer_copy(libp2p_peerstore_get_local_peer(peerstore)); - } else if (libp2p_providerstore_get(providerstore, (unsigned char*)message->key, message->key_size, &peer_id, &peer_id_size)) { + message->provider_peer_head->item = libp2p_peer_copy(libp2p_peerstore_get_local_peer(protocol_context->peer_store)); + } else if (libp2p_providerstore_get(protocol_context->provider_store, (unsigned char*)message->key, message->key_size, &peer_id, &peer_id_size)) { // Can I provide it because someone announced it earlier? // we have a peer id, convert it to a peer object - struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, peer_id, peer_id_size); + struct Libp2pPeer* peer = libp2p_peerstore_get_peer(protocol_context->peer_store, peer_id, peer_id_size); if (peer != NULL) { libp2p_logger_debug("dht_protocol", "I can provide a provider for this key, because %s says he has it.\n", libp2p_peer_id_to_string(peer)); // add it to the message @@ -233,16 +231,15 @@ struct MultiAddress* libp2p_routing_dht_find_peer_ip_multiaddress(struct Libp2pL /*** * Remote peer has announced that he can provide a key - * @param session session context + * @param stream the incoming stream * @param message the message - * @param peerstore the peerstore - * @param providerstore the providerstore + * @param protocol_context the context * @param result_buffer where to put the result * @param result_buffer_size the size of the result buffer * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struct KademliaMessage* message, - struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t* result_buffer_size) { +int libp2p_routing_dht_handle_add_provider(struct Stream* stream, struct KademliaMessage* message, + struct DhtContext* protocol_context, unsigned char** result_buffer, size_t* result_buffer_size) { int retVal = 0; struct Libp2pPeer *peer = NULL; @@ -294,10 +291,10 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc */ // now add the peer to the peerstore libp2p_logger_debug("dht_protocol", "About to add peer %s to peerstore\n", peer_ma->string); - if (!libp2p_peerstore_add_peer(peerstore, peer)) + if (!libp2p_peerstore_add_peer(protocol_context->peer_store, peer)) goto exit; libp2p_logger_debug("dht_protocol", "About to add key to providerstore\n"); - if (!libp2p_providerstore_add(providerstore, (unsigned char*)message->key, message->key_size, (unsigned char*)peer->id, peer->id_size)) + if (!libp2p_providerstore_add(protocol_context->provider_store, (unsigned char*)message->key, message->key_size, (unsigned char*)peer->id, peer->id_size)) goto exit; } @@ -332,11 +329,11 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc * @param result_buffer_size the size of the results * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct KademliaMessage* message, - struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { +int libp2p_routing_dht_handle_get_value(struct Stream* stream, struct KademliaMessage* message, struct DhtContext* dht_context, + unsigned char** result_buffer, size_t *result_buffer_size) { - struct Datastore* datastore = session->datastore; - struct Filestore* filestore = session->filestore; + struct Datastore* datastore = dht_context->datastore; + struct Filestore* filestore = dht_context->filestore; size_t data_size = 0; unsigned char* data = NULL; @@ -374,14 +371,15 @@ int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct K /** * Put something in the dht datastore - * @param session the session context + * @param stream the incoming stream * @param message the message * @param peerstore the peerstore * @param providerstore the providerstore + * @param datastore the datastore * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct KademliaMessage* message, - struct Peerstore* peerstore, struct ProviderStore* providerstore) { +int libp2p_routing_dht_handle_put_value(struct Stream* stream, struct KademliaMessage* message, + struct DhtContext* protocol_context) { if (message->record == NULL) return 0; @@ -407,7 +405,7 @@ int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct K } memcpy(record->value, message->record->value, record->value_size); - int retVal = session->datastore->datastore_put(record, session->datastore); + int retVal = protocol_context->datastore->datastore_put(record, protocol_context->datastore); libp2p_datastore_record_free(record); return retVal; } @@ -422,10 +420,10 @@ int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct K * @param result_buffer_size the size of the results * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct KademliaMessage* message, - struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { +int libp2p_routing_dht_handle_find_node(struct Stream* stream, struct KademliaMessage* message, + struct DhtContext* protocol_context, unsigned char** result_buffer, size_t *result_buffer_size) { // look through peer store - struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, (unsigned char*)message->key, message->key_size); + struct Libp2pPeer* peer = libp2p_peerstore_get_peer(protocol_context->peer_store, (unsigned char*)message->key, message->key_size); if (peer != NULL) { message->provider_peer_head = libp2p_utils_linked_list_new(); message->provider_peer_head->item = libp2p_peer_copy(peer); @@ -445,7 +443,7 @@ int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct K * @param peerstore a list of peers * @returns true(1) on success, otherwise false(0) */ -int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore) { +int libp2p_routing_dht_handle_message(struct Stream* stream, struct DhtContext* protocol_context) { unsigned char *result_buffer = NULL; struct StreamMessage* buffer = NULL; size_t result_buffer_size = 0; @@ -453,7 +451,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee struct KademliaMessage* message = NULL; // read from stream - if (!session->default_stream->read(session, &buffer, 5)) + if (!stream->read(stream->stream_context, &buffer, 5)) goto exit; // unprotobuf if (!libp2p_message_protobuf_decode(buffer->data, buffer->data_size, &message)) @@ -462,19 +460,19 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee // handle message switch(message->message_type) { case(MESSAGE_TYPE_PUT_VALUE): // store a value in local storage - libp2p_routing_dht_handle_put_value(session, message, peerstore, providerstore); + libp2p_routing_dht_handle_put_value(stream, message, protocol_context); break; case(MESSAGE_TYPE_GET_VALUE): // get a value from local storage - libp2p_routing_dht_handle_get_value(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + libp2p_routing_dht_handle_get_value(stream, message, protocol_context, &result_buffer, &result_buffer_size); break; case(MESSAGE_TYPE_ADD_PROVIDER): // client wants us to know he can provide something - libp2p_routing_dht_handle_add_provider(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + libp2p_routing_dht_handle_add_provider(stream, message, protocol_context, &result_buffer, &result_buffer_size); break; case(MESSAGE_TYPE_GET_PROVIDERS): // see if we can help, and send closer peers - libp2p_routing_dht_handle_get_providers(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + libp2p_routing_dht_handle_get_providers(stream, message, protocol_context, &result_buffer, &result_buffer_size); break; case(MESSAGE_TYPE_FIND_NODE): // find peers - libp2p_routing_dht_handle_find_node(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + libp2p_routing_dht_handle_find_node(stream, message, protocol_context, &result_buffer, &result_buffer_size); break; case(MESSAGE_TYPE_PING): libp2p_routing_dht_handle_ping(message, &result_buffer, &result_buffer_size); @@ -486,7 +484,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee struct StreamMessage outgoing; outgoing.data = result_buffer; outgoing.data_size = result_buffer_size; - if (!session->default_stream->write(session, &outgoing)) + if (!stream->write(stream->stream_context, &outgoing)) goto exit; } else { libp2p_logger_debug("dht_protocol", "DhtHandleMessage: Nothing to send back. Kademlia call has been handled. Message type: %d\n", message->message_type); diff --git a/secio/secio.c b/secio/secio.c index 5bf965e..e080a45 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -53,11 +53,11 @@ int libp2p_secio_can_handle(const struct StreamMessage* msg) { * @param protocol_context a SecioContext that contains the needed information * @returns <0 on error, 0 if okay (does not allow daemon to continue looping) */ -int libp2p_secio_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { +int libp2p_secio_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { libp2p_logger_debug("secio", "Handling incoming secio message.\n"); struct SecioContext* ctx = (struct SecioContext*)protocol_context; // send them the protocol - if (!libp2p_secio_send_protocol(ctx)) + if (!libp2p_secio_send_protocol(stream)) return -1; int retVal = libp2p_secio_handshake(ctx); if (retVal) @@ -518,12 +518,12 @@ int libp2p_secio_make_mac_and_cipher(struct SessionContext* session, struct Stre * @param ctx the context * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_send_protocol(struct SecioContext* ctx) { +int libp2p_secio_send_protocol(struct Stream* stream) { char* protocol = "/secio/1.0.0\n"; struct StreamMessage outgoing; outgoing.data = (uint8_t*)protocol; outgoing.data_size = strlen(protocol); - return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing); + return stream->write(stream->stream_context, &outgoing); } /*** @@ -531,12 +531,12 @@ int libp2p_secio_send_protocol(struct SecioContext* ctx) { * @param ctx the context * @returns true(1) if we received what we think we should have, false(0) otherwise */ -int libp2p_secio_receive_protocol(struct SecioContext* ctx) { +int libp2p_secio_receive_protocol(struct Stream* stream) { char* protocol = "/secio/1.0.0\n"; int numSecs = 30; int retVal = 0; struct StreamMessage* buffer = NULL; - ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &buffer, numSecs); + stream->read(stream->stream_context, &buffer, numSecs); if (buffer == NULL) { libp2p_logger_error("secio", "Expected the secio protocol header, but received NULL.\n"); } else { @@ -1335,6 +1335,12 @@ int libp2p_secio_read_raw(void* stream_context, uint8_t* buffer, int buffer_size return max_to_read; } +int libp2p_secio_close(struct Stream* stream) { + if (stream != NULL && stream->stream_context != NULL) + free(stream->stream_context); + return 1; +} + /*** * Initiates a secio handshake. Use this method when you want to initiate a secio * session. This should not be used to respond to incoming secio requests @@ -1361,13 +1367,13 @@ struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp ctx->peer_store = peerstore; ctx->private_key = rsa_private_key; new_stream->parent_stream = parent_stream; - new_stream->close = libp2p_secio_shutdown; + new_stream->close = libp2p_secio_close; new_stream->peek = libp2p_secio_peek; new_stream->read = libp2p_secio_encrypted_read; new_stream->read_raw = libp2p_secio_read_raw; new_stream->write = libp2p_secio_encrypted_write; - if (!libp2p_secio_send_protocol(ctx) - || !libp2p_secio_receive_protocol(ctx) + if (!libp2p_secio_send_protocol(parent_stream) + || !libp2p_secio_receive_protocol(parent_stream) || !libp2p_secio_handshake(ctx)) { libp2p_stream_free(new_stream); new_stream = NULL; diff --git a/test/mock_stream.h b/test/mock_stream.h index 605cdb9..8f8d4f0 100644 --- a/test/mock_stream.h +++ b/test/mock_stream.h @@ -3,17 +3,13 @@ #include #include "libp2p/net/stream.h" -struct MockContext { - struct Stream* stream; -}; - void mock_stream_free(struct Stream* stream); -int mock_stream_close(void* context) { - if (context == NULL) - return 1; - struct MockContext* ctx = (struct MockContext*)context; - mock_stream_free(ctx->stream); +int mock_stream_close(struct Stream* stream) { + if (stream == NULL) + return 0; + struct ConnectionContext* ctx = (struct ConnectionContext*)stream->stream_context; + mock_stream_free(stream); return 1; } @@ -41,8 +37,9 @@ struct Stream* mock_stream_new() { out->read = mock_stream_read; out->read_raw = mock_stream_read_raw; out->write = mock_stream_write; - struct MockContext* ctx = malloc(sizeof(struct MockContext)); - ctx->stream = out; + struct ConnectionContext* ctx = malloc(sizeof(struct ConnectionContext)); + ctx->session_context = (struct SessionContext*)malloc(sizeof(struct SessionContext)); + ctx->session_context->default_stream = out; out->stream_context = ctx; } return out; @@ -51,7 +48,12 @@ struct Stream* mock_stream_new() { void mock_stream_free(struct Stream* stream) { if (stream == NULL) return; - if (stream->stream_context != NULL) + if (stream->stream_context != NULL) { + struct ConnectionContext* ctx = (struct ConnectionContext*)stream->stream_context; + // this will close the session, which will be a loop, so don't + //libp2p_session_context_free(ctx->session_context); + free(ctx->session_context); free(stream->stream_context); + } free(stream); } diff --git a/test/test_conn.h b/test/test_conn.h index ed98544..29a41b4 100644 --- a/test/test_conn.h +++ b/test/test_conn.h @@ -182,7 +182,7 @@ int test_dialer_dial_multistream() { if (stream != NULL) { struct SessionContext session_context; session_context.insecure_stream = stream; - stream->close(&session_context); + stream->close(stream); libp2p_net_multistream_stream_free(stream); } return retVal; diff --git a/test/test_multistream.h b/test/test_multistream.h index f8cd2ba..b2a796d 100644 --- a/test/test_multistream.h +++ b/test/test_multistream.h @@ -21,7 +21,7 @@ int test_multistream_connect() { if (stream != NULL) { struct SessionContext ctx; ctx.insecure_stream = stream; - stream->close(&ctx); + stream->close(stream); libp2p_net_multistream_stream_free(stream); } @@ -61,7 +61,7 @@ int test_multistream_get_list() { exit: if (session.insecure_stream != NULL) { - session.insecure_stream->close(&session); + session.insecure_stream->close(session.insecure_stream); libp2p_net_multistream_stream_free(session.insecure_stream); } libp2p_stream_message_free(response); diff --git a/test/test_yamux.h b/test/test_yamux.h index e0f0c9a..6237c15 100644 --- a/test/test_yamux.h +++ b/test/test_yamux.h @@ -53,8 +53,7 @@ int test_yamux_stream_new() { retVal = 1; exit: if (yamux_stream != NULL) - yamux_stream->close(yamux_stream->stream_context); - mock_stream->close(mock_stream->stream_context); + yamux_stream->close(yamux_stream); return retVal; } @@ -71,43 +70,58 @@ int test_yamux_identify() { goto exit; // Now add in another protocol mock_stream->read = mock_identify_read_protocol; - if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(libp2p_yamux_channel_new(yamux_stream)))) { + if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(yamux_stream))) { goto exit; } // tear down retVal = 1; exit: if (yamux_stream != NULL) - yamux_stream->close(yamux_stream->stream_context); - mock_stream->close(mock_stream->stream_context); + yamux_stream->close(yamux_stream); return retVal; } int test_yamux_incoming_protocol_request() { int retVal = 0; + // setup - struct Stream* mock_stream = mock_stream_new(); - mock_stream->read = mock_yamux_read_protocol; - struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream); - if (yamux_stream == NULL) - goto exit; - // build the protocol handler that can handle identify protocol + // build the protocol handler that can handle yamux and identify protocol struct Libp2pVector* protocol_handlers = libp2p_utils_vector_new(1); struct Libp2pProtocolHandler* handler = libp2p_identify_build_protocol_handler(protocol_handlers); libp2p_utils_vector_add(protocol_handlers, handler); - struct SessionContext session_context; - session_context.default_stream = yamux_stream; + handler = libp2p_yamux_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + struct Stream* mock_stream = mock_stream_new(); + struct SessionContext* session_context = ((struct ConnectionContext*)mock_stream->stream_context)->session_context; + mock_stream->read = mock_yamux_read_protocol; + struct StreamMessage* result_message = NULL; + if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) { + libp2p_logger_error("test_yamux", "Unable to read Yamux protocol from mock stream.\n"); + goto exit; + } + if (libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers) < 0) { + libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol unsuccessful.\n"); + goto exit; + } + // now we should have upgraded to the yamux protocol + libp2p_stream_message_free(result_message); + result_message = NULL; + if (session_context->default_stream->parent_stream == NULL) { + libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol appeared susccessful, but was not.\n"); + goto exit; + } // Someone is requesting the identity protocol mock_stream->read = mock_identify_read_protocol; - struct StreamMessage* result_message; - if (!yamux_stream->read(yamux_stream->stream_context, &result_message, 10)) { + if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) { libp2p_logger_error("test_yamux", "Unable to read identify protocol.\n"); goto exit; } // handle the marshaling of the protocol - libp2p_protocol_marshal(result_message, &session_context, protocol_handlers); + libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers); + libp2p_stream_message_free(result_message); + result_message = NULL; // now verify the results - struct YamuxContext* yamux_context = (struct YamuxContext*)yamux_stream->stream_context; + struct YamuxContext* yamux_context = (struct YamuxContext*)session_context->default_stream->stream_context; if (yamux_context->channels->total != 1) { libp2p_logger_error("test_yamux", "Identify protocol was not found.\n"); goto exit; @@ -116,8 +130,8 @@ int test_yamux_incoming_protocol_request() { // tear down retVal = 1; exit: - if (yamux_stream != NULL) - yamux_stream->close(yamux_stream->stream_context); - mock_stream->close(mock_stream->stream_context); + if (session_context->default_stream != NULL) + session_context->default_stream->close(session_context->default_stream); + libp2p_protocol_handlers_shutdown(protocol_handlers); return retVal; } diff --git a/yamux/yamux.c b/yamux/yamux.c index 5a322ad..451bd07 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -5,6 +5,7 @@ #include "libp2p/yamux/yamux.h" #include "libp2p/net/protocol.h" #include "libp2p/net/stream.h" +#include "libp2p/net/connectionstream.h" #include "libp2p/conn/session.h" #include "libp2p/utils/logger.h" @@ -59,12 +60,12 @@ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ -int yamux_send_protocol(struct YamuxContext* context) { +int yamux_send_protocol(struct Stream* stream) { char* protocol = "/yamux/1.0.0\n"; struct StreamMessage outgoing; outgoing.data = (uint8_t*)protocol; outgoing.data_size = strlen(protocol); - if (!context->stream->parent_stream->write(context->stream->parent_stream->stream_context, &outgoing)) + if (!stream->write(stream->stream_context, &outgoing)) return 0; return 1; } @@ -99,33 +100,16 @@ int yamux_receive_protocol(struct YamuxContext* context) { * The remote is attempting to negotiate yamux * @param msg the incoming message * @param incoming_size the size of the incoming data buffer - * @param session_context the information about the incoming connection + * @param stream the incoming stream * @param protocol_context the protocol-dependent context * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ -int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { - struct YamuxContext* ctx = (struct YamuxContext*)protocol_context; - // we should have the yamux protocol in msg. Send the protocol back. - if (!yamux_send_protocol(ctx)) { - return 0; - } - /* - struct yamux_session* yamux = yamux_session_new(NULL, session_context->default_stream, yamux_session_server, protocol_context); - uint8_t* buf = (uint8_t*) malloc(msg->data_size); - if (buf == NULL) +int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + struct Stream* new_stream = libp2p_yamux_stream_new(stream); + if (new_stream == NULL) return -1; - memcpy(buf, msg->data, msg->data_size); - for(;;) { - int retVal = yamux_decode(yamux, msg->data, msg->data_size); - free(buf); - buf = NULL; - if (!retVal) - break; - else { // try to read more from this stream - // TODO need more information as to what this loop should do - } - } - */ + // upgrade + stream->handle_upgrade(stream, new_stream); return 1; } @@ -135,13 +119,15 @@ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* * @returns true(1) */ int yamux_shutdown(void* protocol_context) { + if (protocol_context != NULL) + free(protocol_context); return 0; } -struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* handlers) { +struct Libp2pProtocolHandler* libp2p_yamux_build_protocol_handler(struct Libp2pVector* handlers) { struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new(); if (handler != NULL) { - handler->context = handlers; + handler->context = handler; handler->CanHandle = yamux_can_handle; handler->HandleMessage = yamux_handle_message; handler->Shutdown = yamux_shutdown; @@ -155,11 +141,13 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* * @param stream_context the YamuxContext * @returns true(1) on success, false(0) otherwise */ -int libp2p_yamux_close(void* stream_context) { - if (stream_context == NULL) +int libp2p_yamux_close(struct Stream* stream) { + if (stream == NULL) return 0; - struct YamuxContext* ctx = (struct YamuxContext*)stream_context; - libp2p_yamux_stream_free(ctx->stream); + if (stream->stream_context == NULL) + return 0; + if (stream->parent_stream->close(stream->parent_stream)) + libp2p_yamux_stream_free(stream); return 1; } @@ -263,25 +251,6 @@ struct YamuxContext* libp2p_yamux_context_new() { return ctx; } -/*** - * Free the resources from libp2p_yamux_context_new - * @param ctx the context - */ -void libp2p_yamux_context_free(struct YamuxContext* ctx) { - if (ctx == NULL) - return; - // free all the channels - if (ctx->channels) { - for(int i = 0; i < ctx->channels->total; i++) { - struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i); - curr->close(curr->stream_context); - } - libp2p_utils_vector_free(ctx->channels); - } - free(ctx); - return; -} - int libp2p_yamux_negotiate(struct YamuxContext* ctx) { const char* protocolID = "/yamux/1.0.0\n"; struct StreamMessage outgoing; @@ -343,6 +312,18 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) { return retVal; } +/*** + * A new protocol was asked for. Give it a "channel" + * @param yamux_stream the yamux stream + * @param new_stream the newly negotiated protocol + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_stream) { + // put this stream in the collection, and tie it to an id + struct YamuxContext* yamux_context = (struct YamuxContext*)yamux_stream->stream_context; + return libp2p_yamux_stream_add(yamux_context, new_stream); +} + /*** * Negotiate the Yamux protocol * @param parent_stream the parent stream @@ -357,6 +338,7 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { out->write = libp2p_yamux_write; out->peek = libp2p_yamux_peek; out->read_raw = libp2p_yamux_read_raw; + out->handle_upgrade = libp2p_yamux_handle_upgrade; out->address = parent_stream->address; // build YamuxContext struct YamuxContext* ctx = libp2p_yamux_context_new(); @@ -375,6 +357,44 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { return out; } + +/** + * Clean up resources from libp2p_yamux_channel_new + * @param ctx the YamuxChannelContext + */ +int libp2p_yamux_channel_close(void* context) { + if (context == NULL) + return 0; + struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context; + if (ctx != NULL) { + // close the child's stream + ctx->child_stream->close(ctx->child_stream); + libp2p_stream_free(ctx->stream); + free(ctx); + } + return 1; +} + +/*** + * Free the resources from libp2p_yamux_context_new + * @param ctx the context + */ +void libp2p_yamux_context_free(struct YamuxContext* ctx) { + if (ctx == NULL) + return; + // free all the channels + if (ctx->channels) { + for(int i = 0; i < ctx->channels->total; i++) { + struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i); + //curr->close(curr->stream_context); + libp2p_yamux_channel_close(curr->stream_context); + } + libp2p_utils_vector_free(ctx->channels); + } + free(ctx); + return; +} + /** * Frees resources held by the stream * @param yamux_stream the stream @@ -387,6 +407,50 @@ void libp2p_yamux_stream_free(struct Stream* yamux_stream) { libp2p_stream_free(yamux_stream); } +/*** + * Channels calling close on the stream should not be able + * to clean up layers below + * @param context the context + * @returns true(1); + */ +int libp2p_yamux_channel_null_close(struct Stream* stream) { + return 1; +} + +/** + * Create a stream that has a "YamuxChannelContext" related to this yamux protocol + * NOTE: This "wraps" the incoming stream, so that the returned stream is the parent + * of the incoming_stream + * @param incoming_stream the stream of the new protocol + * @returns a new Stream that has a YamuxChannelContext, and incoming_stream->parent_stream is set to this stream + */ +struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream) { + struct Stream* out = libp2p_stream_new(); + if (out != NULL) { + out->address = incoming_stream->address; + // don't allow the incoming_stream to close the channel + out->close = libp2p_yamux_channel_null_close; + out->parent_stream = incoming_stream->parent_stream; + out->peek = incoming_stream->parent_stream->peek; + out->read = incoming_stream->parent_stream->read; + out->read_raw = incoming_stream->parent_stream->read_raw; + out->socket_mutex = incoming_stream->parent_stream->socket_mutex; + struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext)); + ctx->channel = 0; + ctx->closed = 0; + ctx->state = 0; + ctx->window_size = 0; + ctx->type = YAMUX_CHANNEL_CONTEXT; + ctx->yamux_context = incoming_stream->parent_stream->stream_context; + ctx->stream = out; + ctx->child_stream = incoming_stream; + out->stream_context = ctx; + out->write = incoming_stream->parent_stream->write; + incoming_stream->parent_stream = out; + } + return out; +} + /**** * Add a stream "channel" to the yamux handler * @param ctx the context @@ -396,65 +460,13 @@ void libp2p_yamux_stream_free(struct Stream* yamux_stream) { int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { if (stream == NULL) return 0; - // the stream's parent should have a YamuxChannelContext - char proto = ((uint8_t*)stream->parent_stream->stream_context)[0]; - if (proto == YAMUX_CHANNEL_CONTEXT) { - // the negotiation was successful. Add it to the list of channels that we have - int itemNo = libp2p_utils_vector_add(ctx->channels, stream); - struct YamuxChannelContext* incoming = (struct YamuxChannelContext*)stream->parent_stream->stream_context; - if (incoming->channel != 0) { - // this is wrong. There should have not been a channel number - return 0; - } - incoming->channel = itemNo; - return 1; - } - return 0; -} - -/** - * Clean up resources from libp2p_yamux_channel_new - * @param ctx the YamuxChannelContext - */ -int libp2p_yamux_channel_close(void* context) { - if (context == NULL) + // wrap the new stream in a YamuxChannelContext + struct Stream* channel_stream = libp2p_yamux_channel_stream_new(stream); + if (channel_stream == NULL) return 0; - struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context; - if (ctx != NULL) { - if (ctx->stream != NULL) - free(ctx->stream); - free(ctx); - } + struct YamuxChannelContext* channel_context = (struct YamuxChannelContext*)channel_stream->stream_context; + // the negotiation was successful. Add it to the list of channels that we have + int itemNo = libp2p_utils_vector_add(ctx->channels, channel_stream); + channel_context->channel = itemNo; return 1; } - -/** - * Create a stream that has a "YamuxChannelContext" related to this yamux protocol - * @param parent_stream the parent yamux stream - * @returns a new Stream that is a YamuxChannelContext - */ -struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream) { - struct Stream* out = libp2p_stream_new(); - if (out != NULL) { - out->address = parent_stream->address; - out->close = libp2p_yamux_channel_close; - out->parent_stream = parent_stream; - out->peek = parent_stream->peek; - out->read = parent_stream->read; - out->read_raw = parent_stream->read_raw; - out->socket_mutex = parent_stream->socket_mutex; - struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext)); - ctx->channel = 0; - ctx->closed = 0; - ctx->state = 0; - ctx->window_size = 0; - ctx->type = YAMUX_CHANNEL_CONTEXT; - ctx->yamux_context = parent_stream->stream_context; - ctx->stream = out; - out->stream_context = ctx; - out->write = parent_stream->write; - } - return out; -} - -