From 6147769f4bdc75cc75966e14d401415a062ab0fa Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Mon, 23 Oct 2017 09:01:03 -0500 Subject: [PATCH] Reading from stream now uses StreamMessage struct --- .cproject | 1 + conn/session.c | 29 ++++++++++++ include/libp2p/net/multistream.h | 6 +-- include/libp2p/net/stream.h | 28 +++++++++-- net/multistream.c | 81 ++++++++++++++++++-------------- routing/dht_protocol.c | 35 +++++++------- secio/secio.c | 80 ++++++++++++++++--------------- test/test_multistream.h | 14 +++--- test/test_secio.h | 24 +++++----- yamux/yamux.c | 20 ++++---- 10 files changed, 191 insertions(+), 127 deletions(-) diff --git a/.cproject b/.cproject index e3a1378..6807c38 100644 --- a/.cproject +++ b/.cproject @@ -27,6 +27,7 @@ diff --git a/conn/session.c b/conn/session.c index 95815f8..49ed2c2 100644 --- a/conn/session.c +++ b/conn/session.c @@ -129,6 +129,35 @@ int libp2p_stream_unlock(struct Stream* stream) { return 0; } +/*** + * Create a new StreamMessage struct + * @returns a StreamMessage struct + */ +struct StreamMessage* libp2p_stream_message_new() { + struct StreamMessage* out = (struct StreamMessage*) malloc(sizeof(struct StreamMessage)); + if (out != NULL) { + out->data = NULL; + out->data_size = 0; + out->error_number = 0; + } + return out; +} + +/** + * free resources of a StreamMessage struct + * @param msg the StreamMessage to free + */ +void libp2p_stream_message_free(struct StreamMessage* msg) { + if (msg != NULL) { + if (msg->data != NULL) { + free(msg->data); + msg->data = NULL; + } + free(msg); + msg = NULL; + } +} + /**** * Make a copy of a SessionContext * @param original the original diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index 4089272..6cec932 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -44,12 +44,12 @@ int libp2p_net_multistream_receive_protocol(struct SessionContext* context); /** * Read from a multistream socket * @param socket_fd the socket file descriptor - * @param data the data to send - * @param data_length the length of the data + * @param data where to put the results * @param timeout_secs number of seconds before read gives up. Will return 0 data length. * @returns the number of bytes written */ -int libp2p_net_multistream_read(void* stream_context, unsigned char** data, size_t* data_length, int timeout_secs); +int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** data, int timeout_secs); + /** * Write to an open multistream host * @param socket_fd the socket file descriptor diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index d94b56a..dc201f9 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -2,6 +2,29 @@ #include +/** + * Encapsulates a message that (was/will be) sent + * across a stream + */ +struct StreamMessage { + uint8_t* data; + size_t data_size; + int error_number; +}; + +/*** + * Create a new StreamMessage struct + * @returns a StreamMessage struct + */ +struct StreamMessage* libp2p_stream_message_new(); + +/** + * free resources of a StreamMessage struct + * @param msg the StreamMessage to free + */ +void libp2p_stream_message_free(struct StreamMessage* msg); + + /** * An interface in front of various streams */ @@ -16,12 +39,11 @@ struct Stream { /** * Reads from the stream * @param stream the stream context (usually a SessionContext pointer) - * @param buffer where to put the results - * @param bytes_read how many bytes were read + * @param message where to put the incoming message (will be allocated) * @param timeout_secs number of seconds before a timeout * @returns true(1) on success, false(0) otherwise */ - int (*read)(void* stream_context, unsigned char** buffer, size_t* bytes_read, int timeout_secs); + int (*read)(void* stream_context, struct StreamMessage** message, int timeout_secs); /** * Writes to a stream diff --git a/net/multistream.c b/net/multistream.c index a02cb42..8ce1121 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -60,9 +60,8 @@ int libp2p_net_multistream_send_protocol(struct SessionContext *context) { */ int libp2p_net_multistream_receive_protocol(struct SessionContext* context) { char* protocol = "/multistream/1.0.0\n"; - uint8_t* results = NULL; - size_t results_size = 0; - if (!context->default_stream->read(context, &results, &results_size, 30)) { + struct StreamMessage* results = NULL; + if (!context->default_stream->read(context, &results, 30)) { libp2p_logger_error("multistream", "receive_protocol: Unable to read results.\n"); return 0; } @@ -82,21 +81,20 @@ int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incomi struct MultistreamContext* multistream_context = (struct MultistreamContext*) protocol_context; // try to read from the network - uint8_t *results = 0; - size_t bytes_read = 0; + struct StreamMessage* results = NULL; int retVal = 0; int max_retries = 10; int numRetries = 0; // handle the call for(;;) { // try to read for 5 seconds - if (context->default_stream->read(context, &results, &bytes_read, 5)) { + if (context->default_stream->read(context, &results, 5)) { // we read something from the network. Process it. // NOTE: If it is a multistream protocol that we are receiving, ignore it. - if (libp2p_net_multistream_can_handle(results, bytes_read)) + if (libp2p_net_multistream_can_handle(results->data, results->data_size)) continue; numRetries = 0; - retVal = libp2p_protocol_marshal(results, bytes_read, context, multistream_context->handlers); + retVal = libp2p_protocol_marshal(results->data, results->data_size, context, multistream_context->handlers); if (results != NULL) free(results); // exit the loop on error (or if they ask us to no longer loop by returning 0) @@ -244,7 +242,7 @@ int libp2p_net_multistream_write(void* stream_context, const unsigned char* data * @param timeout_secs the seconds before a timeout * @returns number of bytes received */ -int libp2p_net_multistream_read(void* stream_context, unsigned char** results, size_t* results_size, int timeout_secs) { +int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** results, int timeout_secs) { struct SessionContext* session_context = (struct SessionContext*)stream_context; struct Stream* stream = session_context->default_stream; int bytes = 0; @@ -300,39 +298,52 @@ int libp2p_net_multistream_read(void* stream_context, unsigned char** results, s return 0; // parse the results, removing the leading size indicator - *results = malloc(num_bytes_requested); - if (*results == NULL) + *results = libp2p_stream_message_new(); + struct StreamMessage* rslts = *results; + if (rslts == NULL) return 0; - memcpy(*results, buffer, num_bytes_requested); - *results_size = num_bytes_requested; + rslts->data_size = num_bytes_requested; + rslts->data = (uint8_t*) malloc(num_bytes_requested); + if (rslts->data == NULL) { + libp2p_stream_message_free(rslts); + return 0; + } + memcpy(rslts->data, buffer, num_bytes_requested); session_context->last_comm_epoch = os_utils_gmtime(); } else { // use secio instead of raw read/writes - unsigned char* read_from_stream; - size_t size_read_from_stream; - if (session_context->default_stream->read(session_context, &read_from_stream, &size_read_from_stream, timeout_secs) == 0) { + struct StreamMessage* read_from_stream; + if (session_context->default_stream->read(session_context, &read_from_stream, timeout_secs) == 0) { return 0; } // pull out num_bytes_requested - num_bytes_requested = varint_decode(read_from_stream, size_read_from_stream, &left); - memcpy(buffer, read_from_stream, size_read_from_stream); - free(read_from_stream); - buffer_size = size_read_from_stream; + num_bytes_requested = varint_decode(read_from_stream->data, read_from_stream->data_size, &left); + memcpy(buffer, read_from_stream->data, read_from_stream->data_size); + buffer_size = read_from_stream->data_size; + libp2p_stream_message_free(read_from_stream); + read_from_stream = NULL; while (num_bytes_requested > buffer_size - left) { // need to read more into buffer - if (session_context->default_stream->read(session_context, &read_from_stream, &size_read_from_stream, timeout_secs) == 0) { + if (session_context->default_stream->read(session_context, &read_from_stream, timeout_secs) == 0) { return 0; } - memcpy(&buffer[buffer_size], read_from_stream, size_read_from_stream); - free(read_from_stream); - buffer_size += size_read_from_stream; + memcpy(&buffer[buffer_size], read_from_stream->data, read_from_stream->data_size); + buffer_size += read_from_stream->data_size; + libp2p_stream_message_free(read_from_stream); } - *results = malloc(num_bytes_requested); - *results_size = num_bytes_requested; - if (*results == NULL) { + *results = libp2p_stream_message_new(); + struct StreamMessage* rslts = *results; + if (rslts == NULL) { libp2p_logger_error("multistream", "Unable to allocate %lu bytes of memory.", num_bytes_requested); return 0; } - memcpy(*results, &buffer[left], num_bytes_requested); + rslts->data_size = num_bytes_requested; + rslts->data = (uint8_t*) malloc(num_bytes_requested); + if (rslts->data == NULL) { + libp2p_stream_message_free(rslts); + libp2p_logger_error("multistream", "Unable to allocate %lu bytes of memory.", num_bytes_requested); + return 0; + } + memcpy(rslts->data, &buffer[left], num_bytes_requested); session_context->last_comm_epoch = os_utils_gmtime(); } @@ -359,8 +370,7 @@ struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) { */ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, int port, int timeout_secs) { int retVal = -1, return_result = -1, socket = -1; - unsigned char* results = NULL; - size_t results_size; + struct StreamMessage* results = NULL; struct Stream* stream = NULL; uint32_t ip = hostname_to_ip(hostname); @@ -381,8 +391,8 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, session.default_stream = stream; // try to receive the protocol id - return_result = libp2p_net_multistream_read(&session, &results, &results_size, timeout_secs); - if (return_result == 0 || results_size < 1 || !libp2p_net_multistream_can_handle(results, results_size)) { + return_result = libp2p_net_multistream_read(&session, &results, timeout_secs); + if (results == NULL || return_result == 0 || results->data_size < 1 || !libp2p_net_multistream_can_handle(results->data, results->data_size)) { libp2p_logger_error("multistream", "Attempted to receive the multistream protocol header, but received %s.\n", results); goto exit; } @@ -418,15 +428,14 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, */ int libp2p_net_multistream_negotiate(struct SessionContext* session) { const char* protocolID = "/multistream/1.0.0\n"; - unsigned char* results = NULL; - size_t results_length = 0; + struct StreamMessage* results = NULL; int retVal = 0; // send the protocol id if (!libp2p_net_multistream_write(session, (unsigned char*)protocolID, strlen(protocolID))) goto exit; // expect the same back - libp2p_net_multistream_read(session, &results, &results_length, multistream_default_timeout); - if (results_length == 0) + libp2p_net_multistream_read(session, &results, multistream_default_timeout); + if (results == NULL || results->data_size == 0) goto exit; if (strncmp((char*)results, protocolID, strlen(protocolID)) != 0) goto exit; diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c index d9811f7..b1e5310 100644 --- a/routing/dht_protocol.c +++ b/routing/dht_protocol.c @@ -82,30 +82,27 @@ int libp2p_routing_dht_protobuf_message(struct KademliaMessage* message, unsigne int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) { int retVal = 0; char* protocol = "/ipfs/kad/1.0.0\n"; - unsigned char* results = NULL; - size_t results_size = 0; + struct StreamMessage* results = NULL; if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) { libp2p_logger_error("dht_protocol", "Unable to write to stream during upgrade attempt.\n"); goto exit; } - if (!context->default_stream->read(context, &results, &results_size, 5)) { + if (!context->default_stream->read(context, &results, 5)) { libp2p_logger_error("dht_protocol", "Unable to read from stream during upgrade attempt.\n"); goto exit; } - if (results_size != strlen(protocol)) { + if (results == NULL || results->data_size != strlen(protocol)) { libp2p_logger_error("dht_protocol", "Expected response size incorrect during upgrade attempt.\n"); goto exit; } - if (strncmp((char*)results, protocol, results_size) != 0) { + if (strncmp((char*)results->data, protocol, results->data_size) != 0) { libp2p_logger_error("dht_protocol", "Expected %s but received %s.\n", protocol, results); goto exit; } retVal = 1; exit: - if (results != NULL) { - free(results); - results = NULL; - } + libp2p_stream_message_free(results); + results = NULL; return retVal; } @@ -443,16 +440,17 @@ int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct K * @returns true(1) on success, otherwise false(0) */ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore) { - unsigned char* buffer = NULL, *result_buffer = NULL; - size_t buffer_size = 0, result_buffer_size = 0; + unsigned char *result_buffer = NULL; + struct StreamMessage* buffer = NULL; + size_t result_buffer_size = 0; int retVal = 0; struct KademliaMessage* message = NULL; // read from stream - if (!session->default_stream->read(session, &buffer, &buffer_size, 5)) + if (!session->default_stream->read(session, &buffer, 5)) goto exit; // unprotobuf - if (!libp2p_message_protobuf_decode(buffer, buffer_size, &message)) + if (!libp2p_message_protobuf_decode(buffer->data, buffer->data_size, &message)) goto exit; // handle message @@ -486,8 +484,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee } retVal = 1; exit: - if (buffer != NULL) - free(buffer); + libp2p_stream_message_free(buffer); if (result_buffer != NULL) free(result_buffer); if (message != NULL) @@ -503,20 +500,20 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee * @returns true(1) on success, false(0) otherwise */ int libp2p_routing_dht_receive_message(struct SessionContext* sessionContext, struct KademliaMessage** result) { - uint8_t* results = NULL; - size_t results_size = 0; + struct StreamMessage* results = NULL; - if (!sessionContext->default_stream->read(sessionContext, &results, &results_size, 5)) { + if (!sessionContext->default_stream->read(sessionContext, &results, 5)) { libp2p_logger_error("online", "Attempted to read from Kademlia stream, but could not.\n"); goto exit; } // see if we can unprotobuf - if (!libp2p_message_protobuf_decode(results, results_size, result)) { + if (!libp2p_message_protobuf_decode(results->data, results->data_size, result)) { libp2p_logger_error("online", "Received kademlia response, but cannot decode it.\n"); goto exit; } exit: + libp2p_stream_message_free(results); return result != NULL; } diff --git a/secio/secio.c b/secio/secio.c index db5778b..d79ada7 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -659,25 +659,22 @@ int libp2p_secio_send_protocol(struct SessionContext* session) { int libp2p_secio_receive_protocol(struct SessionContext* session) { char* protocol = "/secio/1.0.0\n"; int numSecs = 30; - unsigned char* buffer = NULL; - size_t buffer_size = 0; - int retVal = session->default_stream->read(session, &buffer, &buffer_size, numSecs); - if (retVal == 0 || buffer != NULL) { - if (buffer == NULL) { - libp2p_logger_error("secio", "Expected the secio protocol header, but received NULL.\n"); - } else { - if (strncmp(protocol, (char*)buffer, strlen(protocol)) == 0) { - free(buffer); - return 1; - } - else { - libp2p_logger_error("secio", "Expected the secio protocol header, but received %s.\n", buffer); - } + int retVal = 0; + struct StreamMessage* buffer = NULL; + session->default_stream->read(session, &buffer, numSecs); + if (buffer == NULL) { + libp2p_logger_error("secio", "Expected the secio protocol header, but received NULL.\n"); + } else { + // see if they sent the correct response + if (strncmp(protocol, (char*)buffer->data, strlen(protocol)) == 0) { + retVal = 1; + } + else { + libp2p_logger_error("secio", "Expected the secio protocol header, but received %s.\n", buffer); } } - if (buffer != NULL) - free(buffer); - return 0; + libp2p_stream_message_free(buffer); + return retVal; } /** @@ -782,9 +779,8 @@ int libp2p_secio_encrypted_write(void* stream_context, const unsigned char* byte * @param outgoing_size the amount of memory allocated for the results * @returns number of unencrypted bytes */ -int libp2p_secio_decrypt(struct SessionContext* session, const unsigned char* incoming, size_t incoming_size, unsigned char** outgoing, size_t* outgoing_size) { +int libp2p_secio_decrypt(struct SessionContext* session, const unsigned char* incoming, size_t incoming_size, struct StreamMessage** outgoing) { size_t data_section_size = incoming_size - 32; - *outgoing_size = 0; unsigned char* buffer; // verify MAC @@ -802,9 +798,13 @@ int libp2p_secio_decrypt(struct SessionContext* session, const unsigned char* in // MAC verification failed libp2p_logger_error("secio", "libp2p_secio_decrypt: MAC verification failed.\n"); // copy the raw bytes into outgoing for further analysis - *outgoing = (unsigned char*)malloc(incoming_size); - *outgoing_size = incoming_size; - memcpy(*outgoing, incoming, incoming_size); + *outgoing = libp2p_stream_message_new(); + struct StreamMessage* message = *outgoing; + if (message != NULL) { + message->data_size = incoming_size; + message->data = (uint8_t*) malloc(incoming_size); + memcpy(message->data, incoming, incoming_size); + } return 0; } @@ -824,22 +824,28 @@ int libp2p_secio_decrypt(struct SessionContext* session, const unsigned char* in } mbedtls_aes_free(&cipher_ctx); - *outgoing = malloc(data_section_size); - *outgoing_size = data_section_size; - memcpy(*outgoing, buffer, data_section_size); + *outgoing = libp2p_stream_message_new(); + struct StreamMessage* message = *outgoing; + message->data_size = data_section_size; + message->data = (uint8_t*) malloc(data_section_size); + if (message->data == NULL) { + libp2p_stream_message_free(message); + *outgoing = NULL; + return 0; + } + memcpy(message->data, buffer, data_section_size); free(buffer); - return *outgoing_size; + return message->data_size; } /** * Read from an encrypted stream * @param session the session parameters * @param bytes where the bytes will be stored - * @param num_bytes the number of bytes read from the stream * @returns the number of bytes read */ -int libp2p_secio_encrypted_read(void* stream_context, unsigned char** bytes, size_t* num_bytes, int timeout_secs) { +int libp2p_secio_encrypted_read(void* stream_context, struct StreamMessage** bytes, int timeout_secs) { int retVal = 0; struct SessionContext* session = (struct SessionContext*)stream_context; // reader uses the remote cipher and mac @@ -850,7 +856,7 @@ int libp2p_secio_encrypted_read(void* stream_context, unsigned char** bytes, siz libp2p_logger_error("secio", "Unencrypted_read returned false.\n"); goto exit; } - retVal = libp2p_secio_decrypt(session, incoming, incoming_size, bytes, num_bytes); + retVal = libp2p_secio_decrypt(session, incoming, incoming_size, bytes); if (!retVal) libp2p_logger_error("secio", "Decrypting incoming stream returned false.\n"); exit: @@ -871,6 +877,7 @@ int libp2p_secio_encrypted_read(void* stream_context, unsigned char** bytes, siz int libp2p_secio_handshake(struct SessionContext* local_session, const struct RsaPrivateKey* private_key, struct Peerstore* peerstore) { int retVal = 0; size_t results_size = 0, bytes_written = 0; + struct StreamMessage* stream_message = NULL; unsigned char* propose_in_bytes = NULL; // the remote protobuf size_t propose_in_size = 0; unsigned char* propose_out_bytes = NULL; // the local protobuf @@ -1172,16 +1179,16 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs // receive our nonce to verify encryption works libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Receiving our nonce\n"); results = NULL; - int bytes_read = libp2p_secio_encrypted_read(local_session, &results, &results_size, 10); - if (bytes_read <= 0) { + int bytes_read = libp2p_secio_encrypted_read(local_session, &stream_message, 10); + if (bytes_read <= 0 || stream_message == NULL) { libp2p_logger_error("secio", "Encrypted read returned %d\n", bytes_read); goto exit; } - if (results_size != 16) { - libp2p_logger_error("secio", "Results_size should be 16 but was %d\n", results_size); + if (stream_message->data_size != 16) { + libp2p_logger_error("secio", "Results_size should be 16 but was %d\n", stream_message->data_size); goto exit; } - if (libp2p_secio_bytes_compare(results, (unsigned char*)local_session->local_nonce, 16) != 0) { + if (libp2p_secio_bytes_compare(stream_message->data, (unsigned char*)local_session->local_nonce, 16) != 0) { libp2p_logger_error("secio", "Bytes of nonce did not match\n"); goto exit; } @@ -1221,10 +1228,9 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs libp2p_secio_propose_free(propose_out); libp2p_secio_propose_free(propose_in); + libp2p_stream_message_free(stream_message); - if (retVal == 1) { - //libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake success!\n"); - } else { + if (retVal != 1) { libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake returning false\n"); } return retVal; diff --git a/test/test_multistream.h b/test/test_multistream.h index 2ceeb44..d839fe6 100644 --- a/test/test_multistream.h +++ b/test/test_multistream.h @@ -30,8 +30,7 @@ int test_multistream_connect() { int test_multistream_get_list() { int retVal = 0; - unsigned char* response; - size_t response_size; + struct StreamMessage* response; char* filtered = NULL; struct SessionContext session; @@ -47,13 +46,13 @@ int test_multistream_get_list() { goto exit; // retrieve response - retVal = libp2p_net_multistream_read(&session, &response, &response_size, 5); + retVal = libp2p_net_multistream_read(&session, &response, 5); if (retVal <= 0) goto exit; - filtered = malloc(response_size + 1); - strncpy(filtered, (char*)response, response_size); - filtered[response_size] = 0; + filtered = malloc(response->data_size + 1); + strncpy(filtered, (char*)response->data, response->data_size); + filtered[response->data_size] = 0; fprintf(stdout, "Response from multistream ls: %s", (char*)filtered); @@ -64,8 +63,7 @@ int test_multistream_get_list() { session.insecure_stream->close(&session); libp2p_net_multistream_stream_free(session.insecure_stream); } - if (response != NULL) - free(response); + libp2p_stream_message_free(response); if (filtered != NULL) free(filtered); diff --git a/test/test_secio.h b/test/test_secio.h index a377201..f90639f 100644 --- a/test/test_secio.h +++ b/test/test_secio.h @@ -159,17 +159,16 @@ int test_secio_handshake() { } // retrieve the response - unsigned char* results; - size_t results_size; - if (libp2p_net_multistream_read(secure_session, &results, &results_size, 30) == 0) { + struct StreamMessage* results; + if (libp2p_net_multistream_read(secure_session, &results, 30) == 0) { fprintf(stdout, "Unable to read ls results from multistream\n"); free(results); goto exit; } - fprintf(stdout, "Results of ls (%d bytes long):\n%s\n", (int)results_size, results); + fprintf(stdout, "Results of ls (%d bytes long):\n%s\n", (int)results->data_size, results->data); - free(results); + libp2p_stream_message_free(results); results = NULL; // try to yamux char* yamux_string = "/yamux/1.0.0\n"; @@ -177,14 +176,14 @@ int test_secio_handshake() { libp2p_logger_error("test_secio", "Unable to send yamux protocol request\n"); goto exit; } - if (!libp2p_net_multistream_read(secure_session, &results, &results_size, 30)) { + if (!libp2p_net_multistream_read(secure_session, &results, 30)) { libp2p_logger_error("test_secio", "Unable to read reply to yamux request.\n"); goto exit; } - fprintf(stdout, "Results of yamux request: %s\n", results); + fprintf(stdout, "Results of yamux request: %s\n", results->data); - free(results); + libp2p_stream_message_free(results); results = NULL; retVal = 1; @@ -496,17 +495,16 @@ int test_secio_handshake_go() { } // retrieve the response - unsigned char* results; - size_t results_size; - if (libp2p_net_multistream_read(secure_session, &results, &results_size, 30) == 0) { + struct StreamMessage* results; + if (libp2p_net_multistream_read(secure_session, &results, 30) == 0) { fprintf(stdout, "Unable to read ls results from multistream\n"); free(results); goto exit; } - fprintf(stdout, "Results of ls: %.*s", (int)results_size, results); + fprintf(stdout, "Results of ls: %.*s", (int)results->data_size, results->data); - free(results); + libp2p_stream_message_free(results); results = NULL; retVal = 1; diff --git a/yamux/yamux.c b/yamux/yamux.c index f22abee..189a39c 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -71,18 +71,22 @@ int yamux_send_protocol(struct SessionContext* context) { */ int yamux_receive_protocol(struct SessionContext* context) { char* protocol = "/yamux/1.0.0\n"; - uint8_t* results = NULL; - size_t results_size = 0; - if (!context->default_stream->read(context, &results, &results_size, 30)) { + struct StreamMessage* results = NULL; + int retVal = 0; + + if (!context->default_stream->read(context, &results, 30)) { libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n"); - return 0; + goto exit; } // the first byte is the size, so skip it - char* ptr = strstr((char*)&results[1], protocol); - if (ptr == NULL || ptr - (char*)results > 1) { - return 0; + char* ptr = strstr((char*)&results->data[1], protocol); + if (ptr == NULL || ptr - (char*)results->data > 1) { + goto exit; } - return 1; + retVal = 1; + exit: + libp2p_stream_message_free(results); + return retVal; } /***