diff --git a/core/null.c b/core/null.c index 356b3c2..b92a314 100644 --- a/core/null.c +++ b/core/null.c @@ -58,20 +58,18 @@ void ipfs_null_connection (void *ptr) { libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count)); // try to read from the network - uint8_t *results = 0; - size_t bytes_read = 0; + struct StreamMessage *results = 0; // handle the call for(;;) { // immediately attempt to negotiate multistream if (!libp2p_net_multistream_send_protocol(session)) break; - if (!session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT)) { - // problem reading; - break; + if (!session->default_stream->read(session, &results, DEFAULT_NETWORK_TIMEOUT)) { + // problem reading; + break; } - retVal = libp2p_protocol_marshal(results, bytes_read, session, connection_param->local_node->protocol_handlers); - if (results != NULL) - free(results); + retVal = libp2p_protocol_marshal(results->data, results->data_size, session, connection_param->local_node->protocol_handlers); + libp2p_stream_message_free(results); // exit the loop on error (or if they ask us to no longer loop by returning 0) if (retVal <= 0) break; diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c index bb6eb68..e995eb8 100644 --- a/exchange/bitswap/engine.c +++ b/exchange/bitswap/engine.c @@ -104,13 +104,12 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { libp2p_peer_handle_connection_error(current_peer_entry); } else if (retVal > 0) { libp2p_logger_debug("bitswap_engine", "%d bytes waiting on network for peer %s.\n", retVal, libp2p_peer_id_to_string(current_peer_entry)); - unsigned char* buffer = NULL; - size_t buffer_len = 0; - if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) { + struct StreamMessage* buffer = NULL; + if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, 1)) { // handle it - libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer_len, buffer); - int retVal = libp2p_protocol_marshal(buffer, buffer_len, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers); - free(buffer); + libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer->data_size, buffer->data); + int retVal = libp2p_protocol_marshal(buffer->data, buffer->data_size, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers); + libp2p_stream_message_free(buffer); did_some_processing = 1; if (retVal == -1) { libp2p_logger_error("bitswap_engine", "protocol_marshal tried to handle the network traffic, but failed.\n"); diff --git a/importer/resolver.c b/importer/resolver.c index 7ba6698..e51c6b1 100644 --- a/importer/resolver.c +++ b/importer/resolver.c @@ -168,15 +168,15 @@ struct HashtableNode* ipfs_resolver_remote_get(const char* path, struct Hashtabl if (!libp2p_routing_dht_upgrade_stream(&session_context)) return NULL; stream->write(&session_context, message_protobuf, message_protobuf_size); - unsigned char* response; - size_t response_size; + struct StreamMessage* response; // we should get back a protobuf'd record - stream->read(&session_context, &response, &response_size, 5); - if (response_size == 1) + stream->read(&session_context, &response, 5); + if (response->data_size == 1) return NULL; // turn the protobuf into a Node struct HashtableNode* node; - ipfs_hashtable_node_protobuf_decode(response, response_size, &node); + ipfs_hashtable_node_protobuf_decode(response->data, response->data_size, &node); + libp2p_stream_message_free(response); return node; } diff --git a/journal/journal.c b/journal/journal.c index 1bafe8f..dccc017 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -303,6 +303,7 @@ int ipfs_journal_adjust_time(struct JournalToDo* todo, struct IpfsNode* local_no * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { + struct StreamMessage* msg = NULL; // remove protocol uint8_t *incoming_pos = (uint8_t*) incoming; size_t pos_size = incoming_size; @@ -315,11 +316,15 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s break; } else { // read next segment from network - if (!session_context->default_stream->read(session_context, &incoming_pos, &pos_size, 10)) + if (!session_context->default_stream->read(session_context, &msg, 10)) { return -1; + } + incoming_pos = msg->data; second_read = 1; } } + libp2p_stream_message_free(msg); + msg = NULL; } libp2p_logger_debug("journal", "Handling incoming message from %s.\n", session_context->remote_peer_id); struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;