From 0d4d475c2c3cc1137f2740b4bf563662b087b4c2 Mon Sep 17 00:00:00 2001 From: John Jones Date: Tue, 28 Nov 2017 22:44:18 -0500 Subject: [PATCH] Swarm now functioning with yamux. Still needs more debugging. --- Makefile | 3 + conn/dialer.c | 22 +++- include/libp2p/conn/dialer.h | 8 +- include/libp2p/net/multistream.h | 15 +++ include/libp2p/secio/secio.h | 25 +++-- include/libp2p/swarm/swarm.h | 44 ++++++++ net/multistream.c | 106 +++++++++++++++++--- secio/secio.c | 112 +++++++++++++++++---- swarm/Makefile | 18 ++++ swarm/swarm.c | 166 +++++++++++++++++++++++++++++++ test/test_conn.h | 8 +- test/test_yamux.h | 6 +- yamux/session.c | 52 +++++++--- 13 files changed, 520 insertions(+), 65 deletions(-) create mode 100644 include/libp2p/swarm/swarm.h create mode 100644 swarm/Makefile diff --git a/Makefile b/Makefile index 192abfb..272a600 100644 --- a/Makefile +++ b/Makefile @@ -19,6 +19,7 @@ OBJS = \ routing/*.o \ secio/*.o \ utils/*.o \ + swarm/*.o \ yamux/*.o link: @@ -37,6 +38,7 @@ compile: cd record; make all; cd routing; make all; cd secio; make all; + cd swarm; make all; cd utils; make all; cd yamux; make all; @@ -60,6 +62,7 @@ clean: cd record; make clean; cd routing; make clean; cd secio; make clean; + cd swarm; make clean; cd utils; make clean; cd test; make clean; cd yamux; make clean; diff --git a/conn/dialer.c b/conn/dialer.c index f99f95b..faa49e8 100644 --- a/conn/dialer.c +++ b/conn/dialer.c @@ -25,7 +25,7 @@ struct TransportDialer* libp2p_conn_tcp_transport_dialer_new(); * @param private_key the local private key * @returns a new Dialer struct */ -struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key) { +struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key, struct SwarmContext* swarm) { int success = 0; struct Dialer* dialer = (struct Dialer*)malloc(sizeof(struct Dialer)); if (dialer != NULL) { @@ -33,6 +33,7 @@ struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore* dialer->private_key = rsa_private_key; dialer->transport_dialers = NULL; dialer->fallback_dialer = libp2p_conn_tcp_transport_dialer_new(dialer->peer_id, rsa_private_key); + dialer->swarm = swarm; if (peer != NULL) { dialer->peer_id = malloc(peer->id_size + 1); memset(dialer->peer_id, 0, peer->id_size + 1); @@ -104,6 +105,8 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer if (conn_stream != NULL) { if (peer->sessionContext == NULL) { peer->sessionContext = libp2p_session_context_new(); + struct ConnectionContext* conn_ctx = conn_stream->stream_context; + conn_ctx->session_context = peer->sessionContext; } peer->sessionContext->insecure_stream = conn_stream; peer->sessionContext->default_stream = conn_stream; @@ -115,11 +118,22 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer } if (conn_stream == NULL) return 0; - // multistream - struct Stream* new_stream = libp2p_net_multistream_stream_new(conn_stream, 0); + // we're connected. start listening for responses + libp2p_swarm_add_peer(dialer->swarm, peer); + // wait for multistream + int counter = 0; + if (!libp2p_net_multistream_ready(peer->sessionContext, 5)) { + return 0; + } + struct Stream* new_stream = peer->sessionContext->default_stream; if (new_stream != NULL) { // secio over multistream - new_stream = libp2p_secio_stream_new(new_stream, peer, dialer->peerstore, dialer->private_key); + new_stream = libp2p_secio_stream_new(new_stream, dialer->peerstore, dialer->private_key); + counter = 0; + if (!libp2p_secio_ready(peer->sessionContext, 10) ) { + return 0; + } + counter = 0; if (new_stream != NULL) { peer->sessionContext->default_stream = new_stream; // multistream over secio diff --git a/include/libp2p/conn/dialer.h b/include/libp2p/conn/dialer.h index 56a39aa..f8cf083 100644 --- a/include/libp2p/conn/dialer.h +++ b/include/libp2p/conn/dialer.h @@ -16,6 +16,7 @@ #include "libp2p/conn/connection.h" #include "libp2p/conn/transport_dialer.h" #include "libp2p/peer/peer.h" +#include "libp2p/swarm/swarm.h" struct Dialer { /** @@ -34,16 +35,19 @@ struct Dialer { //TODO: See dial.go, need to implement Protector struct TransportDialer* fallback_dialer; // the default dialer. NOTE: this should not be in the list of transport_dialers + struct SwarmContext* swarm; }; /** * Create a Dialer with the specified local information * NOTE: This fills in the fallback_dialer too - * @param peer_id the local PeerID + * @param peer the local Peer + * @param peerstore the local peerstore * @param private_key the local private key + * @param swarm the swarm * @returns a new Dialer struct */ -struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore* peerstore, struct RsaPrivateKey* private_key); +struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore* peerstore, struct RsaPrivateKey* private_key, struct SwarmContext* swarm); /** * free resources from the Dialer struct diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index 8f95881..9188c98 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -14,11 +14,17 @@ * So in short, much of this will change. But for now, think of it as a Proof of Concept. */ +enum MultistreamStatus { + multistream_status_initialized, + multistream_status_syn, + multistream_status_ack +}; struct MultistreamContext { struct Libp2pVector* handlers; struct SessionContext* session_context; struct Stream* stream; + volatile enum MultistreamStatus status; }; /*** @@ -114,3 +120,12 @@ struct StreamMessage* libp2p_net_multistream_prepare_to_send(struct StreamMessag struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, int theyRequested); void libp2p_net_multistream_stream_free(struct Stream* stream); + +/*** + * Wait for multistream stream to become ready + * @param session_context the session context to check + * @param timeout_secs the number of seconds to wait for things to become ready + * @returns true(1) if it becomes ready, false(0) otherwise + */ +int libp2p_net_multistream_ready(struct SessionContext* session_context, int timeout_secs); + diff --git a/include/libp2p/secio/secio.h b/include/libp2p/secio/secio.h index 500ff51..0312c6f 100644 --- a/include/libp2p/secio/secio.h +++ b/include/libp2p/secio/secio.h @@ -10,6 +10,13 @@ * Handling of a secure connection */ +enum SecioStatus { + secio_status_unknown, + secio_status_initialized, + secio_status_syn, + secio_status_ack +}; + struct SecioContext { struct Stream* stream; struct SessionContext* session_context; @@ -17,6 +24,7 @@ struct SecioContext { struct Peerstore* peer_store; struct StreamMessage* buffered_message; size_t buffered_message_pos; + volatile enum SecioStatus status; }; struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPrivateKey* private_key, struct Peerstore* peer_store); @@ -25,12 +33,11 @@ struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPriv * Initiates a secio handshake. Use this method when you want to initiate a secio * session. This should not be used to respond to incoming secio requests * @param parent_stream the parent stream - * @param remote_peer the remote peer * @param peerstore the peerstore * @param rsa_private_key the local private key * @returns a Secio Stream */ -struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp2pPeer* remote_peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key); +struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key); /*** * Initiates a secio handshake. Use this method when you want to initiate a secio @@ -58,10 +65,16 @@ int libp2p_secio_receive_protocol(struct Stream* stream); * performs initial communication over an insecure channel to share * keys, IDs, and initiate connection. This is a framed messaging system * NOTE: session must contain a valid socket_descriptor that is a multistream. - * @param local_session the secure session to be filled - * @param private_key our private key to use - * @param peerstore the collection of peers + * @param secio_stream a stream that is a Secio stream * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_handshake(struct SecioContext* secio_context); +int libp2p_secio_handshake(struct Stream* secio_stream); + +/*** + * Wait for secio stream to become ready + * @param session_context the session context to check + * @param timeout_secs the number of seconds to wait for things to become ready + * @returns true(1) if it becomes ready, false(0) otherwise + */ +int libp2p_secio_ready(struct SessionContext* session_context, int timeout_secs); diff --git a/include/libp2p/swarm/swarm.h b/include/libp2p/swarm/swarm.h new file mode 100644 index 0000000..590522b --- /dev/null +++ b/include/libp2p/swarm/swarm.h @@ -0,0 +1,44 @@ +#pragma once + +/*** + * This listens for requests from the connected peers + */ +#include "libp2p/utils/thread_pool.h" +#include "libp2p/db/datastore.h" +#include "libp2p/db/filestore.h" +#include "libp2p/peer/peer.h" + +struct SwarmContext { + threadpool thread_pool; + struct Libp2pVector* protocol_handlers; + struct Datastore* datastore; + struct Filestore* filestore; +}; + +/*** + * Add a connected peer to the swarm + * NOTE: We should already have a connection to the peer + * @param context the SwarmContext + * @param peer the connected peer + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_swarm_add_peer(struct SwarmContext* context, struct Libp2pPeer* peer); + +/** + * add an incoming connection + * @param context the SwarmContext + * @param file_descriptor the incoming file descriptor of the connection + * @param ip the incoming ip (ipv4 format) + * @param port the incoming port + * @return true(1) on success, false(0) otherwise + */ +int libp2p_swarm_add_connection(struct SwarmContext* context, int file_descriptor, int ip, int port ); + +/** + * Fire up the swarm engine, and return its context + * @param protocol_handlers the protocol handlers + * @param datastore the datastore + * @param filestore the file store + * @returns the SwarmContext + */ +struct SwarmContext* libp2p_swarm_new(struct Libp2pVector* protocol_handlers, struct Datastore* datastore, struct Filestore* filestore); diff --git a/net/multistream.c b/net/multistream.c index 6dec817..9795afb 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -165,7 +165,7 @@ struct StreamMessage* libp2p_net_multistream_prepare_to_send(struct StreamMessag * @param msg the data to send * @returns the number of bytes written */ -int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* incoming) { +int libp2p_net_multistream_write_without_check(void* stream_context, struct StreamMessage* incoming) { struct MultistreamContext* multistream_context = (struct MultistreamContext*) stream_context; struct Stream* parent_stream = multistream_context->stream->parent_stream; int num_bytes = 0; @@ -184,6 +184,47 @@ int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* inc return num_bytes; } +/*** + * Wait for multistream stream to become ready + * @param session_context the session context to check + * @param timeout_secs the number of seconds to wait for things to become ready + * @returns true(1) if it becomes ready, false(0) otherwise + */ +int libp2p_net_multistream_ready(struct SessionContext* session_context, int timeout_secs) { + int counter = 0; + while (session_context->default_stream->stream_type != STREAM_TYPE_MULTISTREAM && counter <= timeout_secs) { + counter++; + sleep(1); + } + if (session_context->default_stream->stream_type == STREAM_TYPE_MULTISTREAM && counter < 5) { + struct MultistreamContext* ctx = (struct MultistreamContext*)session_context->default_stream->stream_context; + while (ctx->status != multistream_status_ack && counter <= timeout_secs) { + counter++; + sleep(1); + } + if (ctx->status == multistream_status_ack) + return 1; + } + return 0; +} + +/** + * Write to an open multistream host + * @param stream_context the session context + * @param msg the data to send + * @returns the number of bytes written + */ +int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* incoming) { + struct MultistreamContext* multistream_context = (struct MultistreamContext*) stream_context; + + if (multistream_context->status != multistream_status_ack) { + libp2p_logger_error("multistream", "Attempt to write before protocol is completely set up.\n"); + return 0; + } + + return libp2p_net_multistream_write_without_check(stream_context, incoming); +} + /** * Read from a multistream socket * @param socket_fd the socket file descriptor @@ -306,8 +347,9 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, } /** - * Negotiate the multistream protocol by sending and receiving the protocol id. This is a server side function. - * Servers should send the protocol ID, and then expect it back. + * Negotiate the multistream protocol by sending the protocol id. This is a server side function. + * Servers should send the protocol ID, and then expect it back. Receiving the + * protocol id back is the responsibility of a future read, not part of this function. * NOTE: the SessionContext should already contain the connected stream. If not, use * libp2p_net_multistream_connect instead of this method. * @@ -320,38 +362,46 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyReq struct StreamMessage outgoing; struct StreamMessage* results = NULL; int retVal = 0; - int haveTheirs = 0; - int peek_result = 0; + //int haveTheirs = 0; + //int peek_result = 0; + /* if (!theyRequested) { // see if they're trying to send something first peek_result = libp2p_net_multistream_peek(ctx); - /* - if (peek_result < 0) { - libp2p_logger_error("multistream", "Attempted a peek, but received an error.\n"); - return 0; - } - */ if (peek_result > 0) { libp2p_logger_debug("multistream", "negotiate: There is %d bytes waiting for us. Perhaps it is the multistream header we're expecting.\n", peek_result); // get the protocol - //ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &results, multistream_default_timeout); libp2p_net_multistream_read(ctx, &results, multistream_default_timeout); - if (results == NULL || results->data_size == 0) + if (results == NULL || results->data_size == 0) { + libp2p_logger_debug("multistream", "negotiate: We tried to read the %d bytes, but got nothing.\n", peek_result); goto exit; - if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) + } + if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) { + libp2p_logger_debug("multistream", "negotiate: We expected the multistream id, but got %s.\n", results->data); goto exit; + } + libp2p_logger_debug("multistream", "negotiate: We read %d bytes from the network, and received the multistream id.\n", results->data_size); haveTheirs = 1; } } + */ // send the protocol id outgoing.data = (uint8_t*)protocolID; outgoing.data_size = strlen(protocolID); - if (!libp2p_net_multistream_write(ctx, &outgoing)) { + if (!libp2p_net_multistream_write_without_check(ctx, &outgoing)) { libp2p_logger_debug("multistream", "negotiate: Attempted to send the multistream id, but the write failed.\n"); goto exit; } + // update the status + if (theyRequested) { + ctx->status = multistream_status_ack; + } else { + ctx->status = multistream_status_syn; + } + + /* // wait for them to send the protocol id back if (!theyRequested && !haveTheirs) { libp2p_logger_debug("multistream", "negotiate: Wrote multistream id to network, awaiting reply...\n"); @@ -366,6 +416,7 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyReq goto exit; } } + */ retVal = 1; exit: @@ -406,6 +457,7 @@ int libp2p_net_multistream_handle_upgrade(struct Stream* multistream, struct Str // take multistream out of the picture if (new_stream->parent_stream == multistream) { new_stream->parent_stream = multistream->parent_stream; + multistream->parent_stream->handle_upgrade(multistream->parent_stream, new_stream); } return 1; } @@ -429,12 +481,14 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i out->negotiate = libp2p_net_multistream_handshake; out->handle_upgrade = libp2p_net_multistream_handle_upgrade; out->address = parent_stream->address; + out->socket_mutex = parent_stream->socket_mutex; // build MultistreamContext struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); if (ctx == NULL) { libp2p_net_multistream_stream_free(out); return NULL; } + ctx->status = multistream_status_initialized; out->stream_context = ctx; ctx->stream = out; ctx->handlers = NULL; @@ -445,6 +499,15 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i libp2p_net_multistream_stream_free(out); return NULL; } + if (!theyRequested) { + int timeout = 5; + int counter = 0; + // wait for the response + while(ctx->status != multistream_status_ack && counter < timeout) { + sleep(1); + counter++; + } + } } return out; } @@ -457,7 +520,18 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i * @returns <0 on error, 0 for the caller to stop handling this, 1 for success */ int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { - // attempt negotiations + if (stream->stream_type == STREAM_TYPE_MULTISTREAM) { + // we sent a multistream, and this is them responding + struct MultistreamContext* ctx = (struct MultistreamContext*) stream->stream_context; + if (ctx->status == multistream_status_ack) { + // uh oh, this stream is already set up. error + return -1; + } else { + ctx->status = multistream_status_ack; + } + return 1; + } + // the incoming stream is not a multistream. They are attempting to upgrade to multistream struct Stream* new_stream = libp2p_net_multistream_stream_new(stream, 1); if (new_stream != NULL) { // upgrade diff --git a/secio/secio.c b/secio/secio.c index 1903dde..94447ee 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -58,12 +58,26 @@ int libp2p_secio_can_handle(const struct StreamMessage* msg) { int libp2p_secio_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { libp2p_logger_debug("secio", "Handling incoming secio message.\n"); struct SecioContext* ctx = (struct SecioContext*)protocol_context; - // send them the protocol - if (!libp2p_secio_send_protocol(stream)) - return -1; - int retVal = libp2p_secio_handshake(ctx); - if (retVal) + struct Stream* secio_stream = NULL; + // get the latest stream for the session context, as it may have changed (multithreaded) + struct Stream* root_stream = stream; + while (root_stream->parent_stream != NULL) + root_stream = root_stream->parent_stream; + struct ConnectionContext* connection_context = (struct ConnectionContext*)root_stream->stream_context; + stream = connection_context->session_context->default_stream; + // have we not already started negotiating? + if (stream->stream_type != STREAM_TYPE_SECIO) { + secio_stream = libp2p_secio_stream_new(stream, ctx->peer_store, ctx->private_key); + // send them the protocol + if (!libp2p_secio_send_protocol(stream)) + return -1; + } else { + secio_stream = stream; + } + int retVal = libp2p_secio_handshake(secio_stream); + if (retVal) { return 0; + } return -1; } @@ -80,6 +94,9 @@ struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPriv context->buffered_message_pos = -1; context->private_key = private_key; context->peer_store = peer_store; + context->stream = NULL; + context->session_context = NULL; + context->status = secio_status_unknown; handler->context = context; handler->CanHandle = libp2p_secio_can_handle; handler->HandleMessage = libp2p_secio_handle_message; @@ -800,6 +817,11 @@ int libp2p_secio_encrypt(struct SessionContext* session, const unsigned char* in int libp2p_secio_encrypted_write(void* stream_context, struct StreamMessage* bytes) { struct SecioContext* ctx = (struct SecioContext*) stream_context; struct Stream* parent_stream = ctx->stream->parent_stream; + + if (ctx->status != secio_status_ack) { + return parent_stream->write(parent_stream->stream_context, bytes); + } + struct SessionContext* session_context = ctx->session_context; // writer uses the local cipher and mac @@ -897,6 +919,10 @@ int libp2p_secio_encrypted_read(void* stream_context, struct StreamMessage** byt int retVal = 0; struct SecioContext* ctx = (struct SecioContext*)stream_context; struct Stream* parent_stream = ctx->stream->parent_stream; + + if (ctx->status != secio_status_ack) { + return parent_stream->read(parent_stream->stream_context, bytes, timeout_secs); + } // reader uses the remote cipher and mac // read the data struct StreamMessage* msg = NULL; @@ -970,7 +996,7 @@ struct Libp2pPeer* libp2p_secio_get_peer_or_add(struct Peerstore* peerstore, str * @param peerstore the collection of peers * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_handshake(struct SecioContext* secio_context) { +int libp2p_secio_handshake(struct Stream* secio_stream) { int retVal = 0; size_t bytes_written = 0; struct StreamMessage* incoming = NULL; @@ -993,10 +1019,15 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) { struct StretchedKey* k1 = NULL, *k2 = NULL; struct Libp2pPeer* remote_peer = NULL; + struct SecioContext* secio_context = secio_stream->stream_context; struct SessionContext* local_session = secio_context->session_context; struct RsaPrivateKey* private_key = secio_context->private_key; struct Peerstore* peerstore = secio_context->peer_store; + libp2p_logger_debug("secio", "handshake: Getting read lock.\n"); + pthread_mutex_lock(secio_stream->socket_mutex); + libp2p_logger_debug("secio", "handshake: Got read lock.\n"); + //TODO: make sure we're not talking to ourself // send the protocol id and the outgoing Propose struct @@ -1010,6 +1041,7 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) { propose_out = libp2p_secio_propose_build(local_session->local_nonce, private_key, SupportedExchanges, SupportedCiphers, SupportedHashes); + /* if (libp2p_logger_watching_class("secio")) { fprintf(stdout, "Our public key: "); for(int i = 0; i < propose_out->public_key_size; i++) { @@ -1017,6 +1049,7 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) { } fprintf(stdout, "\n"); } + */ // protobuf the proposal propose_out_size = libp2p_secio_propose_protobuf_encode_size(propose_out); @@ -1027,15 +1060,17 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) { // now send the Propose struct outgoing.data = propose_out_bytes; outgoing.data_size = propose_out_size; - bytes_written = libp2p_secio_unencrypted_write(secio_context->stream, &outgoing); + libp2p_logger_debug("secio", "About to send propose_out, of %d bytes.\n", propose_out_size); + bytes_written = libp2p_secio_unencrypted_write(secio_stream, &outgoing); if (bytes_written != propose_out_size) { libp2p_logger_error("secio", "Sent propose_out, but did not write the correct number of bytes. Should be %d but was %d.\n", propose_out_size, bytes_written); goto exit; } + libp2p_logger_debug("secio", "Sent propose_out, waiting for propose_in.\n"); // try to get the Propose struct from the remote peer - bytes_written = libp2p_secio_unencrypted_read(secio_context->stream, &incoming, 10); + bytes_written = libp2p_secio_unencrypted_read(secio_stream, &incoming, 10); if (bytes_written <= 0) { libp2p_logger_error("secio", "Unable to get the remote's Propose struct.\n"); goto exit; @@ -1141,7 +1176,7 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) { outgoing.data = exchange_out_protobuf; outgoing.data_size = exchange_out_protobuf_size; - bytes_written = libp2p_secio_unencrypted_write(secio_context->stream, &outgoing); + bytes_written = libp2p_secio_unencrypted_write(secio_stream, &outgoing); if (exchange_out_protobuf_size != bytes_written) { libp2p_logger_error("secio", "Unable to write exchange_out\n"); goto exit; @@ -1154,7 +1189,7 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) { // receive Exchange packet libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading exchange packet\n"); - bytes_written = libp2p_secio_unencrypted_read(secio_context->stream, &incoming, 10); + bytes_written = libp2p_secio_unencrypted_read(secio_stream, &incoming, 10); if (bytes_written == 0) { libp2p_logger_error("secio", "unable to read exchange packet.\n"); libp2p_peer_handle_connection_error(remote_peer); @@ -1236,6 +1271,8 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) { libp2p_secio_initialize_crypto(local_session); + secio_context->status = secio_status_ack; + // send their nonce to verify encryption works outgoing.data = (uint8_t*)local_session->remote_nonce; outgoing.data_size = 16; @@ -1264,17 +1301,21 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) { libp2p_stream_message_free(incoming); incoming = NULL; + /* Stream->HandleUpgrade now does this... // set up the secure stream in the struct local_session->secure_stream = local_session->insecure_stream; local_session->secure_stream->read = libp2p_secio_encrypted_read; local_session->secure_stream->write = libp2p_secio_encrypted_write; // set secure as default local_session->default_stream = local_session->secure_stream; - + */ retVal = 1; //libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake complete\n"); exit: + libp2p_logger_debug("secio", "Releasing read lock.\n"); + pthread_mutex_unlock(secio_stream->socket_mutex); + libp2p_logger_debug("secio", "Read lock released.\n"); if (propose_in_bytes != NULL) free(propose_in_bytes); if (propose_out_bytes != NULL) @@ -1356,13 +1397,19 @@ int libp2p_secio_close(struct Stream* stream) { * Initiates a secio handshake. Use this method when you want to initiate a secio * session. This should not be used to respond to incoming secio requests * @param parent_stream the parent stream - * @param remote_peer the remote peer * @param peerstore the peerstore * @param rsa_private_key the local private key * @returns a Secio Stream */ -struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp2pPeer* remote_peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key) { +struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key) { struct Stream* new_stream = libp2p_stream_new(); + // get SessionContext + struct Stream* root_stream = parent_stream; + while (root_stream->parent_stream != NULL ) + root_stream = root_stream->parent_stream; + struct ConnectionContext* connection_context = (struct ConnectionContext*)root_stream->stream_context; + struct SessionContext* session_context = connection_context->session_context; + if (new_stream != NULL) { new_stream->stream_type = STREAM_TYPE_SECIO; struct SecioContext* ctx = (struct SecioContext*) malloc(sizeof(struct SecioContext)); @@ -1375,18 +1422,19 @@ struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp ctx->buffered_message_pos = -1; new_stream->stream_context = ctx; ctx->stream = new_stream; - ctx->session_context = remote_peer->sessionContext; + ctx->session_context = session_context; ctx->peer_store = peerstore; ctx->private_key = rsa_private_key; + ctx->status = secio_status_initialized; new_stream->parent_stream = parent_stream; new_stream->close = libp2p_secio_close; new_stream->peek = libp2p_secio_peek; new_stream->read = libp2p_secio_encrypted_read; new_stream->read_raw = libp2p_secio_read_raw; new_stream->write = libp2p_secio_encrypted_write; - if (!libp2p_secio_send_protocol(parent_stream) - || !libp2p_secio_receive_protocol(parent_stream) - || !libp2p_secio_handshake(ctx)) { + new_stream->socket_mutex = parent_stream->socket_mutex; + parent_stream->handle_upgrade(parent_stream, new_stream); + if (!libp2p_secio_send_protocol(parent_stream)) { libp2p_stream_free(new_stream); new_stream = NULL; } @@ -1394,3 +1442,33 @@ struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp return new_stream; } +/*** + * Wait for secio stream to become ready + * @param session_context the session context to check + * @param timeout_secs the number of seconds to wait for things to become ready + * @returns true(1) if it becomes ready, false(0) otherwise + */ +int libp2p_secio_ready(struct SessionContext* session_context, int timeout_secs) { + int counter = 0; + while (session_context != NULL + && session_context->default_stream != NULL + && session_context->default_stream->stream_type != STREAM_TYPE_SECIO + && counter <= timeout_secs) { + counter++; + sleep(1); + } + if (session_context != NULL + && session_context->default_stream != NULL + && session_context->default_stream->stream_type == STREAM_TYPE_SECIO + && counter < 5) { + struct SecioContext* ctx = (struct SecioContext*)session_context->default_stream->stream_context; + while (ctx->status != secio_status_ack && counter <= timeout_secs) { + counter++; + sleep(1); + } + if (ctx->status == secio_status_ack) + return 1; + } + return 0; +} + diff --git a/swarm/Makefile b/swarm/Makefile new file mode 100644 index 0000000..cd43de1 --- /dev/null +++ b/swarm/Makefile @@ -0,0 +1,18 @@ +CC = gcc +CFLAGS = -O0 -Wall -I../include -I../../c-protobuf -I../../c-multiaddr/include -std=c99 + +ifdef DEBUG +CFLAGS += -g3 +endif + +LFLAGS = +DEPS = +OBJS = swarm.o + +%.o: %.c $(DEPS) + $(CC) -c -o $@ $< $(CFLAGS) + +all: $(OBJS) + +clean: + rm -f *.o diff --git a/swarm/swarm.c b/swarm/swarm.c index 8b13789..c9ae58d 100644 --- a/swarm/swarm.c +++ b/swarm/swarm.c @@ -1 +1,167 @@ +/*** + * This listens for requests from the connected peers + */ +#include +#include +#include +#include "libp2p/net/protocol.h" +#include "libp2p/net/connectionstream.h" +#include "libp2p/swarm/swarm.h" +#include "libp2p/utils/logger.h" + +/** + * Helps pass information to a new thread + */ +struct SwarmSession { + struct SessionContext* session_context; + struct SwarmContext* swarm_context; +}; + +int DEFAULT_NETWORK_TIMEOUT = 5; + +/*** + * Listens on a particular stream, and marshals the request + * @param stream the stream to listen to + * @param protocol_handlers a vector of protocol handlers + * @returns <0 on error, 0 if we shouldn't handle this anymore, or 1 on success + */ +int libp2p_swarm_listen_and_handle(struct Stream* stream, struct Libp2pVector* protocol_handlers) { + struct StreamMessage* results = NULL; + int retVal = 0; + // Read from the network + libp2p_logger_debug("swarm", "Attempting to get read lock.\n"); + pthread_mutex_lock(stream->socket_mutex); + libp2p_logger_debug("swarm", "Got read lock.\n"); + if (!stream->read(stream->stream_context, &results, 1)) { + libp2p_logger_debug("swarm", "Releasing read lock\n"); + pthread_mutex_unlock(stream->socket_mutex); + libp2p_logger_error("swarm", "Unable to read from network. Exiting.\n"); + return retVal; + } + libp2p_logger_debug("swarm", "Releasing read lock.\n"); + pthread_mutex_unlock(stream->socket_mutex); + if (results != NULL) { + libp2p_logger_debug("swarm", "Attempting to marshal %d bytes from network.\n", results->data_size); + retVal = libp2p_protocol_marshal(results, stream, protocol_handlers); + libp2p_logger_debug("swarm", "The return value from the attempt to marshal %d bytes was %d.\n", results->data_size, retVal); + libp2p_stream_message_free(results); + } else { + libp2p_logger_debug("swarm", "Attempted read, but results were null. This is normal.\n"); + } + return retVal; +} + +/*** + * This is on its own thread, and listens for incoming data from a particular client + * @param session the SessionContext + */ +void libp2p_swarm_listen(void* ctx) { + struct SwarmSession* swarm_session = (struct SwarmSession*) ctx; + struct SessionContext* session_context = swarm_session->session_context; + int retVal = 0; + for(;;) { + // Read from the network + retVal = libp2p_swarm_listen_and_handle(session_context->default_stream, swarm_session->swarm_context->protocol_handlers); + if (retVal < 0) { + // exit the loop on error + libp2p_logger_debug("swarm", "listen: Exiting loop due to retVal being %d.\n", retVal); + break; + } + } // end of loop + + // clean up memory + if (session_context->host != NULL) + free(session_context->host); + free(swarm_session); +} + +/*** + * Add a connected peer to the swarm + * NOTE: We should already have a connection to the peer + * @param context the SwarmContext + * @param peer the connected peer + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_swarm_add_peer(struct SwarmContext* context, struct Libp2pPeer* peer) { + // spin off a thread for this peer + struct SwarmSession* swarm_session = (struct SwarmSession*) malloc(sizeof(struct SwarmSession)); + swarm_session->session_context = peer->sessionContext; + swarm_session->swarm_context = context; + + if (thpool_add_work(context->thread_pool, libp2p_swarm_listen, swarm_session) < 0) { + libp2p_logger_error("swarm", "Unable to fire up thread for peer %s\n", libp2p_peer_id_to_string(peer)); + return 0; + } + libp2p_logger_info("swarm", "add_connection: added connection for peer %s.\n", libp2p_peer_id_to_string(peer)); + + return 1; +} + + +/** + * add an incoming connection + * @param context the SwarmContext + * @param file_descriptor the incoming file descriptor of the connection + * @param ip the incoming ip (ipv4 format) + * @param port the incoming port + * @return true(1) on success, false(0) otherwise + */ +int libp2p_swarm_add_connection(struct SwarmContext* context, int file_descriptor, int ip, int port ) { + + // build a session context + struct SessionContext* session = libp2p_session_context_new(); + if (session == NULL) { + libp2p_logger_error("swarm", "Unable to allocate SessionContext. Out of memory?\n"); + return 0; + } + + session->datastore = context->datastore; + session->filestore = context->filestore; + // convert IP address to text + session->host = malloc(INET_ADDRSTRLEN); + if (session->host == NULL) { + // we are out of memory + free(session->host); + return 0; + } + if (inet_ntop(AF_INET, &ip, session->host, INET_ADDRSTRLEN) == NULL) { + free(session->host); + session->host = NULL; + session->port = 0; + return 0; + } + session->port = port; + session->insecure_stream = libp2p_net_connection_new(file_descriptor, session->host, session->port, session); + session->default_stream = session->insecure_stream; + + struct SwarmSession* swarm_session = (struct SwarmSession*) malloc(sizeof(struct SwarmSession)); + swarm_session->session_context = session; + swarm_session->swarm_context = context; + + if (thpool_add_work(context->thread_pool, libp2p_swarm_listen, swarm_session) < 0) { + libp2p_logger_error("swarm", "Unable to fire up thread for connection %d\n", file_descriptor); + return 0; + } + libp2p_logger_info("swarm", "add_connection: added connection %d.\n", file_descriptor); + + return 1; +} + +/** + * Fire up the swarm engine, and return its context + * @param protocol_handlers the protocol handlers + * @param datastore the datastore + * @param filestore the file store + * @returns the SwarmContext + */ +struct SwarmContext* libp2p_swarm_new(struct Libp2pVector* protocol_handlers, struct Datastore* datastore, struct Filestore* filestore) { + struct SwarmContext* context = (struct SwarmContext*) malloc(sizeof(struct SwarmContext)); + if (context != NULL) { + context->thread_pool = thpool_init(25); + context->protocol_handlers = protocol_handlers; + context->datastore = datastore; + context->filestore = filestore; + } + return context; +} diff --git a/test/test_conn.h b/test/test_conn.h index 29a41b4..e883221 100644 --- a/test/test_conn.h +++ b/test/test_conn.h @@ -12,7 +12,7 @@ int test_dialer_new() { peer->id = malloc(strlen(peer_id)+1); strcpy(peer->id, peer_id); peer->id_size = strlen(peer_id); - struct Dialer* dialer = libp2p_conn_dialer_new(peer, NULL, private_key); + struct Dialer* dialer = libp2p_conn_dialer_new(peer, NULL, private_key, NULL); if (dialer == NULL) goto exit; retVal = 1; @@ -38,7 +38,7 @@ int test_dialer_dial() { goto exit; peer->id_size = strlen((char*)peer->id); - dialer = libp2p_conn_dialer_new(peer, NULL, NULL); + dialer = libp2p_conn_dialer_new(peer, NULL, NULL, NULL); if (dialer == NULL) goto exit; @@ -109,7 +109,7 @@ int test_dialer_join_swarm() { peerstore = libp2p_peerstore_new(local_peer); // 3) make the dialer - dialer = libp2p_conn_dialer_new(local_peer, peerstore, rsa_private_key); + dialer = libp2p_conn_dialer_new(local_peer, peerstore, rsa_private_key, NULL); // 4) make the remote peer remote_ma = multiaddress_new_from_string(remote_swarm); @@ -155,7 +155,7 @@ int test_dialer_dial_multistream() { peer->id_size = strlen((char*)peer->id); - dialer = libp2p_conn_dialer_new(peer, NULL, NULL); + dialer = libp2p_conn_dialer_new(peer, NULL, NULL, NULL); if (dialer == NULL) goto exit; diff --git a/test/test_yamux.h b/test/test_yamux.h index c968745..7984516 100644 --- a/test/test_yamux.h +++ b/test/test_yamux.h @@ -343,7 +343,7 @@ int test_yamux_client_server_connect() { libp2p_net_server_start("127.0.0.1", 1234, protocol_handlers); sleep(1); // set up client (easiest to use transport dialers) - struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL); + struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL, NULL); struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234"); struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma); if (stream == NULL) { @@ -391,7 +391,7 @@ int test_yamux_client_server_multistream() { libp2p_net_server_start("127.0.0.1", 1234, protocol_handlers); sleep(1); // set up client (easiest to use transport dialers) - struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL); + struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL, NULL); struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234"); struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma); if (stream == NULL) { @@ -468,7 +468,7 @@ int test_yamux_multistream_client() { handler = libp2p_net_multistream_build_protocol_handler(protocol_handlers); libp2p_utils_vector_add(protocol_handlers, handler); // set up client (easiest to use transport dialers) - struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL); + struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL, NULL); struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234"); struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma); if (stream == NULL) { diff --git a/yamux/session.c b/yamux/session.c index b8b03c4..1840022 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -152,6 +152,36 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po return outgoing.data_size; } +/*** + * Get message, removing the frame prefix + * @param incoming the incoming bytes + * @param incoming_size the size of incoming + * @param return_message where to put the results + * @returns true(1) on success, false(0) otherwise + */ +int yamux_pull_message_from_frame(const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message) { + if (incoming_size <= 12) { + return 0; + } + int sz = sizeof(struct yamux_frame); + *return_message = libp2p_stream_message_new(); + struct StreamMessage* msg = *return_message; + if (msg == NULL) { + libp2p_logger_debug("yamux", "pull_message_from_frame: Unable to allocate memory for message.\n"); + return 0; + } + msg->data_size = incoming_size - sz; + msg->data = (uint8_t*) malloc(msg->data_size); + if (msg->data == NULL) { + libp2p_logger_debug("yamux", "pull_message_from_frame: Unable to allocate memory for data portion of message.\n"); + libp2p_stream_message_free(msg); + *return_message = NULL; + return 0; + } + memcpy(msg->data, &incoming[sz], msg->data_size); + return 1; +} + /** * Decode an incoming message * @param context the YamuxContext or YamuxChannelContext @@ -181,6 +211,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s decode_frame(&f); + // check yamux version if (f.version != YAMUX_VERSION) { libp2p_logger_error("yamux", "Incorrect Yamux version. Expected %d but received %d.\n", YAMUX_VERSION, f.version); @@ -287,6 +318,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s libp2p_logger_debug("yamux", "Processing frame of %d bytes.\n"); ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); + yamux_pull_message_from_frame(incoming, incoming_size, return_message); return (re < 0) ? re : (re + incoming_size); } } @@ -296,21 +328,12 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s if (f.flags & yamux_frame_syn) { libp2p_logger_debug("yamux", "Looks like we have a new stream coming in. Stream %d.\n", f.streamid); - struct StreamMessage* msg = libp2p_stream_message_new(); - - if (incoming_size > sizeof(struct yamux_frame)) { - msg->data_size = incoming_size - sizeof(struct yamux_frame); - libp2p_logger_debug("yamux", "Stream %d has data after the frame, with a length of %d.\n", f.streamid, msg->data_size); - msg->data = malloc(msg->data_size); - memcpy(msg->data, &incoming[sizeof(struct yamux_frame)], msg->data_size); - } else { - libp2p_logger_debug("yamux", "Stream %d has no extra data after the frame.\n", f.streamid); - } + yamux_pull_message_from_frame(incoming, incoming_size, return_message); // if we didn't initiate it, add this new channel (odd stream id is from client, even is from server) if ( (f.streamid % 2 == 0 && !yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) { libp2p_logger_debug("yamux", "This is a new channel. Creating it...\n"); - struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, msg); + struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message); if (yamuxChannelStream == NULL) { libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux channel for stream id %d.\n", f.streamid); return -EPROTO; @@ -319,7 +342,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s if (yamux_session->new_stream_fn) { libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn.\n"); - yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, msg); + yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message); } // handle window update (if there is one) struct yamux_session_stream ss = yamux_session->streams[f.streamid]; @@ -332,19 +355,22 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size); channelContext->state = yamux_stream_syn_recv; if (f.type == yamux_frame_window_update) { + libp2p_logger_debug("yamux", "Received window update for stream %d. Sending one back.\n", f.streamid); // send it back yamux_stream_window_update(channelContext, ss.stream->window_size); } // TODO: Start negotiations of multistream struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); if (multistream != NULL) { + libp2p_logger_debug("yamux", "Successfully negotiated multistream on stream %d.\n", f.streamid); channelContext->child_stream = multistream; + } else { + libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid); } } else { libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)")); } - *return_message = msg; } else { libp2p_logger_error("yamux", "We had a (probably) new frame, but the flags didn't seem right.");