diff --git a/core/http_request.c b/core/http_request.c index 312e926..2d922e8 100644 --- a/core/http_request.c +++ b/core/http_request.c @@ -333,7 +333,8 @@ int ipfs_core_http_process_swarm_connect(struct IpfsNode* local_node, struct Htt return 0; } sprintf((char*)response->bytes, json, address); - libp2p_peer_free(new_peer); + // getting rid of the peer here will close the connection. That's not what we want + //libp2p_peer_free(new_peer); multiaddress_free(ma); return 1; diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c index 9b17f5a..2efa264 100644 --- a/exchange/bitswap/engine.c +++ b/exchange/bitswap/engine.c @@ -96,30 +96,34 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { if (current_peer_entry->sessionContext == NULL || current_peer_entry->sessionContext->default_stream == NULL) { current_peer_entry->connection_type = CONNECTION_TYPE_NOT_CONNECTED; } else { - //libp2p_logger_debug("bitswap_engine", "We're connected to %s. Lets see if there is a message waiting for us.\n", libp2p_peer_id_to_string(current_peer_entry)); - int retVal = current_peer_entry->sessionContext->default_stream->peek(current_peer_entry->sessionContext); - if (retVal < 0) { - libp2p_logger_debug("bitswap_engine", "We thought we were connected, but Peek reported an error.\n"); - libp2p_peer_handle_connection_error(current_peer_entry); - } else if (retVal > 0) { - libp2p_logger_debug("bitswap_engine", "%d bytes waiting on network for peer %s.\n", retVal, libp2p_peer_id_to_string(current_peer_entry)); - unsigned char* buffer = NULL; - size_t buffer_len = 0; - if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) { - // handle it - libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer_len, buffer); - int retVal = libp2p_protocol_marshal(buffer, buffer_len, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers); - free(buffer); - did_some_processing = 1; - if (retVal == -1) { - libp2p_logger_error("bitswap_engine", "protocol_marshal tried to handle the network traffic, but failed.\n"); - // there was a problem. Clean up + // check the network to see if there is anything waiting for us (if the stream is idle) + if (libp2p_stream_try_lock(current_peer_entry->sessionContext->default_stream)) { + libp2p_logger_error("bitswap_engine", "I have the lock for peer %s.\n", libp2p_peer_id_to_string(current_peer_entry)); + int retVal = current_peer_entry->sessionContext->default_stream->peek(current_peer_entry->sessionContext); + if (retVal < 0) { + libp2p_logger_debug("bitswap_engine", "We thought we were connected, but Peek reported an error.\n"); + libp2p_peer_handle_connection_error(current_peer_entry); + } else if (retVal > 0) { + libp2p_logger_debug("bitswap_engine", "%d bytes waiting on network for peer %s.\n", retVal, libp2p_peer_id_to_string(current_peer_entry)); + unsigned char* buffer = NULL; + size_t buffer_len = 0; + if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) { + // handle it + libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer_len, buffer); + int retVal = libp2p_protocol_marshal(buffer, buffer_len, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers); + free(buffer); + did_some_processing = 1; + if (retVal == -1) { + libp2p_logger_error("bitswap_engine", "protocol_marshal tried to handle the network traffic, but failed.\n"); + // there was a problem. Clean up + libp2p_peer_handle_connection_error(current_peer_entry); + } + } else { + libp2p_logger_error("bitswap_engine", "It was said that there was %d bytes to read, but there wasn't. Cleaning up connection.\n"); libp2p_peer_handle_connection_error(current_peer_entry); } - } else { - libp2p_logger_error("bitswap_engine", "It was said that there was %d bytes to read, but there wasn't. Cleaning up connection.\n"); - libp2p_peer_handle_connection_error(current_peer_entry); } + libp2p_stream_unlock(current_peer_entry->sessionContext->default_stream); } } } else {