diff --git a/exchange/bitswap/network.c b/exchange/bitswap/network.c index 7e5f96a..d6c1b25 100644 --- a/exchange/bitswap/network.c +++ b/exchange/bitswap/network.c @@ -5,6 +5,7 @@ * For a somewhat accurate diagram of how this may work, @see https://github.com/ipfs/js-ipfs-bitswap */ +#include "libp2p/utils/logger.h" #include "ipfs/exchange/bitswap/network.h" #include "ipfs/exchange/bitswap/peer_request_queue.h" @@ -104,23 +105,16 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc ipfs_bitswap_message_free(message); return 0; } - struct Libp2pPeer* temp_peer = libp2p_peer_new(); - temp_peer->id_size = strlen(sessionContext->remote_peer_id); - temp_peer->id = malloc(temp_peer->id_size); - if (temp_peer->id == NULL) { - libp2p_peer_free(temp_peer); - ipfs_bitswap_message_free(message); - return 0; - } - memcpy(temp_peer->id, sessionContext->remote_peer_id, strlen(sessionContext->remote_peer_id)); - struct Libp2pPeer* peer = libp2p_peerstore_get_or_add_peer(node->peerstore, temp_peer); - libp2p_peer_free(temp_peer); + struct Libp2pPeer* peer = libp2p_peerstore_get_or_add_peer_by_id(node->peerstore, (unsigned char*)sessionContext->remote_peer_id, strlen(sessionContext->remote_peer_id)); if (peer == NULL) { + libp2p_logger_error("bitswap_network", "Unable to find or add peer %s of length %d to peerstore.\n", sessionContext->remote_peer_id, strlen(sessionContext->remote_peer_id)); ipfs_bitswap_message_free(message); return 0; } + // find the queue struct PeerRequestEntry* queueEntry = ipfs_bitswap_peer_request_queue_find_entry(bitswapContext->peerRequestQueue, peer); if (queueEntry == NULL) { + // add the queue struct PeerRequest* peerRequest =ipfs_bitswap_peer_request_new(); peerRequest->peer = peer; ipfs_bitswap_peer_request_queue_add(bitswapContext->peerRequestQueue, peerRequest); @@ -134,17 +128,18 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc struct WantlistEntry* entry = (struct WantlistEntry*) libp2p_utils_vector_get(message->wantlist->entries, i); // turn the "block" back into a cid struct Cid* cid = NULL; - if (!ipfs_cid_protobuf_decode(entry->block, entry->block_size, &cid)) { + if (!ipfs_cid_protobuf_decode(entry->block, entry->block_size, &cid) || cid->hash_length == 0) { + libp2p_logger_error("bitswap_network", "Message had invalid CID\n"); ipfs_cid_free(cid); return 0; } if (entry->cancel) ipfs_bitswap_network_remove_cid_from_queue(queueEntry->current->cids_they_want, cid); else { - struct CidEntry* entry = ipfs_bitswap_peer_request_cid_entry_new(); - entry->cid = cid; - entry->cancel = 0; - libp2p_utils_vector_add(queueEntry->current->cids_they_want, entry); + struct CidEntry* cidEntry = ipfs_bitswap_peer_request_cid_entry_new(); + cidEntry->cid = cid; + cidEntry->cancel = 0; + libp2p_utils_vector_add(queueEntry->current->cids_they_want, cidEntry); } } } diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index 0bd8e41..51aa6dc 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -160,6 +160,39 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR return NULL; } +/*** + * Determine if any of the cids in the list are waiting to be filled + * @param cidEntries a Vector of CidEntry objects + * @returns true(1) if we have some waiting, false(0) otherwise + */ +int ipfs_bitswap_peer_request_cids_waiting(struct Libp2pVector* cidEntries) { + if (cidEntries == NULL) + return 0; + for(int i = 0; i < cidEntries->total; i++) { + const struct CidEntry* entry = (const struct CidEntry*)libp2p_utils_vector_get(cidEntries, i); + if (entry != NULL && !entry->cancel) + return 1; + } + return 0; +} + +/*** + * Determine if there is something to process in this request + * @param entry the entry to look at + * @returns true(1) if there is something to do + */ +int ipfs_bitswap_peer_request_something_to_do(struct PeerRequestEntry* entry) { + if (entry != NULL) { + struct PeerRequest* request = entry->current; + if (request->blocks_we_want_to_send->total > 0) + return 1; + if (request->cids_we_want->total > 0) + return 1; + if (ipfs_bitswap_peer_request_cids_waiting(request->cids_they_want)) + return 1; + } + return 0; +} /** * Pull a PeerRequest off the queue @@ -171,13 +204,17 @@ struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue* if (queue != NULL) { pthread_mutex_lock(&queue->queue_mutex); struct PeerRequestEntry* entry = queue->first; - if (entry != NULL) { + if (entry != NULL && ipfs_bitswap_peer_request_something_to_do(entry)) { retVal = entry->current; queue->first = queue->first->next; } pthread_mutex_unlock(&queue->queue_mutex); + // disable temporarily + // JMJ Debugging + /* if (entry != NULL) ipfs_bitswap_peer_request_entry_free(entry); + */ } return retVal; } @@ -246,22 +283,6 @@ int ipfs_bitswap_peer_request_get_blocks_they_want(const struct BitswapContext* return 0; } -/*** - * Determine if any of the cids in the list are waiting to be filled - * @param cidEntries a Vector of CidEntry objects - * @returns true(1) if we have some waiting, false(0) otherwise - */ -int ipfs_peer_request_cids_waiting(struct Libp2pVector* cidEntries) { - if (cidEntries == NULL) - return 0; - for(int i = 0; i < cidEntries->total; i++) { - const struct CidEntry* entry = (const struct CidEntry*)libp2p_utils_vector_get(cidEntries, i); - if (entry != NULL && !entry->cancel) - return 1; - } - return 0; -} - /**** * Handle a PeerRequest * @param context the BitswapContext @@ -281,7 +302,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context } // determine if we're connected int connected = request->peer->is_local || request->peer->connection_type == CONNECTION_TYPE_CONNECTED; - int need_to_connect = request->cids_we_want->total != 0 || ipfs_peer_request_cids_waiting(request->cids_they_want) || request->blocks_we_want_to_send->total != 0; + int need_to_connect = request->cids_we_want->total != 0 || ipfs_bitswap_peer_request_cids_waiting(request->cids_they_want) || request->blocks_we_want_to_send->total != 0; // determine if we need to connect if (need_to_connect) { diff --git a/test/routing/test_routing.h b/test/routing/test_routing.h index 02afcf1..949eaf7 100644 --- a/test/routing/test_routing.h +++ b/test/routing/test_routing.h @@ -604,7 +604,7 @@ int test_routing_retrieve_large_file() { stat(temp_file_name, &buf); if (buf.st_size != 1000000) { - fprintf(stderr, "File size should be 1000000, but is %lu\n", buf.st_size); + fprintf(stderr, "File size should be 1000000, but is %lu\n", (unsigned long)buf.st_size); goto exit; } diff --git a/util/thread_pool.c b/util/thread_pool.c index c7a3624..009c6e7 100644 --- a/util/thread_pool.c +++ b/util/thread_pool.c @@ -306,6 +306,10 @@ static void thread_hold(int sig_id) { } +#if defined(__APPLE__) && defined(__MACH__) + int pthread_setname_np(const char* name); +#endif + /* What each thread is doing * * In principle this is an endless loop. The only time this loop gets interuppted is once