Beginnings of stream locking
This commit is contained in:
parent
b399762d82
commit
cfde84b15c
2 changed files with 27 additions and 22 deletions
|
@ -333,7 +333,8 @@ int ipfs_core_http_process_swarm_connect(struct IpfsNode* local_node, struct Htt
|
|||
return 0;
|
||||
}
|
||||
sprintf((char*)response->bytes, json, address);
|
||||
libp2p_peer_free(new_peer);
|
||||
// getting rid of the peer here will close the connection. That's not what we want
|
||||
//libp2p_peer_free(new_peer);
|
||||
multiaddress_free(ma);
|
||||
return 1;
|
||||
|
||||
|
|
|
@ -96,30 +96,34 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
|
|||
if (current_peer_entry->sessionContext == NULL || current_peer_entry->sessionContext->default_stream == NULL) {
|
||||
current_peer_entry->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
|
||||
} else {
|
||||
//libp2p_logger_debug("bitswap_engine", "We're connected to %s. Lets see if there is a message waiting for us.\n", libp2p_peer_id_to_string(current_peer_entry));
|
||||
int retVal = current_peer_entry->sessionContext->default_stream->peek(current_peer_entry->sessionContext);
|
||||
if (retVal < 0) {
|
||||
libp2p_logger_debug("bitswap_engine", "We thought we were connected, but Peek reported an error.\n");
|
||||
libp2p_peer_handle_connection_error(current_peer_entry);
|
||||
} else if (retVal > 0) {
|
||||
libp2p_logger_debug("bitswap_engine", "%d bytes waiting on network for peer %s.\n", retVal, libp2p_peer_id_to_string(current_peer_entry));
|
||||
unsigned char* buffer = NULL;
|
||||
size_t buffer_len = 0;
|
||||
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) {
|
||||
// handle it
|
||||
libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer_len, buffer);
|
||||
int retVal = libp2p_protocol_marshal(buffer, buffer_len, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers);
|
||||
free(buffer);
|
||||
did_some_processing = 1;
|
||||
if (retVal == -1) {
|
||||
libp2p_logger_error("bitswap_engine", "protocol_marshal tried to handle the network traffic, but failed.\n");
|
||||
// there was a problem. Clean up
|
||||
// check the network to see if there is anything waiting for us (if the stream is idle)
|
||||
if (libp2p_stream_try_lock(current_peer_entry->sessionContext->default_stream)) {
|
||||
libp2p_logger_error("bitswap_engine", "I have the lock for peer %s.\n", libp2p_peer_id_to_string(current_peer_entry));
|
||||
int retVal = current_peer_entry->sessionContext->default_stream->peek(current_peer_entry->sessionContext);
|
||||
if (retVal < 0) {
|
||||
libp2p_logger_debug("bitswap_engine", "We thought we were connected, but Peek reported an error.\n");
|
||||
libp2p_peer_handle_connection_error(current_peer_entry);
|
||||
} else if (retVal > 0) {
|
||||
libp2p_logger_debug("bitswap_engine", "%d bytes waiting on network for peer %s.\n", retVal, libp2p_peer_id_to_string(current_peer_entry));
|
||||
unsigned char* buffer = NULL;
|
||||
size_t buffer_len = 0;
|
||||
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) {
|
||||
// handle it
|
||||
libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer_len, buffer);
|
||||
int retVal = libp2p_protocol_marshal(buffer, buffer_len, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers);
|
||||
free(buffer);
|
||||
did_some_processing = 1;
|
||||
if (retVal == -1) {
|
||||
libp2p_logger_error("bitswap_engine", "protocol_marshal tried to handle the network traffic, but failed.\n");
|
||||
// there was a problem. Clean up
|
||||
libp2p_peer_handle_connection_error(current_peer_entry);
|
||||
}
|
||||
} else {
|
||||
libp2p_logger_error("bitswap_engine", "It was said that there was %d bytes to read, but there wasn't. Cleaning up connection.\n");
|
||||
libp2p_peer_handle_connection_error(current_peer_entry);
|
||||
}
|
||||
} else {
|
||||
libp2p_logger_error("bitswap_engine", "It was said that there was %d bytes to read, but there wasn't. Cleaning up connection.\n");
|
||||
libp2p_peer_handle_connection_error(current_peer_entry);
|
||||
}
|
||||
libp2p_stream_unlock(current_peer_entry->sessionContext->default_stream);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
Loading…
Reference in a new issue