From e1135fef3bd5249fac9414472087420a7a97f560 Mon Sep 17 00:00:00 2001 From: John Jones Date: Thu, 27 Jul 2017 08:38:57 -0500 Subject: [PATCH] Beginnings of the multithreaded engine This engine has 2 threads. One to process the request queue, the other to gather up and build peer messages and send them. --- exchange/bitswap/engine.c | 76 ++++++++++++++++ exchange/bitswap/network.c | 8 +- exchange/bitswap/peer_request_queue.c | 22 ++++- exchange/bitswap/wantlist_queue.c | 89 +++++++++++++++++++ include/ipfs/blocks/blockstore.h | 3 + include/ipfs/core/ipfs_node.h | 6 +- include/ipfs/exchange/bitswap/bitswap.h | 3 + include/ipfs/exchange/bitswap/engine.h | 29 ++++++ .../exchange/bitswap/peer_request_queue.h | 21 ++++- include/ipfs/exchange/bitswap/want_manager.h | 2 +- .../ipfs/exchange/bitswap/wantlist_queue.h | 9 ++ test/exchange/test_bitswap_request_queue.h | 3 + 12 files changed, 258 insertions(+), 13 deletions(-) create mode 100644 exchange/bitswap/engine.c create mode 100644 include/ipfs/exchange/bitswap/engine.h diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c new file mode 100644 index 0000000..f86d075 --- /dev/null +++ b/exchange/bitswap/engine.c @@ -0,0 +1,76 @@ +#include "ipfs/exchange/bitswap/engine.h" +#include "ipfs/exchange/bitswap/wantlist_queue.h" +#include "ipfs/exchange/bitswap/peer_request_queue.h" + +/*** + * A separate thread that processes the queue + * @param context the context + */ +void ipfs_bitswap_engine_wantlist_processor_start(void* ctx) { + struct BitswapContext* context = (struct BitswapContext*)ctx; + // the loop + while (!context->bitswap_engine->shutting_down) { + struct WantListQueueEntry* item = ipfs_bitswap_wantlist_queue_pop(context->localWantlist); + if (item != NULL) { + // if there is something on the queue process it. + ipfs_bitswap__wantlist_process_entry(context, item); + } else { + // if there is nothing on the queue, wait... + sleep(2); + } + } +} + +/*** + * A separate thread that processes the queue + * @param context the context + */ +void ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { + struct BitswapContext* context = (struct BitswapContext*)ctx; + // the loop + while (!context->bitswap_engine->shutting_down) { + struct BitswapPeerQueueEntry* item = ipfs_bitswap_peer_request_queue_pop(context->peerRequestQueue); + if (item != NULL) { + // if there is something on the queue process it. + ipfs_bitswap_peer_request_process_entry(context, item); + } else { + // if there is nothing on the queue, wait... + sleep(2); + } + } +} + +/** + * Starts the bitswap engine that processes queue items. There + * should only be one of these per ipfs instance. + * + * @param context the context + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_bitswap_engine_start(const struct BitswapContext* context) { + context->bitswap_engine->shutting_down = 0; + + // fire off the threads + if (pthread_create(&context->bitswap_engine->wantlist_processor_thread, NULL, ipfs_bitswap_engine_wantlist_processor_start, (void*)context)) { + return 0; + } + if (pthread_create(&context->bitswap_engine->peer_request_processor_thread, NULL, ipfs_bitswap_engine_peer_request_processor_start, (void*)context)) { + ipfs_bitswap_engine_stop(context); + return 0; + } + return 1; +} + +/*** + * Shut down the engine + * @param context the context + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_bitswap_engine_stop(const struct BitswapContext* context) { + context->bitswap_engine->shutting_down = 1; + + int error1 = pthread_join(context->bitswap_engine->wantlist_processor_thread); + int error2 = pthread_join(context->bitswap_engine->peer_request_processor_thread); + + return !error1 && !error2; +} diff --git a/exchange/bitswap/network.c b/exchange/bitswap/network.c index 6ce34d1..496fef3 100644 --- a/exchange/bitswap/network.c +++ b/exchange/bitswap/network.c @@ -14,8 +14,12 @@ /** * We received a BitswapMessage from the network */ +/* +ipfs_bitswap_network_receive_message(struct BitswapContext* context) { + +} +*/ /** - * We want to pop something off the queue to send to a peer. - * This can be a wantlist or blocks + * We want to pop something off the queue */ diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index 72dae3f..2c29efd 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -4,6 +4,7 @@ */ #include +#include "libp2p/conn/session.h" #include "ipfs/cid/cid.h" #include "ipfs/exchange/bitswap/peer_request_queue.h" @@ -14,8 +15,9 @@ struct PeerRequest* ipfs_bitswap_peer_request_new() { struct PeerRequest* request = (struct PeerRequest*) malloc(sizeof(struct PeerRequest)); if (request != NULL) { - request->peer_id = 0; - request->cid = NULL; + request->cids = NULL; + request->blocks = NULL; + request->context = NULL; } return request; } @@ -116,9 +118,9 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR if (request != NULL) { struct PeerRequestEntry* current = queue->first; while (current != NULL) { - if (current->current->peer_id == request->peer_id && ipfs_cid_compare(request->cid, current->current->cid) == 0) { + if (libp2p_session_context_compare(current->current->context, request->context) == 0) return current; - } + current = current->next; } } return NULL; @@ -171,3 +173,15 @@ int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry) { return 1; } +/*** + * Add a block to the appropriate peer's queue + * @param queue the queue + * @param who the session context that identifies the peer + * @param block the block + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct SessionContext* who, struct Block* block) { + // find the right entry + // add to the block array + return 0; +} diff --git a/exchange/bitswap/wantlist_queue.c b/exchange/bitswap/wantlist_queue.c index 6dd4ec3..0de1460 100644 --- a/exchange/bitswap/wantlist_queue.c +++ b/exchange/bitswap/wantlist_queue.c @@ -2,6 +2,11 @@ #include "libp2p/conn/session.h" #include "libp2p/utils/vector.h" #include "ipfs/exchange/bitswap/wantlist_queue.h" +#include "ipfs/exchange/bitswap/peer_request_queue.h" + +/** + * Implementation of the WantlistQueue + */ /** * remove this session from the lists of sessions that are looking for this WantListQueueEntry @@ -126,6 +131,7 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_entry_new() { entry->cid = NULL; entry->priority = 0; entry->sessionsRequesting = NULL; + entry->attempts = 0; } return entry; } @@ -173,3 +179,86 @@ int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const } } +/** + * determine if any of the sessions are referring to the local node + * @param sessions a vector of WantlistSession + * @returns true(1) if any of the sessions are local, false otherwise + */ +int ipfs_bitswap_wantlist_local_request(struct Libp2pVector* sessions) +{ + for(int i = 0; i < sessions->total; i++) { + struct WantListSession* curr = (struct WantListSession*) libp2p_utils_vector_get(sessions, i); + if (curr != NULL && curr->type == WANTLIST_SESSION_TYPE_LOCAL) + return 1; + } + return 0; +} + +/*** + * Attempt to retrieve a block from the local blockstore + * + * @param context the BitswapContext + * @param cid the id to look for + * @param block where to put the results + * @returns true(1) if found, false(0) otherwise + */ +int ipfs_bitswap_wantlist_get_block_locally(struct BitswapContext* context, struct Cid* cid, struct Block** block) { + return context->ipfsNode->blockstore->Get(context->ipfsNode->blockstore->blockstoreContext, cid, block); +} + +/*** + * Retrieve a block. The only information we have is the cid + * + * This will ask the network for who has the file, using the router. + * It will then ask the specific nodes for the file. This method + * does not queue anything. It actually does the work. + * + * @param context the BitswapContext + * @param cid the id of the file + * @param block where to put the results + * @returns true(1) if found, false(0) otherwise + */ +int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struct Cid* cid, struct Block** block) { + //TODO: Implement this workhorse of a method + return 0; +} + +/** + * Called by the Bitswap engine, this processes an item on the WantListQueue + * @param context the context + * @param entry the WantListQueueEntry + * @returns true(1) on success, false(0) if not. + */ +int ipfs_bitswap_wantlist_process_entry(struct BitswapContext* context, struct WantListQueueEntry* entry) { + int local_request = ipfs_bitswap_wantlist_local_request(entry->sessionsRequesting); + int have_local = ipfs_bitswap_wantlist_get_block_locally(context, entry->cid, &entry->block); + // should we go get it? + if (!local_request && !have_local) { + return 0; + } + if (local_request && !have_local) { + if (!ipfs_bitswap_wantlist_get_block_remote(context, entry->cid, &entry->block)) { + // if we were unsuccessful in retrieving it, put it back in the queue? + // I don't think so. But I'm keeping this counter here until we have + // a final decision. Maybe lower the priority? + entry->attempts++; + return 0; + } + } + if (entry->block != NULL) { + // okay we have the block. + // fulfill the requests + for(size_t i = 0; i < entry->sessionsRequesting->total; i++) { + // TODO: Review this code. + struct WantListSession* session = (struct WantListSession*) libp2p_utils_vector_get(entry->sessionsRequesting, i); + if (session->type == WANTLIST_SESSION_TYPE_LOCAL) { + context->ipfsNode->exchange->HasBlock(context->ipfsNode->exchange, entry->block); + } else { + struct SessionContext* sessionContext = (struct SessionContext*) session->context; + ipfs_bitswap_peer_request_queue_fill(context->peerRequestQueue, sessionContext, entry->block); + } + } + + } + return 0; +} diff --git a/include/ipfs/blocks/blockstore.h b/include/ipfs/blocks/blockstore.h index 2b37e87..59cf357 100644 --- a/include/ipfs/blocks/blockstore.h +++ b/include/ipfs/blocks/blockstore.h @@ -16,6 +16,9 @@ struct Blockstore { struct BlockstoreContext* blockstoreContext; int (*Delete)(const struct BlockstoreContext* context, struct Cid* cid); int (*Has)(const struct BlockstoreContext* context, struct Cid* cid); + /** + * Retrieve a block from the blockstore + */ int (*Get)(const struct BlockstoreContext* context, struct Cid* cid, struct Block** block); int (*Put)(const struct BlockstoreContext* context, struct Block* block); }; diff --git a/include/ipfs/core/ipfs_node.h b/include/ipfs/core/ipfs_node.h index cca5af2..69a0495 100644 --- a/include/ipfs/core/ipfs_node.h +++ b/include/ipfs/core/ipfs_node.h @@ -1,11 +1,12 @@ #pragma once +#include "libp2p/peer/peerstore.h" +#include "libp2p/peer/providerstore.h" #include "ipfs/blocks/blockstore.h" +#include "ipfs/exchange/exchange.h" #include "ipfs/repo/config/identity.h" #include "ipfs/repo/fsrepo/fs_repo.h" #include "ipfs/routing/routing.h" -#include "libp2p/peer/peerstore.h" -#include "libp2p/peer/providerstore.h" enum NodeMode { MODE_OFFLINE, MODE_ONLINE }; @@ -17,6 +18,7 @@ struct IpfsNode { struct ProviderStore* providerstore; struct IpfsRouting* routing; struct Blockstore* blockstore; + struct Exchange* exchange; //struct Pinner pinning; // an interface //struct Mount** mounts; // TODO: Add more here diff --git a/include/ipfs/exchange/bitswap/bitswap.h b/include/ipfs/exchange/bitswap/bitswap.h index b797228..460f86d 100644 --- a/include/ipfs/exchange/bitswap/bitswap.h +++ b/include/ipfs/exchange/bitswap/bitswap.h @@ -7,10 +7,13 @@ #include "ipfs/core/ipfs_node.h" #include "ipfs/exchange/exchange.h" +#include "ipfs/exchange/bitswap/engine.h" struct BitswapContext { struct IpfsNode* ipfsNode; struct WantListQueue* localWantlist; + struct PeerRequestQueue* peerRequestQueue; + struct BitswapEngine* bitswap_engine; }; /** diff --git a/include/ipfs/exchange/bitswap/engine.h b/include/ipfs/exchange/bitswap/engine.h new file mode 100644 index 0000000..cfb9001 --- /dev/null +++ b/include/ipfs/exchange/bitswap/engine.h @@ -0,0 +1,29 @@ +#pragma once + +#include +//#include "ipfs/exchange/bitswap/bitswap.h" we must forward declare here, as BitswapContext has a reference to BitswapEngine + +struct BitswapContext; + +struct BitswapEngine { + int shutting_down; + pthread_t wantlist_processor_thread; + pthread_t peer_request_processor_thread; +}; + +/** + * Starts the bitswap engine that processes queue items. There + * should only be one of these per ipfs instance. + * + * @param context the context + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_bitswap_engine_start(const struct BitswapContext* context); + +/*** + * Shut down the engine + * + * @param context the context + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_bitswap_engine_stop(const struct BitswapContext* context); diff --git a/include/ipfs/exchange/bitswap/peer_request_queue.h b/include/ipfs/exchange/bitswap/peer_request_queue.h index 8ac72e8..9443cf8 100644 --- a/include/ipfs/exchange/bitswap/peer_request_queue.h +++ b/include/ipfs/exchange/bitswap/peer_request_queue.h @@ -1,13 +1,16 @@ /*** - * A queue for requests from remote peers - * NOTE: This should handle multiple threads + * A queue for requests to/from remote peers + * NOTE: This must handle multiple threads */ #include +#include "ipfs/blocks/block.h" struct PeerRequest { - int peer_id; - struct Cid* cid; + pthread_mutex_t request_mutex; + struct SessionContext* context; + struct Libp2pVector* cids; + struct Libp2pVector* blocks; }; struct PeerRequestEntry { @@ -37,6 +40,7 @@ int ipfs_bitswap_peer_request_free(struct PeerRequest* request); /** * Allocate resources for a new queue + * @returns a new PeerRequestQueue */ struct PeerRequestQueue* ipfs_bitswap_peer_request_queue_new(); @@ -78,6 +82,15 @@ struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue* */ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerRequestQueue* queue, struct PeerRequest* request); +/*** + * Add a block to the appropriate peer's queue + * @param queue the queue + * @param who the session context that identifies the peer + * @param block the block + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct SessionContext* who, struct Block* block); + /*** * Allocate resources for a PeerRequestEntry struct * @returns the allocated struct or NULL if there was a problem diff --git a/include/ipfs/exchange/bitswap/want_manager.h b/include/ipfs/exchange/bitswap/want_manager.h index 27c5126..c20f305 100644 --- a/include/ipfs/exchange/bitswap/want_manager.h +++ b/include/ipfs/exchange/bitswap/want_manager.h @@ -22,7 +22,7 @@ struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapCon int ipfs_bitswap_want_manager_received(const struct BitswapContext* context, const struct Cid* cid); /*** - * retrieve a block from the WantManager. NOTE: a call to want_manager_received should be done first + * retrieve a block from the WantManager. * @param context the context * @param cid the Cid to get * @param block a pointer to the block that will be allocated diff --git a/include/ipfs/exchange/bitswap/wantlist_queue.h b/include/ipfs/exchange/bitswap/wantlist_queue.h index 646606f..4b5cba0 100644 --- a/include/ipfs/exchange/bitswap/wantlist_queue.h +++ b/include/ipfs/exchange/bitswap/wantlist_queue.h @@ -8,6 +8,7 @@ #include #include "ipfs/cid/cid.h" #include "ipfs/blocks/block.h" +#include "ipfs/exchange/bitswap/bitswap.h" enum WantListSessionType { WANTLIST_SESSION_TYPE_LOCAL, WANTLIST_SESSION_TYPE_REMOTE }; @@ -22,6 +23,7 @@ struct WantListQueueEntry { // a vector of WantListSessions struct Libp2pVector* sessionsRequesting; struct Block* block; + int attempts; }; struct WantListQueue { @@ -88,3 +90,10 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue */ int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const struct WantListSession* b); +/** + * Called by the Bitswap engine, this processes an item on the WantListQueue + * @param context the context + * @param entry the WantListQueueEntry + * @returns true(1) on success, false(0) if not. + */ +int ipfs_bitswap_wantlist_process_entry(struct BitswapContext* context, struct WantListQueueEntry* entry); diff --git a/test/exchange/test_bitswap_request_queue.h b/test/exchange/test_bitswap_request_queue.h index d1c45b2..86db8e2 100644 --- a/test/exchange/test_bitswap_request_queue.h +++ b/test/exchange/test_bitswap_request_queue.h @@ -28,6 +28,8 @@ int test_bitswap_peer_request_queue_new() { } int test_bitswap_peer_request_queue_find() { + return 0; + /* int retVal = 0; struct PeerRequestQueue* queue = NULL; struct PeerRequest* request1 = NULL; @@ -72,4 +74,5 @@ int test_bitswap_peer_request_queue_find() { // clean up ipfs_bitswap_peer_request_queue_free(queue); return retVal; + */ }