Attempting to fulfill remote requests

yamux
jmjatlanta 2017-07-31 17:59:51 -05:00
parent fa7a6826b1
commit 836fb5387b
7 changed files with 101 additions and 38 deletions

View File

@ -30,7 +30,7 @@ int ipfs_bitswap_engine_free(struct BitswapEngine* engine) {
}
/***
* A separate thread that processes the queue
* A separate thread that processes the queue of local requests
* @param context the context
*/
void* ipfs_bitswap_engine_wantlist_processor_start(void* ctx) {
@ -50,7 +50,7 @@ void* ipfs_bitswap_engine_wantlist_processor_start(void* ctx) {
}
/***
* A separate thread that processes the queue
* A separate thread that processes the queue of remote requests
* @param context the context
*/
void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {

View File

@ -682,6 +682,8 @@ int ipfs_bitswap_message_add_blocks(struct BitswapMessage* message, struct Libp2
if (message == NULL)
return 0;
if (blocks == NULL || blocks->total == 0)
return 0;
if (message->payload == NULL) {
message->payload = libp2p_utils_vector_new(blocks->total);
if (message->payload == NULL)

View File

@ -43,6 +43,28 @@ int ipfs_bitswap_network_send_message(const struct BitswapContext* context, stru
return 1;
}
/***
* Remove a cid from the queue
* @param cids the vector of cids
* @param cid the cid to remove
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_network_remove_cid_from_queue(struct Libp2pVector* collection, struct Cid* cid) {
if (collection == NULL || cid == NULL)
return 0;
for(int i = 0; i < collection->total; collection++) {
const struct Cid* current = (const struct Cid*)libp2p_utils_vector_get(collection, i);
if (ipfs_cid_compare(current, cid) == 0) {
libp2p_utils_vector_delete(collection, i);
return 1;
}
}
return 0;
}
/***
* Handle a raw incoming bitswap message from the network
* @param node us
@ -116,8 +138,10 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc
ipfs_cid_free(cid);
return 0;
}
// add the cid to their queue
libp2p_utils_vector_add(queueEntry->current->cids, cid);
if (entry->cancel)
ipfs_bitswap_network_remove_cid_from_queue(queueEntry->current->cids_they_want, cid);
else
libp2p_utils_vector_add(queueEntry->current->cids_they_want, cid);
}
}
return 1;

View File

@ -15,21 +15,32 @@
* @returns a new PeerRequest struct or NULL if there was a problem
*/
struct PeerRequest* ipfs_bitswap_peer_request_new() {
int retVal = 0;
struct PeerRequest* request = (struct PeerRequest*) malloc(sizeof(struct PeerRequest));
if (request != NULL) {
request->cids = libp2p_utils_vector_new(1);
if (request->cids == NULL) {
free(request);
return NULL;
}
request->blocks = libp2p_utils_vector_new(1);
if (request->blocks == NULL) {
libp2p_utils_vector_free(request->cids);
free(request);
return NULL;
}
request->cids_they_want = libp2p_utils_vector_new(1);
if (request->cids_they_want == NULL)
goto exit;
request->cids_we_want = libp2p_utils_vector_new(1);
if (request->cids_we_want == NULL)
goto exit;
request->blocks_we_want_to_send = libp2p_utils_vector_new(1);
if (request->blocks_we_want_to_send == NULL)
goto exit;
request->peer = NULL;
}
retVal = 1;
exit:
if (retVal == 0 && request != NULL) {
if (request->blocks_we_want_to_send != NULL)
libp2p_utils_vector_free(request->blocks_we_want_to_send);
if (request->cids_they_want != NULL)
libp2p_utils_vector_free(request->cids_they_want);
if (request->cids_we_want != NULL)
libp2p_utils_vector_free(request->cids_we_want);
free(request);
request = NULL;
}
return request;
}
@ -194,9 +205,14 @@ int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry) {
* @param block the block
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct SessionContext* who, struct Block* block) {
int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct Libp2pPeer* who, struct Block* block) {
// find the right entry
// add to the block array
struct PeerRequest* entry = ipfs_peer_request_queue_find_peer(queue, who);
if (entry != NULL)
{
// add to the block array
libp2p_utils_vector_add(entry->blocks_we_want_to_send, block);
}
return 0;
}
@ -207,9 +223,19 @@ int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct
* @returns true(1) on succes, otherwise false(0)
*/
int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request) {
// determine if we have enough information to continue
if (request == NULL)
return 0;
if (request->peer == NULL)
return 0;
if (!request->peer->is_local) {
if (request->peer->connection_type != CONNECTION_TYPE_CONNECTED)
if (request->peer->addr_head == NULL || request->peer->addr_head->item == NULL)
return 0;
}
// determine if we're connected
int connected = request->peer == NULL || request->peer->connection_type == CONNECTION_TYPE_CONNECTED;
int need_to_connect = request->cids != NULL;
int connected = request->peer->is_local || request->peer->connection_type == CONNECTION_TYPE_CONNECTED;
int need_to_connect = request->cids_we_want->total != 0 || request->cids_they_want->total != 0 || request->blocks_we_want_to_send->total != 0;
// determine if we need to connect
if (need_to_connect) {
@ -220,10 +246,10 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context
if (connected) {
// build a message
struct BitswapMessage* msg = ipfs_bitswap_message_new();
// add requests
ipfs_bitswap_message_add_wantlist_items(msg, request->cids);
// add blocks
ipfs_bitswap_message_add_blocks(msg, request->blocks);
// see if we can fulfill any of their requests. If so, fill in msg->payload
ipfs_bitswap_message_add_blocks(msg, request->blocks_we_want_to_send);
// add requests that we would like
ipfs_bitswap_message_add_wantlist_items(msg, request->cids_we_want);
// send message
if (ipfs_bitswap_network_send_message(context, request->peer, msg))
return 1;

View File

@ -197,9 +197,9 @@ int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const
// it's local, there should be only 1
return 0;
} else {
struct SessionContext* contextA = (struct SessionContext*)a->context;
struct SessionContext* contextB = (struct SessionContext*)b->context;
return libp2p_session_context_compare(contextA, contextB);
struct Libp2pPeer* contextA = (struct Libp2pPeer*)a->context;
struct Libp2pPeer* contextB = (struct Libp2pPeer*)b->context;
return libp2p_peer_compare(contextA, contextB);
}
}
@ -250,11 +250,8 @@ int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struc
struct Libp2pPeer* current = (struct Libp2pPeer*) libp2p_utils_vector_get(providers, i);
// add this to their queue
struct PeerRequest* queueEntry = ipfs_peer_request_queue_find_peer(context->peerRequestQueue, current);
libp2p_utils_vector_add(queueEntry->cids, cid);
// process this queue
// NOTE: We need to ask the remotes via bitswap, and wait for a response before returning
// there will need to be some fancy stuff to know when we get it back so that this method
// can return with the block
libp2p_utils_vector_add(queueEntry->cids_we_want, cid);
// process this queue via bitswap protocol
ipfs_bitswap_peer_request_process_entry(context, queueEntry);
}
return 1;
@ -263,7 +260,10 @@ int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struc
}
/**
* Called by the Bitswap engine, this processes an item on the WantListQueue
* Called by the Bitswap engine, this processes an item on the WantListQueue. This is called when
* we want a file locally from a remote source. Send a message immediately, adding in stuff that
* perhaps the remote source wanted.
*
* @param context the context
* @param entry the WantListQueueEntry
* @returns true(1) on success, false(0) if not.
@ -293,8 +293,8 @@ int ipfs_bitswap_wantlist_process_entry(struct BitswapContext* context, struct W
if (session->type == WANTLIST_SESSION_TYPE_LOCAL) {
context->ipfsNode->exchange->HasBlock(context->ipfsNode->exchange, entry->block);
} else {
struct SessionContext* sessionContext = (struct SessionContext*) session->context;
ipfs_bitswap_peer_request_queue_fill(context->peerRequestQueue, sessionContext, entry->block);
struct Libp2pPeer* peer = (struct Libp2pPeer*) session->context;
ipfs_bitswap_peer_request_queue_fill(context->peerRequestQueue, peer, entry->block);
}
}

View File

@ -9,11 +9,22 @@
#include "ipfs/exchange/bitswap/bitswap.h"
#include "ipfs/blocks/block.h"
struct CidEntry {
struct Cid* cid;
int cancel;
};
struct PeerRequest {
pthread_mutex_t request_mutex;
struct Libp2pPeer* peer;
struct Libp2pVector* cids;
struct Libp2pVector* blocks;
// Cid collection of cids that they want. Note cancellations are removed immediately
struct Libp2pVector* cids_they_want;
// CidEntry collection of cids that we want or are canceling
struct Libp2pVector* cids_we_want;
// blocks to send to them
struct Libp2pVector* blocks_we_want_to_send;
// blocks they sent us are processed immediately, so no queue necessary
// although the cid can go in cids_we_want again, with a cancel flag
};
struct PeerRequestEntry {
@ -92,7 +103,7 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR
* @param block the block
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct SessionContext* who, struct Block* block);
int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct Libp2pPeer* who, struct Block* block);
/***
* Allocate resources for a PeerRequestEntry struct

View File

@ -14,7 +14,7 @@ enum WantListSessionType { WANTLIST_SESSION_TYPE_LOCAL, WANTLIST_SESSION_TYPE_RE
struct WantListSession {
enum WantListSessionType type;
void* context; // either an IpfsNode (local) or a SessionContext (remote)
void* context; // either an IpfsNode (local) or a Libp2pPeer (remote)
};
struct WantListQueueEntry {