processing want list
This commit is contained in:
parent
7632949e30
commit
0638996684
4 changed files with 66 additions and 7 deletions
|
@ -54,8 +54,8 @@ int ipfs_bitswap_network_remove_cid_from_queue(struct Libp2pVector* collection,
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
for(int i = 0; i < collection->total; collection++) {
|
for(int i = 0; i < collection->total; collection++) {
|
||||||
const struct Cid* current = (const struct Cid*)libp2p_utils_vector_get(collection, i);
|
const struct CidEntry* current = (const struct CidEntry*)libp2p_utils_vector_get(collection, i);
|
||||||
if (ipfs_cid_compare(current, cid) == 0) {
|
if (ipfs_cid_compare(current->cid, cid) == 0) {
|
||||||
libp2p_utils_vector_delete(collection, i);
|
libp2p_utils_vector_delete(collection, i);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -140,8 +140,12 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc
|
||||||
}
|
}
|
||||||
if (entry->cancel)
|
if (entry->cancel)
|
||||||
ipfs_bitswap_network_remove_cid_from_queue(queueEntry->current->cids_they_want, cid);
|
ipfs_bitswap_network_remove_cid_from_queue(queueEntry->current->cids_they_want, cid);
|
||||||
else
|
else {
|
||||||
libp2p_utils_vector_add(queueEntry->current->cids_they_want, cid);
|
struct CidEntry* entry = ipfs_bitswap_peer_request_cid_entry_new();
|
||||||
|
entry->cid = cid;
|
||||||
|
entry->cancel = 0;
|
||||||
|
libp2p_utils_vector_add(queueEntry->current->cids_they_want, entry);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
|
|
|
@ -10,6 +10,18 @@
|
||||||
#include "ipfs/exchange/bitswap/message.h"
|
#include "ipfs/exchange/bitswap/message.h"
|
||||||
#include "ipfs/exchange/bitswap/network.h"
|
#include "ipfs/exchange/bitswap/network.h"
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Allocate memory for CidEntry
|
||||||
|
* @returns new CidEntry struct
|
||||||
|
*/
|
||||||
|
struct CidEntry* ipfs_bitswap_peer_request_cid_entry_new() {
|
||||||
|
struct CidEntry* entry = (struct CidEntry*) malloc(sizeof(struct CidEntry));
|
||||||
|
if (entry != NULL) {
|
||||||
|
entry->cid = NULL;
|
||||||
|
entry->cancel = 0;
|
||||||
|
}
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Allocate resources for a new PeerRequest
|
* Allocate resources for a new PeerRequest
|
||||||
* @returns a new PeerRequest struct or NULL if there was a problem
|
* @returns a new PeerRequest struct or NULL if there was a problem
|
||||||
|
@ -216,6 +228,40 @@ int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/****
|
||||||
|
* Find blocks they want, and put them in the request
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_get_blocks_they_want(const struct BitswapContext* context, struct PeerRequest* request) {
|
||||||
|
for(int i = 0; i < request->cids_they_want->total; i++) {
|
||||||
|
struct CidEntry* cidEntry = (struct CidEntry*)libp2p_utils_vector_get(request->cids_they_want, i);
|
||||||
|
if (cidEntry != NULL && !cidEntry->cancel) {
|
||||||
|
struct Block* block = NULL;
|
||||||
|
context->ipfsNode->blockstore->Get(context->ipfsNode->blockstore->blockstoreContext, cidEntry->cid, &block);
|
||||||
|
if (block != NULL) {
|
||||||
|
libp2p_utils_vector_add(request->blocks_we_want_to_send, block);
|
||||||
|
cidEntry->cancel = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Determine if any of the cids in the list are waiting to be filled
|
||||||
|
* @param cidEntries a Vector of CidEntry objects
|
||||||
|
* @returns true(1) if we have some waiting, false(0) otherwise
|
||||||
|
*/
|
||||||
|
int ipfs_peer_request_cids_waiting(struct Libp2pVector* cidEntries) {
|
||||||
|
if (cidEntries == NULL)
|
||||||
|
return 0;
|
||||||
|
for(int i = 0; i < cidEntries->total; i++) {
|
||||||
|
const struct CidEntry* entry = (const struct CidEntry*)libp2p_utils_vector_get(cidEntries, i);
|
||||||
|
if (entry != NULL && !entry->cancel)
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
/****
|
/****
|
||||||
* Handle a PeerRequest
|
* Handle a PeerRequest
|
||||||
* @param context the BitswapContext
|
* @param context the BitswapContext
|
||||||
|
@ -235,7 +281,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context
|
||||||
}
|
}
|
||||||
// determine if we're connected
|
// determine if we're connected
|
||||||
int connected = request->peer->is_local || request->peer->connection_type == CONNECTION_TYPE_CONNECTED;
|
int connected = request->peer->is_local || request->peer->connection_type == CONNECTION_TYPE_CONNECTED;
|
||||||
int need_to_connect = request->cids_we_want->total != 0 || request->cids_they_want->total != 0 || request->blocks_we_want_to_send->total != 0;
|
int need_to_connect = request->cids_we_want->total != 0 || ipfs_peer_request_cids_waiting(request->cids_they_want) || request->blocks_we_want_to_send->total != 0;
|
||||||
|
|
||||||
// determine if we need to connect
|
// determine if we need to connect
|
||||||
if (need_to_connect) {
|
if (need_to_connect) {
|
||||||
|
@ -247,6 +293,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context
|
||||||
// build a message
|
// build a message
|
||||||
struct BitswapMessage* msg = ipfs_bitswap_message_new();
|
struct BitswapMessage* msg = ipfs_bitswap_message_new();
|
||||||
// see if we can fulfill any of their requests. If so, fill in msg->payload
|
// see if we can fulfill any of their requests. If so, fill in msg->payload
|
||||||
|
ipfs_bitswap_peer_request_get_blocks_they_want(context, request);
|
||||||
ipfs_bitswap_message_add_blocks(msg, request->blocks_we_want_to_send);
|
ipfs_bitswap_message_add_blocks(msg, request->blocks_we_want_to_send);
|
||||||
// add requests that we would like
|
// add requests that we would like
|
||||||
ipfs_bitswap_message_add_wantlist_items(msg, request->cids_we_want);
|
ipfs_bitswap_message_add_wantlist_items(msg, request->cids_we_want);
|
||||||
|
|
|
@ -250,7 +250,9 @@ int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struc
|
||||||
struct Libp2pPeer* current = (struct Libp2pPeer*) libp2p_utils_vector_get(providers, i);
|
struct Libp2pPeer* current = (struct Libp2pPeer*) libp2p_utils_vector_get(providers, i);
|
||||||
// add this to their queue
|
// add this to their queue
|
||||||
struct PeerRequest* queueEntry = ipfs_peer_request_queue_find_peer(context->peerRequestQueue, current);
|
struct PeerRequest* queueEntry = ipfs_peer_request_queue_find_peer(context->peerRequestQueue, current);
|
||||||
libp2p_utils_vector_add(queueEntry->cids_we_want, cid);
|
struct CidEntry* entry = ipfs_bitswap_peer_request_cid_entry_new();
|
||||||
|
entry->cid = cid;
|
||||||
|
libp2p_utils_vector_add(queueEntry->cids_we_want, entry);
|
||||||
// process this queue via bitswap protocol
|
// process this queue via bitswap protocol
|
||||||
ipfs_bitswap_peer_request_process_entry(context, queueEntry);
|
ipfs_bitswap_peer_request_process_entry(context, queueEntry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ struct CidEntry {
|
||||||
struct PeerRequest {
|
struct PeerRequest {
|
||||||
pthread_mutex_t request_mutex;
|
pthread_mutex_t request_mutex;
|
||||||
struct Libp2pPeer* peer;
|
struct Libp2pPeer* peer;
|
||||||
// Cid collection of cids that they want. Note cancellations are removed immediately
|
// CidEntry collection of cids that they want
|
||||||
struct Libp2pVector* cids_they_want;
|
struct Libp2pVector* cids_they_want;
|
||||||
// CidEntry collection of cids that we want or are canceling
|
// CidEntry collection of cids that we want or are canceling
|
||||||
struct Libp2pVector* cids_we_want;
|
struct Libp2pVector* cids_we_want;
|
||||||
|
@ -39,6 +39,12 @@ struct PeerRequestQueue {
|
||||||
struct PeerRequestEntry* last;
|
struct PeerRequestEntry* last;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Allocate memory for CidEntry
|
||||||
|
* @returns new CidEntry struct
|
||||||
|
*/
|
||||||
|
struct CidEntry* ipfs_bitswap_peer_request_cid_entry_new();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate resources for a new PeerRequest
|
* Allocate resources for a new PeerRequest
|
||||||
* @returns a new PeerRequest struct or NULL if there was a problem
|
* @returns a new PeerRequest struct or NULL if there was a problem
|
||||||
|
|
Loading…
Reference in a new issue