From 9fd44b787871e71081aa2c51a646ce0dc3cc1058 Mon Sep 17 00:00:00 2001 From: John Jones Date: Mon, 4 Sep 2017 11:01:17 -0500 Subject: [PATCH] implementing handler interface in front of multistream --- include/libp2p/net/multistream.h | 14 +++++- include/libp2p/peer/peer.h | 2 +- net/multistream.c | 74 ++++++++++++++++++++++++++------ net/protocol.c | 3 ++ peer/peer.c | 3 +- secio/secio.c | 18 +++++++- 6 files changed, 97 insertions(+), 17 deletions(-) diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index cbf9464..53b54c7 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -15,12 +15,24 @@ */ +struct MultistreamContext { + struct Libp2pVector* handlers; +}; + /*** * The handler to handle calls to the protocol * @param stream_context the context * @returns the protocol handler */ -struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* stream_context); +struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector); + +/** + * Sends the protocol header to the remote + * @param context the context + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_net_multistream_send_protocol(struct SessionContext *context); + /** * Read from a multistream socket diff --git a/include/libp2p/peer/peer.h b/include/libp2p/peer/peer.h index df0415c..bc92c60 100644 --- a/include/libp2p/peer/peer.h +++ b/include/libp2p/peer/peer.h @@ -56,7 +56,7 @@ void libp2p_peer_free(struct Libp2pPeer* in); * @param timeout number of seconds before giving up * @returns true(1) on success, false(0) if we could not connect */ -int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, int timeout); +int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout); /*** * Clean up a bad connection diff --git a/net/multistream.c b/net/multistream.c index 097bd51..79d6c5a 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -28,7 +28,9 @@ int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t inco int protocol_size = strlen(protocol); // is there a varint in front? size_t num_bytes = 0; - varint_decode(incoming, incoming_size, &num_bytes); + if (incoming[0] != '/' && incoming[1] != 'm') { + varint_decode(incoming, incoming_size, &num_bytes); + } if (incoming_size >= protocol_size - num_bytes) { if (strncmp(protocol, (char*) &incoming[num_bytes], protocol_size) == 0) return 1; @@ -36,8 +38,49 @@ int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t inco return 0; } +int libp2p_net_multistream_send_protocol(struct SessionContext *context) { + char *protocol = "/multistream/1.0.0\n"; + return context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol)); +} + int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incoming_size, struct SessionContext* context, void* protocol_context) { - return 0; + // try sending the protocol back + //if (!libp2p_net_multistream_send_protocol(context)) + // return -1; + + struct MultistreamContext* multistream_context = (struct MultistreamContext*) protocol_context; + + // try to read from the network + uint8_t *results = 0; + size_t bytes_read = 0; + int retVal = 0; + int max_retries = 10; + int numRetries = 0; + // handle the call + for(;;) { + // try to read for 5 seconds + if (context->default_stream->read(context, &results, &bytes_read, 5)) { + // we read something from the network. Process it. + // NOTE: If it is a multistream protocol that we are receiving, ignore it. + if (libp2p_net_multistream_can_handle(results, bytes_read)) + continue; + numRetries = 0; + retVal = libp2p_protocol_marshal(results, bytes_read, context, multistream_context->handlers); + if (results != NULL) + free(results); + // exit the loop on error (or if they ask us to no longer loop by returning 0) + if (retVal <= 0) + break; + } else { + // we were unable to read from the network. + // if it timed out, we should try again (if we're not out of retries) + if (numRetries >= max_retries) + break; + numRetries++; + } + } + + return retVal; } int libp2p_net_multistream_shutdown(void* protocol_context) { @@ -49,10 +92,18 @@ int libp2p_net_multistream_shutdown(void* protocol_context) { * @param stream_context the context * @returns the protocol handler */ -struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* stream_context) { +struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector) { + + // build the context + struct MultistreamContext* context = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); + if (context == NULL) + return NULL; + context->handlers = (struct Libp2pVector*) handler_vector; + + // build the handler struct Libp2pProtocolHandler *handler = libp2p_protocol_handler_new(); if (handler != NULL) { - handler->context = stream_context; + handler->context = context; handler->CanHandle = libp2p_net_multistream_can_handle; handler->HandleMessage = libp2p_net_multistream_handle_message; handler->Shutdown = libp2p_net_multistream_shutdown; @@ -275,7 +326,6 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, int retVal = -1, return_result = -1, socket = -1; unsigned char* results = NULL; size_t results_size; - size_t num_bytes = 0; struct Stream* stream = NULL; uint32_t ip = hostname_to_ip(hostname); @@ -286,8 +336,6 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, goto exit; // send the multistream handshake - char* protocol_buffer = "/multistream/1.0.0\n"; - stream = libp2p_net_multistream_stream_new(socket, hostname, port); if (stream == NULL) goto exit; @@ -299,15 +347,15 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, // try to receive the protocol id return_result = libp2p_net_multistream_read(&session, &results, &results_size, timeout_secs); - if (return_result == 0 || results_size < 1) + if (return_result == 0 || results_size < 1 || !libp2p_net_multistream_can_handle(results, results_size)) { + libp2p_logger_error("multistream", "Attempted to receive the multistream protocol header, but received %s.\n", results); goto exit; + } - num_bytes = libp2p_net_multistream_write(&session, (unsigned char*)protocol_buffer, strlen(protocol_buffer)); - if (num_bytes <= 0) - goto exit; - - if (strstr((char*)results, "multistream") == NULL) + if (!libp2p_net_multistream_send_protocol(&session)) { + libp2p_logger_error("multistream", "Attempted to send the multistream protocol header, but could not.\n"); goto exit; + } // we are now in the loop, so we can switch to another protocol (i.e. /secio/1.0.0) diff --git a/net/protocol.c b/net/protocol.c index 2f5ec6c..9f71c4f 100644 --- a/net/protocol.c +++ b/net/protocol.c @@ -45,6 +45,7 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new() { */ int libp2p_protocol_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct Libp2pVector* handlers) { const struct Libp2pProtocolHandler* handler = protocol_compare(incoming, incoming_size, handlers); + char str[incoming_size + 1]; memcpy(str, incoming, incoming_size); str[incoming_size] = 0; @@ -54,12 +55,14 @@ int libp2p_protocol_marshal(const unsigned char* incoming, size_t incoming_size, break; } } + if (handler == NULL) { libp2p_logger_error("protocol", "Unable to find handler for %s.\n", str); return -1; } else { libp2p_logger_debug("protocol", "Found handler for %s.\n", str); } + //TODO: strip off the protocol? return handler->HandleMessage(incoming, incoming_size, session, handler->context); } diff --git a/peer/peer.c b/peer/peer.c index 49bcd31..5625ad2 100644 --- a/peer/peer.c +++ b/peer/peer.c @@ -96,7 +96,7 @@ int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer) { * @param peerstore if connection is successfull, will add peer to peerstore * @returns true(1) on success, false(0) if we could not connect */ -int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, int timeout) { +int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) { libp2p_logger_debug("peer", "Attemping to connect to %s.\n", libp2p_peer_id_to_string(peer)); time_t now, prev = time(NULL); // find an appropriate address @@ -109,6 +109,7 @@ int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* pee continue; int port = multiaddress_get_ip_port(ma); peer->sessionContext = libp2p_session_context_new(); + peer->sessionContext->datastore = datastore; peer->sessionContext->insecure_stream = libp2p_net_multistream_connect_with_timeout(ip, port, timeout); if (peer->sessionContext->insecure_stream == NULL) { libp2p_logger_debug("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer)); diff --git a/secio/secio.c b/secio/secio.c index 67bd05a..756081f 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -85,6 +85,7 @@ int libp2p_secio_initiate_handshake(struct SessionContext* session_context, stru if (libp2p_secio_send_protocol(session_context) && libp2p_secio_receive_protocol(session_context)) { return libp2p_secio_handshake(session_context, private_key, peer_store); } + libp2p_logger_error("secio", "Secio protocol exchange failed.\n"); return 0; } @@ -651,9 +652,16 @@ int libp2p_secio_receive_protocol(struct SessionContext* session) { size_t buffer_size = 0; int retVal = session->default_stream->read(session, &buffer, &buffer_size, numSecs); if (retVal == 0 || buffer != NULL) { - if (strncmp(protocol, (char*)buffer, strlen(protocol)) == 0) + if (strncmp(protocol, (char*)buffer, strlen(protocol)) == 0) { + free(buffer); return 1; + } + else { + libp2p_logger_error("secio", "Expected the secio protocol header, but received %s.\n", buffer); + } } + if (buffer != NULL) + free(buffer); return 0; } @@ -922,6 +930,8 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva if (bytes_written != propose_out_size) { libp2p_logger_error("secio", "Sent propose_out, but did not write the correct number of bytes. Should be %d but was %d.\n", propose_out_size, bytes_written); + } else { + libp2p_logger_debug("secio", "Sent propose out.\n"); } // try to get the Propse struct from the remote peer @@ -929,6 +939,8 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva if (bytes_written <= 0) { libp2p_logger_error("secio", "Unable to get the remote's Propose struct.\n"); goto exit; + } else { + libp2p_logger_debug("secio", "Received their propose struct.\n"); } if (!libp2p_secio_propose_protobuf_decode(propose_in_bytes, propose_in_size -1, &propose_in)) { @@ -1031,6 +1043,8 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva if (exchange_out_protobuf_size != bytes_written) { libp2p_logger_error("secio", "Unable to write exchange_out\n"); goto exit; + } else { + libp2p_logger_debug("secio", "Sent exchange_out.\n"); } free(exchange_out_protobuf); exchange_out_protobuf = NULL; @@ -1043,6 +1057,8 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva libp2p_logger_error("secio", "unable to read exchange packet.\n"); libp2p_peer_handle_connection_error(remote_peer); goto exit; + } else { + libp2p_logger_debug("secio", "Read exchange packet.\n"); } libp2p_secio_exchange_protobuf_decode(results, results_size, &exchange_in); free(results);