From 2391f23b4e74fb8e58de0f01bcede417cfdae3b4 Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Thu, 12 Oct 2017 12:37:40 -0500 Subject: [PATCH] Beginnings of stream locking --- conn/session.c | 39 ++++++++++++++++++++++++++++++++ include/libp2p/conn/session.h | 3 ++- include/libp2p/net/multistream.h | 7 ++++++ include/libp2p/net/stream.h | 24 ++++++++++++++++++++ include/libp2p/yamux/yamux.h | 8 +++++++ net/multistream.c | 34 +++++++++++++++++++++++++++- peer/peer.c | 30 ++++++++++++++++++++---- secio/secio.c | 1 + test/test_secio.h | 35 +++++++++++++++++++++++++++- yamux/yamux.c | 22 ++++++++++++++++++ 10 files changed, 196 insertions(+), 7 deletions(-) diff --git a/conn/session.c b/conn/session.c index 43f8742..95815f8 100644 --- a/conn/session.c +++ b/conn/session.c @@ -90,6 +90,45 @@ int libp2p_session_context_free(struct SessionContext* context) { return 1; } +/*** + * Attempt to lock a stream for personal use. Does not block. + * @param stream the stream to lock + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_stream_try_lock(struct Stream* stream) { + if (stream == NULL) + return 0; + if (pthread_mutex_trylock(&stream->socket_mutex) == 0) + return 1; + return 0; +} + +/*** + * Attempt to lock a stream for personal use. Blocks until the lock is acquired + * @param stream the stream to lock + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_stream_lock(struct Stream* stream) { + if (stream == NULL) + return 0; + if (pthread_mutex_lock(&stream->socket_mutex) == 0) + return 1; + return 0; +} + +/*** + * Attempt to unlock the mutex for this stream + * @param stream the stream to unlock + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_stream_unlock(struct Stream* stream) { + if (stream == NULL) + return 0; + if (pthread_mutex_unlock(&stream->socket_mutex) == 0) + return 1; + return 0; +} + /**** * Make a copy of a SessionContext * @param original the original diff --git a/include/libp2p/conn/session.h b/include/libp2p/conn/session.h index d6ffa59..41e269f 100644 --- a/include/libp2p/conn/session.h +++ b/include/libp2p/conn/session.h @@ -1,5 +1,5 @@ #pragma once - +#include #include "libp2p/crypto/key.h" #include "libp2p/db/datastore.h" #include "libp2p/db/filestore.h" @@ -18,6 +18,7 @@ struct SessionContext { char* host; int port; enum IPTrafficType traffic_type; + pthread_mutex_t stream_mutex; // once the connection is established /** * Note: default_stream should be used in most cases. Often, insecure_stream and secure_stream will be diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index 53b54c7..4089272 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -33,6 +33,13 @@ struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void */ int libp2p_net_multistream_send_protocol(struct SessionContext *context); +/*** + * Check to see if the reply is the multistream protocol header we expect + * NOTE: if we initiate the connection, we should expect the same back + * @param context the SessionContext + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_net_multistream_receive_protocol(struct SessionContext* context); /** * Read from a multistream socket diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index 5fb06a8..d94b56a 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -1,5 +1,7 @@ #pragma once +#include + /** * An interface in front of various streams */ @@ -8,6 +10,7 @@ struct Stream { * A generic socket descriptor */ void* socket_descriptor; + pthread_mutex_t socket_mutex; struct MultiAddress *address; /** @@ -46,3 +49,24 @@ struct Stream { */ int (*peek)(void* stream_context); }; + +/*** + * Attempt to lock a stream for personal use. Does not block. + * @param stream the stream to lock + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_stream_try_lock(struct Stream* stream); + +/*** + * Attempt to lock a stream for personal use. Blocks until the lock is acquired + * @param stream the stream to lock + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_stream_lock(struct Stream* stream); + +/*** + * Attempt to unlock the mutex for this stream + * @param stream the stream to unlock + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_stream_unlock(struct Stream* stream); diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index 5a204fb..e45d365 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -13,3 +13,11 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(); * @returns true(1) on success, false(0) otherwise */ int yamux_send_protocol(struct SessionContext* context); + +/*** + * Check to see if the reply is the yamux protocol header we expect + * NOTE: if we initiate the connection, we should expect the same back + * @param context the SessionContext + * @returns true(1) on success, false(0) otherwise + */ +int yamux_receive_protocol(struct SessionContext* context); diff --git a/net/multistream.c b/net/multistream.c index 55b9204..a02cb42 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -38,9 +38,40 @@ int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t inco return 0; } +/*** + * Send the multistream header out the default stream + * @param context the context + * @returns true(1) on success, false(0) otherwise + */ int libp2p_net_multistream_send_protocol(struct SessionContext *context) { char *protocol = "/multistream/1.0.0\n"; - return context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol)); + if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) { + libp2p_logger_error("multistream", "send_protocol: Unable to send multistream protocol header.\n"); + return 0; + } + return 1; +} + +/*** + * Check to see if the reply is the multistream protocol header we expect + * NOTE: if we initiate the connection, we should expect the same back + * @param context the SessionContext + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_net_multistream_receive_protocol(struct SessionContext* context) { + char* protocol = "/multistream/1.0.0\n"; + uint8_t* results = NULL; + size_t results_size = 0; + if (!context->default_stream->read(context, &results, &results_size, 30)) { + libp2p_logger_error("multistream", "receive_protocol: Unable to read results.\n"); + return 0; + } + // the first byte is the size, so skip it + char* ptr = strstr((char*)&results[1], protocol); + if (ptr == NULL || ptr - (char*)results > 1) { + return 0; + } + return 1; } int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incoming_size, struct SessionContext* context, void* protocol_context) { @@ -456,6 +487,7 @@ void libp2p_net_multistream_stream_free(struct Stream* stream) { struct Stream* libp2p_net_multistream_stream_new(int socket_fd, const char* ip, int port) { struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream)); if (out != NULL) { + pthread_mutex_init(&out->socket_mutex, NULL); out->socket_descriptor = malloc(sizeof(int)); *((int*)out->socket_descriptor) = socket_fd; int res = *((int*)out->socket_descriptor); diff --git a/peer/peer.c b/peer/peer.c index 2a7cdff..0e7a12c 100644 --- a/peer/peer.c +++ b/peer/peer.c @@ -122,26 +122,48 @@ int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPee peer->sessionContext->datastore = datastore; peer->sessionContext->insecure_stream = libp2p_net_multistream_connect_with_timeout(ip, port, timeout); if (peer->sessionContext->insecure_stream == NULL) { - libp2p_logger_debug("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer)); + libp2p_logger_error("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer)); free(ip); return 0; } - if (peer->sessionContext->insecure_stream != NULL) { - peer->sessionContext->default_stream = peer->sessionContext->insecure_stream; - peer->connection_type = CONNECTION_TYPE_CONNECTED; + peer->sessionContext->default_stream = peer->sessionContext->insecure_stream; + peer->connection_type = CONNECTION_TYPE_CONNECTED; + // lock the stream + if (!libp2p_stream_lock(peer->sessionContext->default_stream)) { + libp2p_logger_error("peer", "Unable to lock the newly created peer stream for peer %s.\n", libp2p_peer_id_to_string(peer)); + free(ip); + return 0; } // switch to secio if (libp2p_secio_initiate_handshake(peer->sessionContext, privateKey, peerstore) <= 0) { libp2p_logger_error("peer", "Attempted secio handshake, but failed for peer %s.\n", libp2p_peer_id_to_string(peer)); free(ip); + libp2p_stream_unlock(peer->sessionContext->default_stream); + return 0; + } + //switch to multistream + if (!libp2p_net_multistream_negotiate(peer->sessionContext)) { + libp2p_logger_error("peer", "Attempted multistream handshake, but failed for peer %s.\n", libp2p_peer_id_to_string(peer)); + free(ip); + libp2p_stream_unlock(peer->sessionContext->default_stream); return 0; } // switch to yamux if (!yamux_send_protocol(peer->sessionContext)) { libp2p_logger_error("peer", "Attempted yamux handshake, but could not send protocol header for peer %s.\n", libp2p_peer_id_to_string(peer)); free(ip); + libp2p_stream_unlock(peer->sessionContext->default_stream); return 0; } + libp2p_stream_unlock(peer->sessionContext->default_stream); + /* + // expect yamux back + if (!yamux_receive_protocol(peer->sessionContext)) { + libp2p_logger_error("peer", "Attempted yamux handshake, but received unexpected response.\n"); + free(ip); + return 0; + } + */ free(ip); } // is IP now = time(NULL); diff --git a/secio/secio.c b/secio/secio.c index 2b2784d..db5778b 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -995,6 +995,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs struct MultiAddress* ma = multiaddress_new_from_string(url); if (ma == NULL) { libp2p_logger_error("secio", "Unable to generate MultiAddress from [%s].\n", url); + goto exit; } else { libp2p_logger_debug("secio", "Adding %s to peer %s.\n", url, libp2p_peer_id_to_string(remote_peer)); struct Libp2pLinkedList* ma_ll = libp2p_utils_linked_list_new(); diff --git a/test/test_secio.h b/test/test_secio.h index 2b7cb4b..a377201 100644 --- a/test/test_secio.h +++ b/test/test_secio.h @@ -94,6 +94,7 @@ int test_secio_handshake() { } // attempt to write the protocol, and see what comes back + /* char* protocol = "/secio/1.0.0\n"; int protocol_size = strlen(protocol); secure_session->insecure_stream->write(secure_session, (unsigned char*)protocol, protocol_size); @@ -117,6 +118,22 @@ int test_secio_handshake() { } goto exit; } + */ + // a new way to do the above + if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) { + libp2p_logger_error("test_secio", "Unable to do handshake\n"); + if (secure_session->shared_key != NULL) { + fprintf(stdout, "Shared key: "); + for(int i = 0; i < secure_session->shared_key_size; i++) + fprintf(stdout, "%d ", secure_session->shared_key[i]); + fprintf(stdout, "\nLocal stretched key: "); + print_stretched_key(secure_session->local_stretched_key); + fprintf(stdout, "\nRemote stretched key: "); + print_stretched_key(secure_session->remote_stretched_key); + fprintf(stdout, "\n"); + } + goto exit; + } /* fprintf(stdout, "Shared key: "); @@ -415,6 +432,7 @@ int test_secio_handshake_go() { } // attempt to write the protocol, and see what comes back + /* char* protocol = "/secio/1.0.0\n"; int protocol_size = strlen(protocol); secure_session->insecure_stream->write(secure_session, (unsigned char*)protocol, protocol_size); @@ -423,7 +441,6 @@ int test_secio_handshake_go() { size_t bytes_read = 0; int timeout = 30; secure_session->insecure_stream->read(secure_session, &buffer, &bytes_read, timeout); - if (!libp2p_secio_handshake(secure_session, rsa_private_key, peerstore)) { fprintf(stderr, "test_secio_handshake: Unable to do handshake\n"); if (secure_session->shared_key != NULL) { @@ -438,6 +455,22 @@ int test_secio_handshake_go() { } goto exit; } + */ + // a new way to do the above + if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) { + libp2p_logger_error("test_secio", "Unable to do handshake.\n"); + if (secure_session->shared_key != NULL) { + fprintf(stdout, "Shared key: "); + for(int i = 0; i < secure_session->shared_key_size; i++) + fprintf(stdout, "%d ", secure_session->shared_key[i]); + fprintf(stdout, "\nLocal stretched key: "); + print_stretched_key(secure_session->local_stretched_key); + fprintf(stdout, "\nRemote stretched key: "); + print_stretched_key(secure_session->remote_stretched_key); + fprintf(stdout, "\n"); + } + goto exit; + } /* fprintf(stdout, "Shared key: "); diff --git a/yamux/yamux.c b/yamux/yamux.c index c2dddd5..f22abee 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -63,6 +63,28 @@ int yamux_send_protocol(struct SessionContext* context) { return 1; } +/*** + * Check to see if the reply is the yamux protocol header we expect + * NOTE: if we initiate the connection, we should expect the same back + * @param context the SessionContext + * @returns true(1) on success, false(0) otherwise + */ +int yamux_receive_protocol(struct SessionContext* context) { + char* protocol = "/yamux/1.0.0\n"; + uint8_t* results = NULL; + size_t results_size = 0; + if (!context->default_stream->read(context, &results, &results_size, 30)) { + libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n"); + return 0; + } + // the first byte is the size, so skip it + char* ptr = strstr((char*)&results[1], protocol); + if (ptr == NULL || ptr - (char*)results > 1) { + return 0; + } + return 1; +} + /*** * Handles the message * @param incoming the incoming data buffer