From 836fb5387bde9eece468516d4fa0f04d8bfab641 Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Mon, 31 Jul 2017 17:59:51 -0500 Subject: [PATCH] Attempting to fulfill remote requests --- exchange/bitswap/engine.c | 4 +- exchange/bitswap/message.c | 2 + exchange/bitswap/network.c | 28 +++++++- exchange/bitswap/peer_request_queue.c | 64 +++++++++++++------ exchange/bitswap/wantlist_queue.c | 22 +++---- .../exchange/bitswap/peer_request_queue.h | 17 ++++- .../ipfs/exchange/bitswap/wantlist_queue.h | 2 +- 7 files changed, 101 insertions(+), 38 deletions(-) diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c index 3ff1458..e36a786 100644 --- a/exchange/bitswap/engine.c +++ b/exchange/bitswap/engine.c @@ -30,7 +30,7 @@ int ipfs_bitswap_engine_free(struct BitswapEngine* engine) { } /*** - * A separate thread that processes the queue + * A separate thread that processes the queue of local requests * @param context the context */ void* ipfs_bitswap_engine_wantlist_processor_start(void* ctx) { @@ -50,7 +50,7 @@ void* ipfs_bitswap_engine_wantlist_processor_start(void* ctx) { } /*** - * A separate thread that processes the queue + * A separate thread that processes the queue of remote requests * @param context the context */ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { diff --git a/exchange/bitswap/message.c b/exchange/bitswap/message.c index 92d3777..189de96 100644 --- a/exchange/bitswap/message.c +++ b/exchange/bitswap/message.c @@ -682,6 +682,8 @@ int ipfs_bitswap_message_add_blocks(struct BitswapMessage* message, struct Libp2 if (message == NULL) return 0; + if (blocks == NULL || blocks->total == 0) + return 0; if (message->payload == NULL) { message->payload = libp2p_utils_vector_new(blocks->total); if (message->payload == NULL) diff --git a/exchange/bitswap/network.c b/exchange/bitswap/network.c index fd799d7..74cfc40 100644 --- a/exchange/bitswap/network.c +++ b/exchange/bitswap/network.c @@ -43,6 +43,28 @@ int ipfs_bitswap_network_send_message(const struct BitswapContext* context, stru return 1; } +/*** + * Remove a cid from the queue + * @param cids the vector of cids + * @param cid the cid to remove + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_bitswap_network_remove_cid_from_queue(struct Libp2pVector* collection, struct Cid* cid) { + if (collection == NULL || cid == NULL) + return 0; + + for(int i = 0; i < collection->total; collection++) { + const struct Cid* current = (const struct Cid*)libp2p_utils_vector_get(collection, i); + if (ipfs_cid_compare(current, cid) == 0) { + libp2p_utils_vector_delete(collection, i); + return 1; + } + } + return 0; +} + + + /*** * Handle a raw incoming bitswap message from the network * @param node us @@ -116,8 +138,10 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc ipfs_cid_free(cid); return 0; } - // add the cid to their queue - libp2p_utils_vector_add(queueEntry->current->cids, cid); + if (entry->cancel) + ipfs_bitswap_network_remove_cid_from_queue(queueEntry->current->cids_they_want, cid); + else + libp2p_utils_vector_add(queueEntry->current->cids_they_want, cid); } } return 1; diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index 6e56d54..735c93e 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -15,21 +15,32 @@ * @returns a new PeerRequest struct or NULL if there was a problem */ struct PeerRequest* ipfs_bitswap_peer_request_new() { + int retVal = 0; struct PeerRequest* request = (struct PeerRequest*) malloc(sizeof(struct PeerRequest)); if (request != NULL) { - request->cids = libp2p_utils_vector_new(1); - if (request->cids == NULL) { - free(request); - return NULL; - } - request->blocks = libp2p_utils_vector_new(1); - if (request->blocks == NULL) { - libp2p_utils_vector_free(request->cids); - free(request); - return NULL; - } + request->cids_they_want = libp2p_utils_vector_new(1); + if (request->cids_they_want == NULL) + goto exit; + request->cids_we_want = libp2p_utils_vector_new(1); + if (request->cids_we_want == NULL) + goto exit; + request->blocks_we_want_to_send = libp2p_utils_vector_new(1); + if (request->blocks_we_want_to_send == NULL) + goto exit; request->peer = NULL; } + retVal = 1; + exit: + if (retVal == 0 && request != NULL) { + if (request->blocks_we_want_to_send != NULL) + libp2p_utils_vector_free(request->blocks_we_want_to_send); + if (request->cids_they_want != NULL) + libp2p_utils_vector_free(request->cids_they_want); + if (request->cids_we_want != NULL) + libp2p_utils_vector_free(request->cids_we_want); + free(request); + request = NULL; + } return request; } @@ -194,9 +205,14 @@ int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry) { * @param block the block * @returns true(1) on success, otherwise false(0) */ -int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct SessionContext* who, struct Block* block) { +int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct Libp2pPeer* who, struct Block* block) { // find the right entry - // add to the block array + struct PeerRequest* entry = ipfs_peer_request_queue_find_peer(queue, who); + if (entry != NULL) + { + // add to the block array + libp2p_utils_vector_add(entry->blocks_we_want_to_send, block); + } return 0; } @@ -207,9 +223,19 @@ int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct * @returns true(1) on succes, otherwise false(0) */ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request) { + // determine if we have enough information to continue + if (request == NULL) + return 0; + if (request->peer == NULL) + return 0; + if (!request->peer->is_local) { + if (request->peer->connection_type != CONNECTION_TYPE_CONNECTED) + if (request->peer->addr_head == NULL || request->peer->addr_head->item == NULL) + return 0; + } // determine if we're connected - int connected = request->peer == NULL || request->peer->connection_type == CONNECTION_TYPE_CONNECTED; - int need_to_connect = request->cids != NULL; + int connected = request->peer->is_local || request->peer->connection_type == CONNECTION_TYPE_CONNECTED; + int need_to_connect = request->cids_we_want->total != 0 || request->cids_they_want->total != 0 || request->blocks_we_want_to_send->total != 0; // determine if we need to connect if (need_to_connect) { @@ -220,10 +246,10 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context if (connected) { // build a message struct BitswapMessage* msg = ipfs_bitswap_message_new(); - // add requests - ipfs_bitswap_message_add_wantlist_items(msg, request->cids); - // add blocks - ipfs_bitswap_message_add_blocks(msg, request->blocks); + // see if we can fulfill any of their requests. If so, fill in msg->payload + ipfs_bitswap_message_add_blocks(msg, request->blocks_we_want_to_send); + // add requests that we would like + ipfs_bitswap_message_add_wantlist_items(msg, request->cids_we_want); // send message if (ipfs_bitswap_network_send_message(context, request->peer, msg)) return 1; diff --git a/exchange/bitswap/wantlist_queue.c b/exchange/bitswap/wantlist_queue.c index be0f683..3884aba 100644 --- a/exchange/bitswap/wantlist_queue.c +++ b/exchange/bitswap/wantlist_queue.c @@ -197,9 +197,9 @@ int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const // it's local, there should be only 1 return 0; } else { - struct SessionContext* contextA = (struct SessionContext*)a->context; - struct SessionContext* contextB = (struct SessionContext*)b->context; - return libp2p_session_context_compare(contextA, contextB); + struct Libp2pPeer* contextA = (struct Libp2pPeer*)a->context; + struct Libp2pPeer* contextB = (struct Libp2pPeer*)b->context; + return libp2p_peer_compare(contextA, contextB); } } @@ -250,11 +250,8 @@ int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struc struct Libp2pPeer* current = (struct Libp2pPeer*) libp2p_utils_vector_get(providers, i); // add this to their queue struct PeerRequest* queueEntry = ipfs_peer_request_queue_find_peer(context->peerRequestQueue, current); - libp2p_utils_vector_add(queueEntry->cids, cid); - // process this queue - // NOTE: We need to ask the remotes via bitswap, and wait for a response before returning - // there will need to be some fancy stuff to know when we get it back so that this method - // can return with the block + libp2p_utils_vector_add(queueEntry->cids_we_want, cid); + // process this queue via bitswap protocol ipfs_bitswap_peer_request_process_entry(context, queueEntry); } return 1; @@ -263,7 +260,10 @@ int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struc } /** - * Called by the Bitswap engine, this processes an item on the WantListQueue + * Called by the Bitswap engine, this processes an item on the WantListQueue. This is called when + * we want a file locally from a remote source. Send a message immediately, adding in stuff that + * perhaps the remote source wanted. + * * @param context the context * @param entry the WantListQueueEntry * @returns true(1) on success, false(0) if not. @@ -293,8 +293,8 @@ int ipfs_bitswap_wantlist_process_entry(struct BitswapContext* context, struct W if (session->type == WANTLIST_SESSION_TYPE_LOCAL) { context->ipfsNode->exchange->HasBlock(context->ipfsNode->exchange, entry->block); } else { - struct SessionContext* sessionContext = (struct SessionContext*) session->context; - ipfs_bitswap_peer_request_queue_fill(context->peerRequestQueue, sessionContext, entry->block); + struct Libp2pPeer* peer = (struct Libp2pPeer*) session->context; + ipfs_bitswap_peer_request_queue_fill(context->peerRequestQueue, peer, entry->block); } } diff --git a/include/ipfs/exchange/bitswap/peer_request_queue.h b/include/ipfs/exchange/bitswap/peer_request_queue.h index d83c754..5107ee8 100644 --- a/include/ipfs/exchange/bitswap/peer_request_queue.h +++ b/include/ipfs/exchange/bitswap/peer_request_queue.h @@ -9,11 +9,22 @@ #include "ipfs/exchange/bitswap/bitswap.h" #include "ipfs/blocks/block.h" +struct CidEntry { + struct Cid* cid; + int cancel; +}; + struct PeerRequest { pthread_mutex_t request_mutex; struct Libp2pPeer* peer; - struct Libp2pVector* cids; - struct Libp2pVector* blocks; + // Cid collection of cids that they want. Note cancellations are removed immediately + struct Libp2pVector* cids_they_want; + // CidEntry collection of cids that we want or are canceling + struct Libp2pVector* cids_we_want; + // blocks to send to them + struct Libp2pVector* blocks_we_want_to_send; + // blocks they sent us are processed immediately, so no queue necessary + // although the cid can go in cids_we_want again, with a cancel flag }; struct PeerRequestEntry { @@ -92,7 +103,7 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR * @param block the block * @returns true(1) on success, otherwise false(0) */ -int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct SessionContext* who, struct Block* block); +int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct Libp2pPeer* who, struct Block* block); /*** * Allocate resources for a PeerRequestEntry struct diff --git a/include/ipfs/exchange/bitswap/wantlist_queue.h b/include/ipfs/exchange/bitswap/wantlist_queue.h index 05ba823..21f6a59 100644 --- a/include/ipfs/exchange/bitswap/wantlist_queue.h +++ b/include/ipfs/exchange/bitswap/wantlist_queue.h @@ -14,7 +14,7 @@ enum WantListSessionType { WANTLIST_SESSION_TYPE_LOCAL, WANTLIST_SESSION_TYPE_RE struct WantListSession { enum WantListSessionType type; - void* context; // either an IpfsNode (local) or a SessionContext (remote) + void* context; // either an IpfsNode (local) or a Libp2pPeer (remote) }; struct WantListQueueEntry {