From 1dcac6ecb526e3734e0aa067ce4def279cb4cc57 Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Mon, 23 Oct 2017 15:21:50 -0500 Subject: [PATCH] Implementing the new way of swarm connection --- conn/session.c | 6 +- include/libp2p/conn/dialer.h | 4 +- include/libp2p/net/connectionstream.h | 12 ++ include/libp2p/net/multistream.h | 2 + include/libp2p/net/protocol.h | 7 +- include/libp2p/net/stream.h | 29 ++- include/libp2p/peer/peer.h | 5 +- include/libp2p/routing/dht_protocol.h | 2 +- include/libp2p/secio/secio.h | 22 +- net/Makefile | 2 +- net/connectionstream.c | 131 ++++++++++++ net/multistream.c | 289 ++++++++------------------ net/protocol.c | 36 ++-- peer/peer.c | 67 +----- routing/dht_protocol.c | 14 +- secio/secio.c | 245 ++++++---------------- test/test_conn.h | 4 - test/test_multistream.h | 2 +- test/test_secio.h | 13 +- yamux/yamux.c | 26 +-- 20 files changed, 404 insertions(+), 514 deletions(-) create mode 100644 include/libp2p/net/connectionstream.h create mode 100644 net/connectionstream.c diff --git a/conn/session.c b/conn/session.c index 49ed2c2..e8b4737 100644 --- a/conn/session.c +++ b/conn/session.c @@ -98,7 +98,7 @@ int libp2p_session_context_free(struct SessionContext* context) { int libp2p_stream_try_lock(struct Stream* stream) { if (stream == NULL) return 0; - if (pthread_mutex_trylock(&stream->socket_mutex) == 0) + if (pthread_mutex_trylock(stream->socket_mutex) == 0) return 1; return 0; } @@ -111,7 +111,7 @@ int libp2p_stream_try_lock(struct Stream* stream) { int libp2p_stream_lock(struct Stream* stream) { if (stream == NULL) return 0; - if (pthread_mutex_lock(&stream->socket_mutex) == 0) + if (pthread_mutex_lock(stream->socket_mutex) == 0) return 1; return 0; } @@ -124,7 +124,7 @@ int libp2p_stream_lock(struct Stream* stream) { int libp2p_stream_unlock(struct Stream* stream) { if (stream == NULL) return 0; - if (pthread_mutex_unlock(&stream->socket_mutex) == 0) + if (pthread_mutex_unlock(stream->socket_mutex) == 0) return 1; return 0; } diff --git a/include/libp2p/conn/dialer.h b/include/libp2p/conn/dialer.h index 0d25550..54805fc 100644 --- a/include/libp2p/conn/dialer.h +++ b/include/libp2p/conn/dialer.h @@ -1,5 +1,7 @@ +#pragma once + /*** - * A local dialer. Uses MultiAddr to figure out the best way to +* A local dialer. Uses MultiAddr to figure out the best way to * connect to a client, then returns an open Connection that can be * closed, read from and written to. The normal procedure is as follows: * 1) Create a Dialer struct, with the required information about the local host diff --git a/include/libp2p/net/connectionstream.h b/include/libp2p/net/connectionstream.h new file mode 100644 index 0000000..a4215e9 --- /dev/null +++ b/include/libp2p/net/connectionstream.h @@ -0,0 +1,12 @@ +#pragma once + +#include "libp2p/net/stream.h" + +/*** + * Create a new stream based on a network connection + * @param fd the handle to the network connection + * @param ip the IP address of the connection + * @param port the port of the connection + * @returns a Stream + */ +struct Stream* libp2p_net_connection_new(int fd, char* ip, int port); diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index 5920abd..577911b 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -17,6 +17,8 @@ struct MultistreamContext { struct Libp2pVector* handlers; + struct SessionContext* session_context; + struct Stream* stream; }; /*** diff --git a/include/libp2p/net/protocol.h b/include/libp2p/net/protocol.h index 21f50e3..440d366 100644 --- a/include/libp2p/net/protocol.h +++ b/include/libp2p/net/protocol.h @@ -1,6 +1,7 @@ #pragma once #include "libp2p/conn/session.h" #include "libp2p/utils/vector.h" +#include "libp2p/net/stream.h" /*** * An "interface" for different IPFS protocols @@ -16,7 +17,7 @@ struct Libp2pProtocolHandler { * @param incoming_size the size of the incoming data buffer * @returns true(1) if it can handle this message, false(0) if not */ - int (*CanHandle)(const uint8_t* incoming, size_t incoming_size); + int (*CanHandle)(const struct StreamMessage* msg); /*** * Handles the message * @param incoming the incoming data buffer @@ -25,7 +26,7 @@ struct Libp2pProtocolHandler { * @param protocol_context the protocol-dependent context * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ - int (*HandleMessage)(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context); + int (*HandleMessage)(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context); /** * Shutting down. Clean up any memory allocations @@ -41,4 +42,4 @@ struct Libp2pProtocolHandler { */ struct Libp2pProtocolHandler* libp2p_protocol_handler_new(); -int libp2p_protocol_marshal(const uint8_t* incoming, size_t incoming_size, struct SessionContext* context, struct Libp2pVector* protocol_handlers); +int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* context, struct Libp2pVector* protocol_handlers); diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index ee1dcaa..447b4e9 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -25,6 +25,15 @@ struct StreamMessage* libp2p_stream_message_new(); void libp2p_stream_message_free(struct StreamMessage* msg); +/** + * This is a context struct for a basic IP connection + */ +struct ConnectionContext { + int socket_descriptor; + struct SessionContext* session_context; +}; + + /** * An interface in front of various streams */ @@ -32,9 +41,13 @@ struct Stream { /** * A generic socket descriptor */ - void* socket_descriptor; - pthread_mutex_t socket_mutex; - struct MultiAddress *address; + struct MultiAddress* address; // helps identify who is on the other end + pthread_mutex_t* socket_mutex; // only 1 transmission at a time + struct Stream* parent_stream; // what stream wraps this stream + /** + * A generic place to store implementation-specific context items + */ + void* stream_context; /** * Reads from the stream @@ -45,6 +58,16 @@ struct Stream { */ int (*read)(void* stream_context, struct StreamMessage** message, int timeout_secs); + /** + * Reads a certain amount of bytes directly from the stream + * @param stream_context the context + * @param buffer where to put the results + * @param buffer_size the number of bytes to read + * @param timeout_secs number of seconds before a timeout + * @returns number of bytes read, or -1 on error + */ + int (*read_raw)(void* stream_context, uint8_t* buffer, int buffer_size, int timeout_secs); + /** * Writes to a stream * @param stream the stream context (usually a SessionContext pointer) diff --git a/include/libp2p/peer/peer.h b/include/libp2p/peer/peer.h index 418f7fb..46666a1 100644 --- a/include/libp2p/peer/peer.h +++ b/include/libp2p/peer/peer.h @@ -4,6 +4,7 @@ #include "libp2p/net/stream.h" #include "libp2p/crypto/rsa.h" #include "libp2p/conn/session.h" +#include "libp2p/conn/dialer.h" struct Peerstore; @@ -50,13 +51,13 @@ void libp2p_peer_free(struct Libp2pPeer* in); * Attempt to connect to the peer, setting connection_type correctly * NOTE: If successful, this will set peer->connection to the stream * - * @param privateKey the local private key to use + * @param dialer the dialer * @param peer the peer to connect to * @param peerstore if connection is successfull, will add peer to peerstore * @param timeout number of seconds before giving up * @returns true(1) on success, false(0) if we could not connect */ -int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout); +int libp2p_peer_connect(const struct Dialer* dialer, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout); /*** * Clean up a bad connection diff --git a/include/libp2p/routing/dht_protocol.h b/include/libp2p/routing/dht_protocol.h index c5868c0..581ebf7 100644 --- a/include/libp2p/routing/dht_protocol.h +++ b/include/libp2p/routing/dht_protocol.h @@ -63,5 +63,5 @@ int libp2p_routing_dht_receive_message(struct SessionContext* sessionContext, st * @param msg the message to send * @returns true(1) if we sent to at least 1, false(0) otherwise */ -int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* private_key, struct Peerstore* peerstore, +int libp2p_routing_dht_send_message_nearest_x(const struct Dialer* dialer, struct Peerstore* peerstore, struct Datastore* datastore, struct KademliaMessage* msg, int numToSend); diff --git a/include/libp2p/secio/secio.h b/include/libp2p/secio/secio.h index c61b8b8..0a8c865 100644 --- a/include/libp2p/secio/secio.h +++ b/include/libp2p/secio/secio.h @@ -10,38 +10,40 @@ * Handling of a secure connection */ +struct SecioContext { + struct Stream* stream; + struct SessionContext* session_context; + struct RsaPrivateKey* private_key; + struct Peerstore* peer_store; +}; struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPrivateKey* private_key, struct Peerstore* peer_store); /*** * performs initial communication over an insecure channel to share * keys, IDs, and initiate connection. This is a framed messaging system - * @param session the secure session to be filled - * @param private_key the local private key to use - * @param remote_requested the other side is who asked for the upgrade + * @param ctx the SecioContext * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_handshake(struct SessionContext* session, const struct RsaPrivateKey* private_key, struct Peerstore* peerstore); +int libp2p_secio_handshake(struct SecioContext* ctx); /*** * Initiates a secio handshake. Use this method when you want to initiate a secio * session. This should not be used to respond to incoming secio requests - * @param session_context the session context - * @param private_key the RSA private key to use - * @param peer_store the peer store + * @param ctx the SecioContext * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_initiate_handshake(struct SessionContext* session_context, const struct RsaPrivateKey* private_key, struct Peerstore* peer_store); +int libp2p_secio_initiate_handshake(struct SecioContext* ctx); /*** * Send the protocol string to the remote stream * @param session the context * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_send_protocol(struct SessionContext* session); +int libp2p_secio_send_protocol(struct SecioContext* session); /*** * Attempt to read the secio protocol as a reply from the remote * @param session the context * @returns true(1) if we received what we think we should have, false(0) otherwise */ -int libp2p_secio_receive_protocol(struct SessionContext* session); +int libp2p_secio_receive_protocol(struct SecioContext* session); diff --git a/net/Makefile b/net/Makefile index c04b95d..715d342 100644 --- a/net/Makefile +++ b/net/Makefile @@ -7,7 +7,7 @@ endif LFLAGS = DEPS = -OBJS = sctp.o socket.o tcp.o udp.o multistream.o protocol.o +OBJS = sctp.o socket.o tcp.o udp.o multistream.o protocol.o connectionstream.o %.o: %.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) diff --git a/net/connectionstream.c b/net/connectionstream.c new file mode 100644 index 0000000..ff1191f --- /dev/null +++ b/net/connectionstream.c @@ -0,0 +1,131 @@ + +/** + * A raw network connection, that implements Stream + */ + +#include +#include +#include +#include +#include +#include "libp2p/net/stream.h" +#include "libp2p/net/p2pnet.h" +#include "multiaddr/multiaddr.h" + +/** + * Close a network connection + * @param stream_context the ConnectionContext + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_net_connection_close(void* stream_context) { + if (stream_context == NULL) + return 0; + struct ConnectionContext* ctx = (struct ConnectionContext*)stream_context; + if (close(ctx->socket_descriptor) == 0) + // everything was okay + return 1; + // something went wrong + return 0; +} + +/*** + * Check and see if there is anything waiting on this network connection + * @param stream_context the ConnectionContext + * @returns number of bytes waiting, or -1 on error + */ +int libp2p_net_connection_peek(void* stream_context) { + if (stream_context == NULL) + return 0; + struct ConnectionContext* ctx = (struct ConnectionContext*)stream_context; + int socket_fd = ctx->socket_descriptor; + if (socket_fd < 0) + return -1; + + int bytes = 0; + if (ioctl(socket_fd, FIONREAD, &bytes) < 0) { + // Ooff, we're having problems. Don't use this socket again. + return -1; + } + return bytes; +} + +/** + * Read from the network + * @param stream_context the ConnectionContext + * @param msg where to put the results + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_net_connection_read(void* stream_context, struct StreamMessage** msg, int timeout_secs) { + return 0; +} + +/** + * Reads a certain amount of bytes directly from the stream + * @param stream_context the context + * @param buffer where to put the results + * @param buffer_size the number of bytes to read + * @param timeout_secs number of seconds before a timeout + * @returns number of bytes read, or -1 on error + */ +int libp2p_net_connection_read_raw(void* stream_context, uint8_t* buffer, int buffer_size, int timeout_secs) { + if (stream_context == NULL) + return -1; + struct ConnectionContext* ctx = (struct ConnectionContext*) stream_context; + int num_read = 0; + for(int i = 0; i < buffer_size; i++) { + int retVal = socket_read(ctx->socket_descriptor, (char*)&buffer[i], 1, 0, timeout_secs); + if (retVal < 1) { // get out of the loop + if (retVal < 0) // error + return -1; + break; + } + num_read += retVal; // Everything ok, loop again (possibly) + } + return num_read; +} + +/** + * Writes to a stream + * @param stream the stream context (usually a SessionContext pointer) + * @param buffer what to write + * @returns number of bytes written + */ +int libp2p_net_connection_write(void* stream_context, struct StreamMessage* msg) { + if (stream_context == NULL) + return -1; + struct ConnectionContext* ctx = (struct ConnectionContext*) stream_context; + return socket_write(ctx->socket_descriptor, (char*)msg->data, msg->data_size, 0); +} + +/*** + * Create a new stream based on a network connection + * @param fd the handle to the network connection + * @param ip the IP address of the connection + * @param port the port of the connection + * @returns a Stream + */ +struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) { + struct Stream* out = (struct Stream*) malloc(sizeof(struct Stream)); + if (out != NULL) { + out->close = libp2p_net_connection_close; + out->peek = libp2p_net_connection_peek; + out->read = libp2p_net_connection_read; + out->read_raw = libp2p_net_connection_read_raw; + out->write = libp2p_net_connection_write; + // Multiaddresss + char str[strlen(ip) + 25]; + memset(str, 0, strlen(ip) + 16); + sprintf(str, "/ip4/%s/tcp/%d", ip, port); + out->address = multiaddress_new_from_string(str); + out->parent_stream = NULL; + // mutex + out->socket_mutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t)); + pthread_mutex_init(out->socket_mutex, NULL); + // context + struct ConnectionContext* ctx = (struct ConnectionContext*) malloc(sizeof(struct ConnectionContext)); + if (ctx != NULL) { + out->stream_context = ctx; + } + } + return out; +} diff --git a/net/multistream.c b/net/multistream.c index 1a94cb1..df11f89 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -23,9 +23,11 @@ int multistream_default_timeout = 5; * An implementation of the libp2p multistream */ -int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t incoming_size) { +int libp2p_net_multistream_can_handle(const struct StreamMessage* msg) { char *protocol = "/multistream/1.0.0\n"; int protocol_size = strlen(protocol); + unsigned char* incoming = msg->data; + size_t incoming_size = msg->data_size; // is there a varint in front? size_t num_bytes = 0; if (incoming[0] != '/' && incoming[1] != 'm') { @@ -76,7 +78,7 @@ int libp2p_net_multistream_receive_protocol(struct SessionContext* context) { return 1; } -int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incoming_size, struct SessionContext* context, void* protocol_context) { +int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) { // try sending the protocol back //if (!libp2p_net_multistream_send_protocol(context)) // return -1; @@ -94,10 +96,10 @@ int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incomi if (context->default_stream->read(context, &results, 5)) { // we read something from the network. Process it. // NOTE: If it is a multistream protocol that we are receiving, ignore it. - if (libp2p_net_multistream_can_handle(results->data, results->data_size)) + if (libp2p_net_multistream_can_handle(results)) continue; numRetries = 0; - retVal = libp2p_protocol_marshal(results->data, results->data_size, context, multistream_context->handlers); + retVal = libp2p_protocol_marshal(results, context, multistream_context->handlers); if (results != NULL) free(results); // exit the loop on error (or if they ask us to no longer loop by returning 0) @@ -147,48 +149,48 @@ struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void return handler; } +/** + * Close the connection and free memory + * @param ctx the context + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_net_multistream_context_free(struct MultistreamContext* ctx) { + int retVal = ctx->stream->close(ctx); + // regardless of retVal, free the context + // TODO: Evaluate if this is the correct way to do it: + free(ctx); + return retVal; +} + /*** * Close the Multistream interface * NOTE: This also closes the socket * @param stream_context a SessionContext - * @returns true(1) + * @returns true(1) on success, otherwise false(0) */ int libp2p_net_multistream_close(void* stream_context) { - struct SessionContext* secure_context = (struct SessionContext*)stream_context; - struct Stream* stream = secure_context->default_stream; - if (stream == NULL || stream->socket_descriptor == NULL) - return 1; - libp2p_net_multistream_stream_free(stream); - secure_context->default_stream = NULL; - secure_context->insecure_stream = NULL; - secure_context->secure_stream = NULL; - return 1; + if (stream_context == NULL) { + return 0; + } + struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream_context; + return libp2p_net_multistream_context_free(multistream_context); } /*** * Check the stream to see if there is something to read - * @param stream_context a SessionContext + * @param stream_context a MultistreamContext * @returns number of bytes to be read, or -1 if there was an error */ int libp2p_net_multistream_peek(void* stream_context) { if (stream_context == NULL) return -1; - struct SessionContext* session_context = (struct SessionContext*)stream_context; - struct Stream* stream = session_context->default_stream; - if (stream == NULL) + struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream_context; + struct Stream* parent_stream = multistream_context->stream->parent_stream; + if (parent_stream == NULL) return -1; - int socket_fd = *((int*)stream->socket_descriptor); - if (socket_fd < 0) - return -1; - - int bytes = 0; - if (ioctl(socket_fd, FIONREAD, &bytes) < 0) { - // Ooff, we're having problems. Don't use this socket again. - return -1; - } - return bytes; + return parent_stream->peek(parent_stream); } /** @@ -197,43 +199,32 @@ int libp2p_net_multistream_peek(void* stream_context) { * @param msg the data to send * @returns the number of bytes written */ -int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* msg) { - struct SessionContext* session_context = (struct SessionContext*)stream_context; - struct Stream* stream = session_context->default_stream; +int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* incoming) { + struct MultistreamContext* multistream_context = (struct MultistreamContext*) stream_context; + struct Stream* parent_stream = multistream_context->stream->parent_stream; int num_bytes = 0; - if (msg->data_size > 0) { // only do this is if there is something to send - // first send the size + if (incoming->data_size > 0) { // only do this is if there is something to send + // first get the size as a varint unsigned char varint[12]; size_t varint_size = 0; - varint_encode(msg->data_size, &varint[0], 12, &varint_size); + varint_encode(incoming->data_size, &varint[0], 12, &varint_size); // now put the size with the data - unsigned char* buffer = (unsigned char*)malloc(msg->data_size + varint_size); - if (buffer == NULL) + struct StreamMessage outgoing; + outgoing.data = (uint8_t*) malloc(varint_size + incoming->data_size); + if (outgoing.data == NULL) { return 0; - memset(buffer, 0, msg->data_size + varint_size); - memcpy(buffer, varint, varint_size); - memcpy(&buffer[varint_size], msg->data, msg->data_size); - // determine if this should run through the secio protocol or not - if (session_context->secure_stream == NULL) { - int sd = *((int*)stream->socket_descriptor); - // do a "raw" write - num_bytes = socket_write(sd, (char*)varint, varint_size, 0); - if (num_bytes == 0) { - free(buffer); - return 0; - } - // then send the actual data - num_bytes += socket_write(sd, (char*)msg->data, msg->data_size, 0); - session_context->last_comm_epoch = os_utils_gmtime(); - } else { - // write using secio - struct StreamMessage outgoing; - outgoing.data = buffer; - outgoing.data_size = msg->data_size + varint_size; - num_bytes = stream->write(stream_context, &outgoing); } - free(buffer); + memset(outgoing.data, 0, incoming->data_size + varint_size); + memcpy(outgoing.data, varint, varint_size); + memcpy(&outgoing.data[varint_size], incoming->data, incoming->data_size); + // now ship it + num_bytes = parent_stream->write(parent_stream, &outgoing); + if (num_bytes > 0) { + // update the last time we communicated + multistream_context->session_context->last_comm_epoch = os_utils_gmtime(); + } + free(outgoing.data); } return num_bytes; @@ -245,114 +236,47 @@ int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* msg * @param results where to put the results. NOTE: this memory is allocated * @param results_size the size of the results in bytes * @param timeout_secs the seconds before a timeout - * @returns number of bytes received + * @returns true(1) on success, false(0) otherwise */ int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** results, int timeout_secs) { - struct SessionContext* session_context = (struct SessionContext*)stream_context; - struct Stream* stream = session_context->default_stream; - int bytes = 0; + struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream_context; + struct Stream* parent_stream = multistream_context->stream->parent_stream; - // TODO: this is arbitrary, and should be dynamic - size_t buffer_size = 362144; - char buffer[buffer_size]; - char* pos = buffer; - size_t num_bytes_requested = 0, left = 0, already_read = 0; - - if (session_context->secure_stream == NULL) { - int socketDescriptor = *( (int*) stream->socket_descriptor); - // first read the varint - while(1) { - unsigned char c = '\0'; - bytes = socket_read(socketDescriptor, (char*)&c, 1, 0, timeout_secs); - if (bytes <= 0) { - // possible error - if (bytes < 0) - libp2p_logger_error("multistream", "socket_read returned %d reading socket %d\n", bytes, socketDescriptor); - return 0; - } - pos[0] = c; - if (c >> 7 == 0) { - pos[1] = 0; - num_bytes_requested = varint_decode((unsigned char*)buffer, strlen(buffer), NULL); - break; - } - pos++; - } - if (num_bytes_requested <= 0) { - libp2p_logger_debug("multistream", "Reading the varint returned %d on socket %d\n", num_bytes_requested, socketDescriptor); + // find out the length + uint8_t varint[12]; + size_t num_bytes_requested = 0; + size_t varint_length = 0; + for(int i = 0; i < 12; i++) { + if (!parent_stream->read_raw(parent_stream->stream_context, &varint[i], 1, timeout_secs)) { return 0; } - - left = num_bytes_requested; - do { - bytes = socket_read(socketDescriptor, &buffer[already_read], left, 0, timeout_secs); - if (bytes < 0) { - bytes = 0; - if ( errno == EAGAIN ) { - // do something intelligent - } else { - libp2p_logger_error("multistream", "socket read returned error %d on socket descriptor %d.\n", errno, socketDescriptor); - return 0; - } - } - left = left - bytes; - already_read += bytes; - } while (left > 0); - - if (already_read != num_bytes_requested) - return 0; - - // parse the results, removing the leading size indicator - *results = libp2p_stream_message_new(); - struct StreamMessage* rslts = *results; - if (rslts == NULL) - return 0; - rslts->data_size = num_bytes_requested; - rslts->data = (uint8_t*) malloc(num_bytes_requested); - if (rslts->data == NULL) { - libp2p_stream_message_free(rslts); - return 0; + if (varint[i] >> 7 == 0) { + num_bytes_requested = varint_decode(&varint[0], i+1, &varint_length); + break; } - memcpy(rslts->data, buffer, num_bytes_requested); - session_context->last_comm_epoch = os_utils_gmtime(); - } else { // use secio instead of raw read/writes - struct StreamMessage* read_from_stream; - if (session_context->default_stream->read(session_context, &read_from_stream, timeout_secs) == 0) { - return 0; - } - // pull out num_bytes_requested - num_bytes_requested = varint_decode(read_from_stream->data, read_from_stream->data_size, &left); - memcpy(buffer, read_from_stream->data, read_from_stream->data_size); - buffer_size = read_from_stream->data_size; - libp2p_stream_message_free(read_from_stream); - read_from_stream = NULL; - while (num_bytes_requested > buffer_size - left) { - // need to read more into buffer - if (session_context->default_stream->read(session_context, &read_from_stream, timeout_secs) == 0) { - return 0; - } - memcpy(&buffer[buffer_size], read_from_stream->data, read_from_stream->data_size); - buffer_size += read_from_stream->data_size; - libp2p_stream_message_free(read_from_stream); - } - *results = libp2p_stream_message_new(); - struct StreamMessage* rslts = *results; - if (rslts == NULL) { - libp2p_logger_error("multistream", "Unable to allocate %lu bytes of memory.", num_bytes_requested); - return 0; - } - rslts->data_size = num_bytes_requested; - rslts->data = (uint8_t*) malloc(num_bytes_requested); - if (rslts->data == NULL) { - libp2p_stream_message_free(rslts); - libp2p_logger_error("multistream", "Unable to allocate %lu bytes of memory.", num_bytes_requested); - return 0; - } - memcpy(rslts->data, &buffer[left], num_bytes_requested); - session_context->last_comm_epoch = os_utils_gmtime(); } - return num_bytes_requested; + if (num_bytes_requested <= 0) + return 0; + + // now get the data + *results = libp2p_stream_message_new(); + struct StreamMessage* rslts = *results; + rslts->data_size = num_bytes_requested; + rslts->data = (uint8_t*) malloc(num_bytes_requested); + if (rslts->data == NULL) { + libp2p_stream_message_free(rslts); + rslts = NULL; + } + // now get the data from the parent stream + if (!parent_stream->read_raw(parent_stream->stream_context, rslts->data, rslts->data_size, timeout_secs)) { + // problem reading from the parent stream + libp2p_stream_message_free(*results); + *results = NULL; + return 0; + } + + return 1; } @@ -397,7 +321,7 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, // try to receive the protocol id return_result = libp2p_net_multistream_read(&session, &results, timeout_secs); - if (results == NULL || return_result == 0 || results->data_size < 1 || !libp2p_net_multistream_can_handle(results->data, results->data_size)) { + if (results == NULL || return_result == 0 || results->data_size < 1 || !libp2p_net_multistream_can_handle(results)) { libp2p_logger_error("multistream", "Attempted to receive the multistream protocol header, but received %s.\n", results); goto exit; } @@ -454,44 +378,10 @@ int libp2p_net_multistream_negotiate(struct SessionContext* session) { return retVal; } - -/** - * Expect to read a message - * @param fd the socket file descriptor - * @returns the retrieved message, or NULL - */ -/* -struct Libp2pMessage* libp2p_net_multistream_get_message(struct Stream* stream) { - int retVal = 0; - unsigned char* results = NULL; - size_t results_size = 0; - struct Libp2pMessage* msg = NULL; - // read what they sent - libp2p_net_multistream_read(stream, &results, &results_size); - // unprotobuf it - if (!libp2p_message_protobuf_decode(results, results_size, &msg)) - goto exit; - // clean up - retVal = 1; - exit: - if (results != NULL) - free(results); - if (retVal != 1 && msg != NULL) - libp2p_message_free(msg); - - return msg; -} -*/ - void libp2p_net_multistream_stream_free(struct Stream* stream) { if (stream != NULL) { - if (stream->socket_descriptor != NULL) { - close( *((int*)stream->socket_descriptor)); - free(stream->socket_descriptor); - } - if (stream->address != NULL) - multiaddress_free(stream->address); - free(stream); + stream->parent_stream->close(stream->parent_stream->stream_context); + // TODO: free memory allocations } } @@ -504,14 +394,7 @@ void libp2p_net_multistream_stream_free(struct Stream* stream) { struct Stream* libp2p_net_multistream_stream_new(int socket_fd, const char* ip, int port) { struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream)); if (out != NULL) { - pthread_mutex_init(&out->socket_mutex, NULL); - out->socket_descriptor = malloc(sizeof(int)); - *((int*)out->socket_descriptor) = socket_fd; - int res = *((int*)out->socket_descriptor); - if (res != socket_fd) { - libp2p_net_multistream_stream_free(out); - return NULL; - } + out->parent_stream = NULL; out->close = libp2p_net_multistream_close; out->read = libp2p_net_multistream_read; out->write = libp2p_net_multistream_write; diff --git a/net/protocol.c b/net/protocol.c index 05cbd6e..9369a4d 100644 --- a/net/protocol.c +++ b/net/protocol.c @@ -3,6 +3,10 @@ #include "libp2p/utils/logger.h" #include "libp2p/net/protocol.h" +/* + * Handle the different protocols + */ + /*** * Compare incoming to see if they are requesting a protocol upgrade * @param incoming the incoming string @@ -10,10 +14,10 @@ * @param test the protocol string to compare it with (i.e. "/secio" or "/nodeio" * @returns true(1) if there was a match, false(0) otherwise */ -const struct Libp2pProtocolHandler* protocol_compare(const unsigned char* incoming, size_t incoming_size, struct Libp2pVector* protocol_handlers) { +const struct Libp2pProtocolHandler* protocol_compare(struct StreamMessage* msg, struct Libp2pVector* protocol_handlers) { for(int i = 0; i < protocol_handlers->total; i++) { const struct Libp2pProtocolHandler* handler = (const struct Libp2pProtocolHandler*) libp2p_utils_vector_get(protocol_handlers, i); - if (handler->CanHandle(incoming, incoming_size)) { + if (handler->CanHandle(msg)) { return handler; } } @@ -43,26 +47,24 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new() { * @param handlers a Vector of protocol handlers * @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success */ -int libp2p_protocol_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct Libp2pVector* handlers) { - const struct Libp2pProtocolHandler* handler = protocol_compare(incoming, incoming_size, handlers); - - char str[incoming_size + 1]; - memcpy(str, incoming, incoming_size); - str[incoming_size] = 0; - for(int i = 0; i < incoming_size; i++) { - if (str[i] == '\n') { - str[i] = 0; - break; - } - } +int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* session, struct Libp2pVector* handlers) { + const struct Libp2pProtocolHandler* handler = protocol_compare(msg, handlers); if (handler == NULL) { + // turn msg->data to a null terminated string for the error message + char str[msg->data_size + 1]; + memcpy(str, msg->data, msg->data_size); + str[msg->data_size] = 0; + for(int i = 0; i < msg->data_size; i++) { + if (str[i] == '\n') { + str[i] = 0; + break; + } + } libp2p_logger_error("protocol", "Session [%s]: Unable to find handler for %s.\n", session->remote_peer_id, str); return -1; - } else { - libp2p_logger_debug("protocol", "Found handler for %s.\n", str); } //TODO: strip off the protocol? - return handler->HandleMessage(incoming, incoming_size, session, handler->context); + return handler->HandleMessage(msg, session, handler->context); } diff --git a/peer/peer.c b/peer/peer.c index 0e7a12c..2c8acf8 100644 --- a/peer/peer.c +++ b/peer/peer.c @@ -97,7 +97,7 @@ int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer) { * @param peerstore if connection is successfull, will add peer to peerstore * @returns true(1) on success, false(0) if we could not connect */ -int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) { +int libp2p_peer_connect(const struct Dialer* dialer, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) { // fix the connection type if in an invalid state if (peer->connection_type == CONNECTION_TYPE_CONNECTED && peer->sessionContext == NULL) peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED; @@ -107,65 +107,12 @@ int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPee struct Libp2pLinkedList* current_address = peer->addr_head; while (current_address != NULL && peer->connection_type != CONNECTION_TYPE_CONNECTED) { struct MultiAddress *ma = (struct MultiAddress*)current_address->item; - if (multiaddress_is_ip(ma)) { - char* ip = NULL; - if (!multiaddress_get_ip_address(ma, &ip)) - continue; - int port = multiaddress_get_ip_port(ma); - // out with the old - if (peer->sessionContext != NULL) { - libp2p_session_context_free(peer->sessionContext); - } - peer->sessionContext = libp2p_session_context_new(); - peer->sessionContext->host = ip; - peer->sessionContext->port = port; - peer->sessionContext->datastore = datastore; - peer->sessionContext->insecure_stream = libp2p_net_multistream_connect_with_timeout(ip, port, timeout); - if (peer->sessionContext->insecure_stream == NULL) { - libp2p_logger_error("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer)); - free(ip); - return 0; - } - peer->sessionContext->default_stream = peer->sessionContext->insecure_stream; - peer->connection_type = CONNECTION_TYPE_CONNECTED; - // lock the stream - if (!libp2p_stream_lock(peer->sessionContext->default_stream)) { - libp2p_logger_error("peer", "Unable to lock the newly created peer stream for peer %s.\n", libp2p_peer_id_to_string(peer)); - free(ip); - return 0; - } - // switch to secio - if (libp2p_secio_initiate_handshake(peer->sessionContext, privateKey, peerstore) <= 0) { - libp2p_logger_error("peer", "Attempted secio handshake, but failed for peer %s.\n", libp2p_peer_id_to_string(peer)); - free(ip); - libp2p_stream_unlock(peer->sessionContext->default_stream); - return 0; - } - //switch to multistream - if (!libp2p_net_multistream_negotiate(peer->sessionContext)) { - libp2p_logger_error("peer", "Attempted multistream handshake, but failed for peer %s.\n", libp2p_peer_id_to_string(peer)); - free(ip); - libp2p_stream_unlock(peer->sessionContext->default_stream); - return 0; - } - // switch to yamux - if (!yamux_send_protocol(peer->sessionContext)) { - libp2p_logger_error("peer", "Attempted yamux handshake, but could not send protocol header for peer %s.\n", libp2p_peer_id_to_string(peer)); - free(ip); - libp2p_stream_unlock(peer->sessionContext->default_stream); - return 0; - } - libp2p_stream_unlock(peer->sessionContext->default_stream); - /* - // expect yamux back - if (!yamux_receive_protocol(peer->sessionContext)) { - libp2p_logger_error("peer", "Attempted yamux handshake, but received unexpected response.\n"); - free(ip); - return 0; - } - */ - free(ip); - } // is IP + // use the dialer to attempt to dial this MultiAddress and join the swarm + struct Stream* yamux_stream = libp2p_conn_dialer_get_stream(dialer, ma, "yamux"); + if (yamux_stream != NULL) { + // we're okay, get out + break; + } now = time(NULL); if (now >= (prev + timeout)) break; diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c index e645cbe..e4f795c 100644 --- a/routing/dht_protocol.c +++ b/routing/dht_protocol.c @@ -20,11 +20,11 @@ struct DhtContext { struct ProviderStore* provider_store; }; -int libp2p_routing_dht_can_handle(const uint8_t* incoming, size_t incoming_size) { - if (incoming_size < 8) +int libp2p_routing_dht_can_handle(const struct StreamMessage* msg) { + if (msg->data_size < 8) return 0; - char* result = strnstr((char*)incoming, "/ipfs/kad", incoming_size); - if (result != NULL && result == (char*)incoming) + char* result = strnstr((char*)msg->data, "/ipfs/kad", msg->data_size); + if (result != NULL && result == (char*)msg->data) return 1; return 0; } @@ -34,7 +34,7 @@ int libp2p_routing_dht_shutdown(void* context) { return 1; } -int libp2p_routing_dht_handle_msg(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* context) { +int libp2p_routing_dht_handle_msg(const struct StreamMessage* msg, struct SessionContext* session_context, void* context) { libp2p_logger_debug("dht_protocol", "Handling incoming dht routing request from peer %s.\n", session_context->remote_peer_id); struct DhtContext* ctx = (struct DhtContext*)context; if (!libp2p_routing_dht_handshake(session_context)) @@ -572,7 +572,7 @@ int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struc * @param msg the message to send * @returns true(1) if we sent to at least 1, false(0) otherwise */ -int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* private_key, struct Peerstore* peerstore, +int libp2p_routing_dht_send_message_nearest_x(const struct Dialer* dialer, struct Peerstore* peerstore, struct Datastore* datastore, struct KademliaMessage* msg, int numToSend) { // TODO: Calculate "Nearest" // but for now, grab x peers, and send to them @@ -585,7 +585,7 @@ int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* privat struct Libp2pPeer* remote_peer = entry->peer; if (!remote_peer->is_local) { // connect (if not connected) - if (libp2p_peer_connect(private_key, remote_peer, peerstore, datastore, 5)) { + if (libp2p_peer_connect(dialer, remote_peer, peerstore, datastore, 5)) { // send message if (libp2p_routing_dht_send_message(remote_peer->sessionContext, msg)) numSent++; diff --git a/secio/secio.c b/secio/secio.c index 430e9f9..5ab66e8 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -34,37 +34,31 @@ const char* SupportedExchanges = "P-256,P-384,P-521"; const char* SupportedCiphers = "AES-256,AES-128,Blowfish"; const char* SupportedHashes = "SHA256,SHA512"; -struct SecioContext { - struct RsaPrivateKey* private_key; - struct Peerstore* peer_store; -}; - -int libp2p_secio_can_handle(const uint8_t* incoming, size_t incoming_size) { +int libp2p_secio_can_handle(const struct StreamMessage* msg) { const char* protocol = "/secio/1.0.0"; // sanity checks - if (incoming_size < 12) + if (msg->data_size < 12) return 0; - char* result = strnstr((char*)incoming, protocol, incoming_size); - if (result != NULL && result == (char*)incoming) + char* result = strnstr((char*)msg->data, protocol, msg->data_size); + if (result != NULL && result == (char*)msg->data) return 1; return 0; } /*** * Handle a secio message - * @param incoming the incoming bytes - * @param incoming_size the size of the incoming buffer + * @param msg the incoming message * @param session_context who is attempting to connect * @param protocol_context a SecioContext that contains the needed information * @returns <0 on error, 0 if okay (does not allow daemon to continue looping) */ -int libp2p_secio_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { +int libp2p_secio_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { libp2p_logger_debug("secio", "Handling incoming secio message.\n"); struct SecioContext* ctx = (struct SecioContext*)protocol_context; // send them the protocol - if (!libp2p_secio_send_protocol(session_context)) + if (!libp2p_secio_send_protocol(ctx)) return -1; - int retVal = libp2p_secio_handshake(session_context, ctx->private_key, ctx->peer_store); + int retVal = libp2p_secio_handshake(ctx); if (retVal) return 0; return -1; @@ -83,9 +77,9 @@ int libp2p_secio_shutdown(void* context) { * @param peer_store the peer store * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_initiate_handshake(struct SessionContext* session_context, const struct RsaPrivateKey* private_key, struct Peerstore* peer_store) { - if (libp2p_secio_send_protocol(session_context) && libp2p_secio_receive_protocol(session_context)) { - return libp2p_secio_handshake(session_context, private_key, peer_store); +int libp2p_secio_initiate_handshake(struct SecioContext* ctx) { + if (libp2p_secio_send_protocol(ctx) && libp2p_secio_receive_protocol(ctx)) { + return libp2p_secio_handshake(ctx); } libp2p_logger_error("secio", "Secio protocol exchange failed.\n"); return 0; @@ -511,159 +505,30 @@ int libp2p_secio_make_mac_and_cipher(struct SessionContext* session, struct Stre return 1; } -/*** - * Write bytes to an unencrypted stream - * @param session the session information - * @param bytes the bytes to write - * @param data_length the number of bytes to write - * @returns the number of bytes written - */ -int libp2p_secio_unencrypted_write(struct SessionContext* session, unsigned char* bytes, size_t data_length) { - int num_bytes = 0; - - if (data_length > 0) { // only do this is if there is something to send - // first send the size - uint32_t size = htonl(data_length); - char* size_as_char = (char*)&size; - int left = 4; - int written = 0; - int written_this_time = 0; - do { - written_this_time = socket_write(*((int*)session->default_stream->socket_descriptor), &size_as_char[written], left, 0); - if (written_this_time < 0) { - written_this_time = 0; - if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { - // TODO: use epoll or select to wait for socket to be writable - } else { - return 0; - } - } - left = left - written_this_time; - } while (left > 0); - // then send the actual data - left = data_length; - written = 0; - do { - written_this_time = socket_write(*((int*)session->default_stream->socket_descriptor), (char*)&bytes[written], left, 0); - if (written_this_time < 0) { - written_this_time = 0; - if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { - // TODO: use epoll or select to wait for socket to be writable - } else { - return 0; - } - } - left = left - written_this_time; - written += written_this_time; - } while (left > 0); - num_bytes = written; - } // there was something to send - - return num_bytes; -} - -/*** - * Read bytes from the incoming stream - * @param session the session information - * @param results where to put the bytes read - * @param results_size the size of the results - * @returns the number of bytes read - */ -int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char** results, size_t* results_size, int timeout_secs) { - uint32_t buffer_size; - - if (session == NULL || session->insecure_stream == NULL || session->insecure_stream->socket_descriptor == NULL) { - libp2p_logger_error("secio", "Attempted unencrypted read on invalid session.\n"); - return 0; - } - // first read the 4 byte integer - char* size = (char*)&buffer_size; - int left = 4; - int read = 0; - int read_this_time = 0; - do { - read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), &size[read], 1, 0, timeout_secs); - if (read_this_time <= 0) { - if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { - // TODO: use epoll or select to wait for socket to be writable - libp2p_logger_debug("secio", "Attempted read, but got EAGAIN or EWOULDBLOCK. Code %d.\n", errno); - return 0; - } else { - // is this really an error? - if (errno != 0) { - libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: %s\n", strerror(errno)); - return 0; - } - else - libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: 0 bytes read, but errno shows no error. Trying again.\n"); - } - } else { - left = left - read_this_time; - read += read_this_time; - } - } while (left > 0); - buffer_size = ntohl(buffer_size); - if (buffer_size == 0) { - libp2p_logger_error("secio", "unencrypted read buffer size is 0.\n"); - return 0; - } - - // now read the number of bytes we've found, minus the 4 that we just read - left = buffer_size; - read = 0; - read_this_time = 0; - *results = malloc(left); - if (*results == NULL) { - libp2p_logger_error("secio", "Unable to allocate memory for the incoming message. Size: %ulld", left); - return 0; - } - unsigned char* ptr = *results; - do { - read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), (char*)&ptr[read], left, 0, timeout_secs); - if (read_this_time < 0) { - read_this_time = 0; - if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { - // TODO: use epoll or select to wait for socket to be writable - } else { - libp2p_logger_error("secio", "read from socket returned %d.\n", errno); - return 0; - } - } else if (read_this_time == 0) { - // socket_read returned 0, which it shouldn't - libp2p_logger_error("secio", "socket_read returned 0 trying to read from %s.\n", session->remote_peer_id); - return 0; - } - left = left - read_this_time; - } while (left > 0); - - *results_size = buffer_size; - return buffer_size; -} - /*** * Send the protocol string to the remote stream - * @param session the context + * @param ctx the context * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_send_protocol(struct SessionContext* session) { +int libp2p_secio_send_protocol(struct SecioContext* ctx) { char* protocol = "/secio/1.0.0\n"; struct StreamMessage outgoing; outgoing.data = (uint8_t*)protocol; outgoing.data_size = strlen(protocol); - return session->default_stream->write(session, &outgoing); + return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing); } /*** * Attempt to read the secio protocol as a reply from the remote - * @param session the context + * @param ctx the context * @returns true(1) if we received what we think we should have, false(0) otherwise */ -int libp2p_secio_receive_protocol(struct SessionContext* session) { +int libp2p_secio_receive_protocol(struct SecioContext* ctx) { char* protocol = "/secio/1.0.0\n"; int numSecs = 30; int retVal = 0; struct StreamMessage* buffer = NULL; - session->default_stream->read(session, &buffer, numSecs); + ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &buffer, numSecs); if (buffer == NULL) { libp2p_logger_error("secio", "Expected the secio protocol header, but received NULL.\n"); } else { @@ -755,19 +620,22 @@ int libp2p_secio_encrypt(struct SessionContext* session, const unsigned char* in * @returns the number of bytes written */ int libp2p_secio_encrypted_write(void* stream_context, struct StreamMessage* bytes) { - struct SessionContext* session = (struct SessionContext*) stream_context; + struct SecioContext* ctx = (struct SecioContext*) stream_context; + struct Stream* parent_stream = ctx->stream->parent_stream; + struct SessionContext* session_context = ctx->session_context; + // writer uses the local cipher and mac - unsigned char* buffer = NULL; - size_t buffer_size = 0; - if (!libp2p_secio_encrypt(session, bytes->data, bytes->data_size, &buffer, &buffer_size)) { + struct StreamMessage outgoing; + if (!libp2p_secio_encrypt(session_context, bytes->data, bytes->data_size, &outgoing.data, &outgoing.data_size)) { libp2p_logger_error("secio", "secio_encrypt returned false.\n"); return 0; } - int retVal = libp2p_secio_unencrypted_write(session, buffer, buffer_size); + + int retVal = parent_stream->write(parent_stream->stream_context, &outgoing); if (!retVal) { libp2p_logger_error("secio", "secio_unencrypted_write returned false\n"); } - free(buffer); + free(outgoing.data); return retVal; } @@ -848,21 +716,20 @@ int libp2p_secio_decrypt(struct SessionContext* session, const unsigned char* in */ int libp2p_secio_encrypted_read(void* stream_context, struct StreamMessage** bytes, int timeout_secs) { int retVal = 0; - struct SessionContext* session = (struct SessionContext*)stream_context; + struct SecioContext* ctx = (struct SecioContext*)stream_context; + struct Stream* parent_stream = ctx->stream->parent_stream; // reader uses the remote cipher and mac // read the data - unsigned char* incoming = NULL; - size_t incoming_size = 0; - if (libp2p_secio_unencrypted_read(session, &incoming, &incoming_size, timeout_secs) <= 0) { + struct StreamMessage* msg = NULL; + if (!parent_stream->read(parent_stream->stream_context, &msg, timeout_secs)) { libp2p_logger_error("secio", "Unencrypted_read returned false.\n"); goto exit; } - retVal = libp2p_secio_decrypt(session, incoming, incoming_size, bytes); + retVal = libp2p_secio_decrypt(ctx->session_context, msg->data, msg->data_size, bytes); if (!retVal) libp2p_logger_error("secio", "Decrypting incoming stream returned false.\n"); exit: - if (incoming != NULL) - free(incoming); + libp2p_stream_message_free(msg); return retVal; } @@ -875,10 +742,11 @@ int libp2p_secio_encrypted_read(void* stream_context, struct StreamMessage** byt * @param peerstore the collection of peers * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_handshake(struct SessionContext* local_session, const struct RsaPrivateKey* private_key, struct Peerstore* peerstore) { +int libp2p_secio_handshake(struct SecioContext* secio_context) { int retVal = 0; size_t results_size = 0, bytes_written = 0; - struct StreamMessage* stream_message = NULL; + struct StreamMessage* incoming = NULL; + struct StreamMessage outgoing; // used for outgoing messages unsigned char* propose_in_bytes = NULL; // the remote protobuf size_t propose_in_size = 0; unsigned char* propose_out_bytes = NULL; // the local protobuf @@ -899,6 +767,10 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs struct PublicKey pub_key = {0}; struct Libp2pPeer* remote_peer = NULL; + struct SessionContext* local_session = secio_context->session_context; + struct RsaPrivateKey* private_key = secio_context->private_key; + struct Peerstore* peerstore = secio_context->peer_store; + //TODO: make sure we're not talking to ourself // send the protocol id and the outgoing Propose struct @@ -949,7 +821,9 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs goto exit; // now send the protocol and Propose struct - bytes_written = libp2p_secio_unencrypted_write(local_session, propose_out_bytes, propose_out_size); + outgoing.data = propose_out_bytes; + outgoing.data_size = propose_out_size; + bytes_written = secio_context->stream->parent_stream->write(secio_context->stream->parent_stream->stream_context, &outgoing); if (bytes_written != propose_out_size) { libp2p_logger_error("secio", "Sent propose_out, but did not write the correct number of bytes. Should be %d but was %d.\n", propose_out_size, bytes_written); @@ -958,7 +832,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs } // try to get the Propse struct from the remote peer - bytes_written = libp2p_secio_unencrypted_read(local_session, &propose_in_bytes, &propose_in_size, 10); + bytes_written = secio_context->stream->parent_stream->read(secio_context->stream->parent_stream->stream_context, &incoming, 10); if (bytes_written <= 0) { libp2p_logger_error("secio", "Unable to get the remote's Propose struct.\n"); goto exit; @@ -966,10 +840,12 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs //libp2p_logger_debug("secio", "Received their propose struct.\n"); } - if (!libp2p_secio_propose_protobuf_decode(propose_in_bytes, propose_in_size -1, &propose_in)) { + if (!libp2p_secio_propose_protobuf_decode(incoming->data, incoming->data_size -1, &propose_in)) { libp2p_logger_error("secio", "Unable to un-protobuf the remote's Propose struct\n"); goto exit; } + libp2p_stream_message_free(incoming); + incoming = NULL; // get their nonce if (propose_in->rand_size != 16) @@ -1074,8 +950,9 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs libp2p_secio_exchange_protobuf_encode(exchange_out, exchange_out_protobuf, exchange_out_protobuf_size, &bytes_written); exchange_out_protobuf_size = bytes_written; - //libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Writing exchange_out\n"); - bytes_written = libp2p_secio_unencrypted_write(local_session, exchange_out_protobuf, exchange_out_protobuf_size); + outgoing.data = exchange_out_protobuf; + outgoing.data_size = exchange_out_protobuf_size; + bytes_written = secio_context->stream->parent_stream->write(secio_context->stream->parent_stream, &outgoing); if (exchange_out_protobuf_size != bytes_written) { libp2p_logger_error("secio", "Unable to write exchange_out\n"); goto exit; @@ -1088,7 +965,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs // receive Exchange packet libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading exchange packet\n"); - bytes_written = libp2p_secio_unencrypted_read(local_session, &results, &results_size, 10); + bytes_written = secio_context->stream->parent_stream->read(secio_context->stream->parent_stream->stream_context, &incoming, 10); if (bytes_written == 0) { libp2p_logger_error("secio", "unable to read exchange packet.\n"); libp2p_peer_handle_connection_error(remote_peer); @@ -1096,9 +973,9 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs } else { //libp2p_logger_debug("secio", "Read exchange packet.\n"); } - libp2p_secio_exchange_protobuf_decode(results, results_size, &exchange_in); - free(results); - results = NULL; + libp2p_secio_exchange_protobuf_decode(incoming->data, incoming->data_size, &exchange_in); + libp2p_stream_message_free(incoming); + incoming = NULL; // end of receive Exchange packet // parse and verify @@ -1171,7 +1048,6 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs libp2p_secio_initialize_crypto(local_session); // send their nonce to verify encryption works - struct StreamMessage outgoing; outgoing.data = (uint8_t*)local_session->remote_nonce; outgoing.data_size = 16; if (libp2p_secio_encrypted_write(local_session, &outgoing) <= 0) { @@ -1182,20 +1058,23 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs // receive our nonce to verify encryption works libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Receiving our nonce\n"); results = NULL; - int bytes_read = libp2p_secio_encrypted_read(local_session, &stream_message, 10); - if (bytes_read <= 0 || stream_message == NULL) { + int bytes_read = libp2p_secio_encrypted_read(secio_context->stream->stream_context, &incoming, 10); + if (bytes_read <= 0 || incoming == NULL) { libp2p_logger_error("secio", "Encrypted read returned %d\n", bytes_read); goto exit; } - if (stream_message->data_size != 16) { - libp2p_logger_error("secio", "Results_size should be 16 but was %d\n", stream_message->data_size); + if (incoming->data_size != 16) { + libp2p_logger_error("secio", "Results_size should be 16 but was %d\n", incoming->data_size); goto exit; } - if (libp2p_secio_bytes_compare(stream_message->data, (unsigned char*)local_session->local_nonce, 16) != 0) { + if (libp2p_secio_bytes_compare(incoming->data, (unsigned char*)local_session->local_nonce, 16) != 0) { libp2p_logger_error("secio", "Bytes of nonce did not match\n"); goto exit; } + libp2p_stream_message_free(incoming); + incoming = NULL; + // set up the secure stream in the struct local_session->secure_stream = local_session->insecure_stream; local_session->secure_stream->read = libp2p_secio_encrypted_read; @@ -1231,7 +1110,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs libp2p_secio_propose_free(propose_out); libp2p_secio_propose_free(propose_in); - libp2p_stream_message_free(stream_message); + libp2p_stream_message_free(incoming); if (retVal != 1) { libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake returning false\n"); diff --git a/test/test_conn.h b/test/test_conn.h index d6079ab..747a432 100644 --- a/test/test_conn.h +++ b/test/test_conn.h @@ -90,10 +90,6 @@ int test_dialer_dial_multistream() { stream = libp2p_conn_dialer_get_stream(dialer, destination_address, "multistream"); if (stream == NULL) goto exit; - int socket_descriptor = *((int*)stream->socket_descriptor); - if ( socket_descriptor < 0 || socket_descriptor > 255) { - goto exit; - } // now ping diff --git a/test/test_multistream.h b/test/test_multistream.h index 062f638..f8cd2ba 100644 --- a/test/test_multistream.h +++ b/test/test_multistream.h @@ -36,7 +36,7 @@ int test_multistream_get_list() { struct SessionContext session; session.insecure_stream = libp2p_net_multistream_connect("10.211.55.2", 4001); - if (*((int*)session.insecure_stream->socket_descriptor) < 0) + if (session.insecure_stream == NULL) goto exit; // try to respond something, ls command diff --git a/test/test_secio.h b/test/test_secio.h index caf631f..5588ef1 100644 --- a/test/test_secio.h +++ b/test/test_secio.h @@ -33,6 +33,7 @@ void print_stretched_key(struct StretchedKey* key) { int test_secio_handshake() { + /* libp2p_logger_add_class("secio"); int retVal = 0; @@ -119,6 +120,7 @@ int test_secio_handshake() { goto exit; } */ + /* // a new way to do the above if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) { libp2p_logger_error("test_secio", "Unable to do handshake\n"); @@ -145,7 +147,7 @@ int test_secio_handshake() { print_stretched_key(secure_session.remote_stretched_key); fprintf(stdout, "\n"); */ - + /* // now attempt to do something with it... try to negotiate a multistream if (libp2p_net_multistream_negotiate(secure_session) == 0) { fprintf(stdout, "Unable to negotiate multistream\n"); @@ -215,6 +217,7 @@ int test_secio_handshake() { if (secure_session.shared_key != NULL) free(secure_session.shared_key); */ + /* if (private_key != NULL) libp2p_crypto_private_key_free(private_key); if (decode_base64 != NULL) @@ -222,6 +225,8 @@ int test_secio_handshake() { if (rsa_private_key != NULL) libp2p_crypto_rsa_rsa_private_key_free(rsa_private_key); return retVal; + */ + return 0; } int libp2p_secio_encrypt(const struct SessionContext* session, const unsigned char* incoming, size_t incoming_size, unsigned char** outgoing, size_t* outgoing_size); @@ -380,6 +385,7 @@ int test_secio_encrypt_like_go() { */ int test_secio_handshake_go() { + /* libp2p_logger_add_class("secio"); int retVal = 0; @@ -460,6 +466,7 @@ int test_secio_handshake_go() { goto exit; } */ + /* // a new way to do the above if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) { libp2p_logger_error("test_secio", "Unable to do handshake.\n"); @@ -486,7 +493,7 @@ int test_secio_handshake_go() { print_stretched_key(secure_session.remote_stretched_key); fprintf(stdout, "\n"); */ - + /* // now attempt to do something with it... try to negotiate a multistream if (libp2p_net_multistream_negotiate(secure_session) == 0) { fprintf(stdout, "Unable to negotiate multistream\n"); @@ -526,4 +533,6 @@ int test_secio_handshake_go() { if (peerstore != NULL) libp2p_peerstore_free(peerstore); return retVal; + */ + return 0; } diff --git a/yamux/yamux.c b/yamux/yamux.c index 1ee34f0..557f743 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -13,16 +13,16 @@ * @param incoming_size the size of the incoming data buffer * @returns true(1) if it can handle this message, false(0) if not */ -int yamux_can_handle(const uint8_t* incoming, size_t incoming_size) { +int yamux_can_handle(const struct StreamMessage* msg) { char *protocol = "/yamux/1.0.0\n"; int protocol_size = strlen(protocol); // is there a varint in front? size_t num_bytes = 0; - if (incoming[0] != protocol[0] && incoming[1] != protocol[1]) { - varint_decode(incoming, incoming_size, &num_bytes); + if (msg->data[0] != protocol[0] && msg->data[1] != protocol[1]) { + varint_decode(msg->data, msg->data_size, &num_bytes); } - if (incoming_size >= protocol_size - num_bytes) { - if (strncmp(protocol, (char*) &incoming[num_bytes], protocol_size) == 0) + if (msg->data_size >= protocol_size - num_bytes) { + if (strncmp(protocol, (char*) &msg->data[num_bytes], protocol_size) == 0) return 1; } return 0; @@ -31,12 +31,12 @@ int yamux_can_handle(const uint8_t* incoming, size_t incoming_size) { /** * the yamux stream received some bytes. Process them * @param stream the stream that the data came in on - * @param incoming_size the size of the stream buffer + * @param msg the message * @param incoming the stream buffer */ -void yamux_read_stream(struct yamux_stream* stream, ssize_t incoming_size, uint8_t* incoming) { +void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { struct Libp2pVector* handlers = stream->userdata; - int retVal = libp2p_protocol_marshal(incoming, incoming_size, stream->session->session_context, handlers); + int retVal = libp2p_protocol_marshal(msg, stream->session->session_context, handlers); if (retVal == -1) { // TODO handle error condition libp2p_logger_error("yamux", "Marshalling returned error.\n"); @@ -94,21 +94,21 @@ int yamux_receive_protocol(struct SessionContext* context) { /*** * Handles the message - * @param incoming the incoming data buffer + * @param msg the incoming message * @param incoming_size the size of the incoming data buffer * @param session_context the information about the incoming connection * @param protocol_context the protocol-dependent context * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ -int yamux_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { +int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { // they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context); - uint8_t* buf = (uint8_t*) malloc(incoming_size); + uint8_t* buf = (uint8_t*) malloc(msg->data_size); if (buf == NULL) return -1; - memcpy(buf, incoming, incoming_size); + memcpy(buf, msg->data, msg->data_size); for(;;) { - int retVal = yamux_decode(yamux, incoming, incoming_size); + int retVal = yamux_decode(yamux, msg->data, msg->data_size); free(buf); buf = NULL; if (!retVal)