Fixing repeated needless sends
This commit is contained in:
parent
8944e407e9
commit
d226e480c9
5 changed files with 42 additions and 6 deletions
|
@ -179,14 +179,14 @@ void ipfs_null_connection (void *ptr) {
|
||||||
retVal = libp2p_protocol_marshal(results, bytes_read, session, connection_param->local_node->protocol_handlers);
|
retVal = libp2p_protocol_marshal(results, bytes_read, session, connection_param->local_node->protocol_handlers);
|
||||||
free(results);
|
free(results);
|
||||||
if (retVal == -1) {
|
if (retVal == -1) {
|
||||||
libp2p_logger_debug("null", "ipfs_null_marshal returned false\n");
|
libp2p_logger_debug("null", "protocol_marshal returned error.\n");
|
||||||
break;
|
break;
|
||||||
} else if (retVal == 0) {
|
} else if (retVal == 0) {
|
||||||
// clean up, but let someone else handle this from now on
|
// clean up, but let someone else handle this from now on
|
||||||
libp2p_logger_debug("null", "ipfs_null_marshal returns 0. The daemon will no longer handle this.\n");
|
libp2p_logger_debug("null", "protocol_marshal returned 0. The daemon will no longer handle this.\n");
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
libp2p_logger_debug("null", "ipfs_null_marshal returned 1. Looping again.");
|
libp2p_logger_debug("null", "protocol_marshal returned 1. Looping again.\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -91,7 +91,7 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
|
||||||
libp2p_logger_debug("bitswap_engine", "We thought we were connected, but Peek reported an error.\n");
|
libp2p_logger_debug("bitswap_engine", "We thought we were connected, but Peek reported an error.\n");
|
||||||
libp2p_peer_handle_connection_error(current_peer_entry);
|
libp2p_peer_handle_connection_error(current_peer_entry);
|
||||||
} else if (retVal > 0) {
|
} else if (retVal > 0) {
|
||||||
libp2p_logger_debug("bitswap_engine", "Something waiting on network for peer %s.\n", current_peer_entry->id);
|
libp2p_logger_debug("bitswap_engine", "%d bytes waiting on network for peer %s.\n", retVal, current_peer_entry->id);
|
||||||
unsigned char* buffer = NULL;
|
unsigned char* buffer = NULL;
|
||||||
size_t buffer_len = 0;
|
size_t buffer_len = 0;
|
||||||
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) {
|
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) {
|
||||||
|
@ -100,9 +100,13 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
|
||||||
free(buffer);
|
free(buffer);
|
||||||
did_some_processing = 1;
|
did_some_processing = 1;
|
||||||
if (retVal == -1) {
|
if (retVal == -1) {
|
||||||
|
libp2p_logger_error("bitswap_engine", "protocol_marshal tried to handle the network traffic, but failed.\n");
|
||||||
// there was a problem. Clean up
|
// there was a problem. Clean up
|
||||||
libp2p_peer_handle_connection_error(current_peer_entry);
|
libp2p_peer_handle_connection_error(current_peer_entry);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
libp2p_logger_error("bitswap_engine", "It was said that there was %d bytes to read, but there wasn't. Cleaning up connection.\n");
|
||||||
|
libp2p_peer_handle_connection_error(current_peer_entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -657,7 +657,11 @@ int ipfs_bitswap_message_add_wantlist_items(struct BitswapMessage* message, stru
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
for(int i = 0; i < cids->total; i++) {
|
for(int i = 0; i < cids->total; i++) {
|
||||||
const struct CidEntry* cidEntry = (const struct CidEntry*)libp2p_utils_vector_get(cids, i);
|
struct CidEntry* cidEntry = (struct CidEntry*)libp2p_utils_vector_get(cids, i);
|
||||||
|
if (cidEntry->cancel && cidEntry->cancel_has_been_sent)
|
||||||
|
continue;
|
||||||
|
if (!cidEntry->cancel && cidEntry->request_has_been_sent)
|
||||||
|
continue;
|
||||||
struct WantlistEntry* entry = ipfs_bitswap_wantlist_entry_new();
|
struct WantlistEntry* entry = ipfs_bitswap_wantlist_entry_new();
|
||||||
entry->block_size = ipfs_cid_protobuf_encode_size(cidEntry->cid);
|
entry->block_size = ipfs_cid_protobuf_encode_size(cidEntry->cid);
|
||||||
entry->block = (unsigned char*) malloc(entry->block_size);
|
entry->block = (unsigned char*) malloc(entry->block_size);
|
||||||
|
@ -668,6 +672,10 @@ int ipfs_bitswap_message_add_wantlist_items(struct BitswapMessage* message, stru
|
||||||
entry->cancel = cidEntry->cancel;
|
entry->cancel = cidEntry->cancel;
|
||||||
entry->priority = 1;
|
entry->priority = 1;
|
||||||
libp2p_utils_vector_add(message->wantlist->entries, entry);
|
libp2p_utils_vector_add(message->wantlist->entries, entry);
|
||||||
|
if (cidEntry->cancel)
|
||||||
|
cidEntry->cancel_has_been_sent = 1;
|
||||||
|
else
|
||||||
|
cidEntry->request_has_been_sent = 1;
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,8 @@ struct CidEntry* ipfs_bitswap_peer_request_cid_entry_new() {
|
||||||
if (entry != NULL) {
|
if (entry != NULL) {
|
||||||
entry->cid = NULL;
|
entry->cid = NULL;
|
||||||
entry->cancel = 0;
|
entry->cancel = 0;
|
||||||
|
entry->cancel_has_been_sent = 0;
|
||||||
|
entry->request_has_been_sent = 0;
|
||||||
}
|
}
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
|
@ -337,6 +339,26 @@ int ipfs_bitswap_peer_request_get_blocks_they_want(const struct BitswapContext*
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Determine if we have anything we want (that we haven't sent already)
|
||||||
|
* @param cid_entries the list of CidEntries that are in our queue to be sent
|
||||||
|
* @returns true(1) if we have something to send, false(0) otherwise
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_we_want_cids(struct Libp2pVector* cid_entries) {
|
||||||
|
if (cid_entries == NULL)
|
||||||
|
return 0;
|
||||||
|
if (cid_entries->total == 0)
|
||||||
|
return 0;
|
||||||
|
for(int i = 0; i < cid_entries->total; i++) {
|
||||||
|
const struct CidEntry* entry = (const struct CidEntry*) libp2p_utils_vector_get(cid_entries, i);
|
||||||
|
if (entry->cancel && !entry->cancel_has_been_sent)
|
||||||
|
return 1;
|
||||||
|
if (!entry->cancel && !entry->request_has_been_sent)
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/****
|
/****
|
||||||
* Handle a PeerRequest
|
* Handle a PeerRequest
|
||||||
* @param context the BitswapContext
|
* @param context the BitswapContext
|
||||||
|
@ -356,7 +378,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context
|
||||||
}
|
}
|
||||||
// determine if we're connected
|
// determine if we're connected
|
||||||
int connected = request->peer->is_local || request->peer->connection_type == CONNECTION_TYPE_CONNECTED;
|
int connected = request->peer->is_local || request->peer->connection_type == CONNECTION_TYPE_CONNECTED;
|
||||||
int need_to_connect = request->cids_we_want->total != 0 || ipfs_bitswap_peer_request_cids_waiting(request->cids_they_want) || request->blocks_we_want_to_send->total != 0;
|
int need_to_connect = ipfs_bitswap_peer_request_we_want_cids(request->cids_we_want) || ipfs_bitswap_peer_request_cids_waiting(request->cids_they_want) || request->blocks_we_want_to_send->total != 0;
|
||||||
|
|
||||||
// determine if we need to connect
|
// determine if we need to connect
|
||||||
if (need_to_connect) {
|
if (need_to_connect) {
|
||||||
|
|
|
@ -12,6 +12,8 @@
|
||||||
struct CidEntry {
|
struct CidEntry {
|
||||||
struct Cid* cid;
|
struct Cid* cid;
|
||||||
int cancel;
|
int cancel;
|
||||||
|
int cancel_has_been_sent;
|
||||||
|
int request_has_been_sent;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct PeerRequest {
|
struct PeerRequest {
|
||||||
|
|
Loading…
Reference in a new issue