From 06389966840f4873f1fadeeb5333e90da922956b Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Wed, 2 Aug 2017 07:04:06 -0500 Subject: [PATCH] processing want list --- exchange/bitswap/network.c | 12 +++-- exchange/bitswap/peer_request_queue.c | 49 ++++++++++++++++++- exchange/bitswap/wantlist_queue.c | 4 +- .../exchange/bitswap/peer_request_queue.h | 8 ++- 4 files changed, 66 insertions(+), 7 deletions(-) diff --git a/exchange/bitswap/network.c b/exchange/bitswap/network.c index 74cfc40..7e5f96a 100644 --- a/exchange/bitswap/network.c +++ b/exchange/bitswap/network.c @@ -54,8 +54,8 @@ int ipfs_bitswap_network_remove_cid_from_queue(struct Libp2pVector* collection, return 0; for(int i = 0; i < collection->total; collection++) { - const struct Cid* current = (const struct Cid*)libp2p_utils_vector_get(collection, i); - if (ipfs_cid_compare(current, cid) == 0) { + const struct CidEntry* current = (const struct CidEntry*)libp2p_utils_vector_get(collection, i); + if (ipfs_cid_compare(current->cid, cid) == 0) { libp2p_utils_vector_delete(collection, i); return 1; } @@ -140,8 +140,12 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc } if (entry->cancel) ipfs_bitswap_network_remove_cid_from_queue(queueEntry->current->cids_they_want, cid); - else - libp2p_utils_vector_add(queueEntry->current->cids_they_want, cid); + else { + struct CidEntry* entry = ipfs_bitswap_peer_request_cid_entry_new(); + entry->cid = cid; + entry->cancel = 0; + libp2p_utils_vector_add(queueEntry->current->cids_they_want, entry); + } } } return 1; diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index 735c93e..0bd8e41 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -10,6 +10,18 @@ #include "ipfs/exchange/bitswap/message.h" #include "ipfs/exchange/bitswap/network.h" +/*** + * Allocate memory for CidEntry + * @returns new CidEntry struct + */ +struct CidEntry* ipfs_bitswap_peer_request_cid_entry_new() { + struct CidEntry* entry = (struct CidEntry*) malloc(sizeof(struct CidEntry)); + if (entry != NULL) { + entry->cid = NULL; + entry->cancel = 0; + } + return entry; +} /** * Allocate resources for a new PeerRequest * @returns a new PeerRequest struct or NULL if there was a problem @@ -216,6 +228,40 @@ int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct return 0; } +/**** + * Find blocks they want, and put them in the request + */ +int ipfs_bitswap_peer_request_get_blocks_they_want(const struct BitswapContext* context, struct PeerRequest* request) { + for(int i = 0; i < request->cids_they_want->total; i++) { + struct CidEntry* cidEntry = (struct CidEntry*)libp2p_utils_vector_get(request->cids_they_want, i); + if (cidEntry != NULL && !cidEntry->cancel) { + struct Block* block = NULL; + context->ipfsNode->blockstore->Get(context->ipfsNode->blockstore->blockstoreContext, cidEntry->cid, &block); + if (block != NULL) { + libp2p_utils_vector_add(request->blocks_we_want_to_send, block); + cidEntry->cancel = 1; + } + } + } + return 0; +} + +/*** + * Determine if any of the cids in the list are waiting to be filled + * @param cidEntries a Vector of CidEntry objects + * @returns true(1) if we have some waiting, false(0) otherwise + */ +int ipfs_peer_request_cids_waiting(struct Libp2pVector* cidEntries) { + if (cidEntries == NULL) + return 0; + for(int i = 0; i < cidEntries->total; i++) { + const struct CidEntry* entry = (const struct CidEntry*)libp2p_utils_vector_get(cidEntries, i); + if (entry != NULL && !entry->cancel) + return 1; + } + return 0; +} + /**** * Handle a PeerRequest * @param context the BitswapContext @@ -235,7 +281,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context } // determine if we're connected int connected = request->peer->is_local || request->peer->connection_type == CONNECTION_TYPE_CONNECTED; - int need_to_connect = request->cids_we_want->total != 0 || request->cids_they_want->total != 0 || request->blocks_we_want_to_send->total != 0; + int need_to_connect = request->cids_we_want->total != 0 || ipfs_peer_request_cids_waiting(request->cids_they_want) || request->blocks_we_want_to_send->total != 0; // determine if we need to connect if (need_to_connect) { @@ -247,6 +293,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context // build a message struct BitswapMessage* msg = ipfs_bitswap_message_new(); // see if we can fulfill any of their requests. If so, fill in msg->payload + ipfs_bitswap_peer_request_get_blocks_they_want(context, request); ipfs_bitswap_message_add_blocks(msg, request->blocks_we_want_to_send); // add requests that we would like ipfs_bitswap_message_add_wantlist_items(msg, request->cids_we_want); diff --git a/exchange/bitswap/wantlist_queue.c b/exchange/bitswap/wantlist_queue.c index 3884aba..0d850f6 100644 --- a/exchange/bitswap/wantlist_queue.c +++ b/exchange/bitswap/wantlist_queue.c @@ -250,7 +250,9 @@ int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struc struct Libp2pPeer* current = (struct Libp2pPeer*) libp2p_utils_vector_get(providers, i); // add this to their queue struct PeerRequest* queueEntry = ipfs_peer_request_queue_find_peer(context->peerRequestQueue, current); - libp2p_utils_vector_add(queueEntry->cids_we_want, cid); + struct CidEntry* entry = ipfs_bitswap_peer_request_cid_entry_new(); + entry->cid = cid; + libp2p_utils_vector_add(queueEntry->cids_we_want, entry); // process this queue via bitswap protocol ipfs_bitswap_peer_request_process_entry(context, queueEntry); } diff --git a/include/ipfs/exchange/bitswap/peer_request_queue.h b/include/ipfs/exchange/bitswap/peer_request_queue.h index 5107ee8..d227a82 100644 --- a/include/ipfs/exchange/bitswap/peer_request_queue.h +++ b/include/ipfs/exchange/bitswap/peer_request_queue.h @@ -17,7 +17,7 @@ struct CidEntry { struct PeerRequest { pthread_mutex_t request_mutex; struct Libp2pPeer* peer; - // Cid collection of cids that they want. Note cancellations are removed immediately + // CidEntry collection of cids that they want struct Libp2pVector* cids_they_want; // CidEntry collection of cids that we want or are canceling struct Libp2pVector* cids_we_want; @@ -39,6 +39,12 @@ struct PeerRequestQueue { struct PeerRequestEntry* last; }; +/*** + * Allocate memory for CidEntry + * @returns new CidEntry struct + */ +struct CidEntry* ipfs_bitswap_peer_request_cid_entry_new(); + /** * Allocate resources for a new PeerRequest * @returns a new PeerRequest struct or NULL if there was a problem