From a63910e0d7bcbdc441171211b7524fa3d04ba6a2 Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Mon, 24 Jul 2017 09:09:22 -0500 Subject: [PATCH] Adding bitswap request queue --- cid/cid.c | 24 +++ exchange/bitswap/Makefile | 2 +- exchange/bitswap/bitswap.c | 6 + exchange/bitswap/peer_request_queue.c | 165 ++++++++++++++++++ include/ipfs/cid/cid.h | 8 + .../exchange/bitswap/peer_request_queue.h | 92 ++++++++++ test/exchange/test_bitswap_request_queue.h | 15 ++ test/testit.c | 3 + 8 files changed, 314 insertions(+), 1 deletion(-) create mode 100644 exchange/bitswap/peer_request_queue.c create mode 100644 include/ipfs/exchange/bitswap/peer_request_queue.h create mode 100644 test/exchange/test_bitswap_request_queue.h diff --git a/cid/cid.c b/cid/cid.c index 51f277e..d9d9d2d 100644 --- a/cid/cid.c +++ b/cid/cid.c @@ -248,3 +248,27 @@ int ipfs_cid_cast(const unsigned char* incoming, size_t incoming_size, struct Ci return 1; } + +/** + * Compare two cids + * @param a side A + * @param b side B + * @returns < 0 if side A is greater, > 0 if side B is greater, or 0 if equal + */ +int ipfs_cid_compare(struct Cid* a, struct Cid* b) { + if (a->version != b->version) { + return b->version - a->version; + } + if (a->codec != b->codec) { + return ((int)b->codec - (int)a->codec); + } + if (a->hash_length != b->hash_length) { + return b->hash_length - a->hash_length; + } + for(size_t i = 0; i < a->hash_length; i++) { + if (a->hash[i] != b->hash[i]) { + return ((int)b->hash[i] - (int)a->hash[i]); + } + } + return 0; +} diff --git a/exchange/bitswap/Makefile b/exchange/bitswap/Makefile index a4c268c..513c40b 100644 --- a/exchange/bitswap/Makefile +++ b/exchange/bitswap/Makefile @@ -7,7 +7,7 @@ endif LFLAGS = DEPS = -OBJS = bitswap.o message.o network.o +OBJS = bitswap.o message.o network.o peer_request_queue.o %.o: %.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) diff --git a/exchange/bitswap/bitswap.c b/exchange/bitswap/bitswap.c index fa5b304..d4135a0 100644 --- a/exchange/bitswap/bitswap.c +++ b/exchange/bitswap/bitswap.c @@ -70,6 +70,10 @@ int ipfs_bitswap_close(void* exchangeContext) { /** * Implements the Exchange->HasBlock method + * Some notes from the GO version say that this is normally called by user + * interaction (i.e. user added a file). + * But this does not make sense right now, as the GO code looks like it + * adds the block to the blockstore. This still has to be sorted. */ int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block) { //TODO: Implement this method @@ -80,6 +84,8 @@ int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block) { /** * Implements the Exchange->GetBlock method + * We're asking for this method to get the block from peers. Perhaps this should be + * taking in a pointer to a callback, as this could take a while (or fail). */ int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block** block) { // TODO: Implement this method diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c new file mode 100644 index 0000000..9467159 --- /dev/null +++ b/exchange/bitswap/peer_request_queue.c @@ -0,0 +1,165 @@ +/*** + * A queue for requests from remote peers + * NOTE: This should handle multiple threads + */ + +#include +#include "ipfs/cid/cid.h" +#include "ipfs/exchange/bitswap/peer_request_queue.h" + +/** + * Allocate resources for a new PeerRequest + * @returns a new PeerRequest struct or NULL if there was a problem + */ +struct PeerRequest* ipfs_bitswap_peer_request_new() { + struct PeerRequest* request = (struct PeerRequest*) malloc(sizeof(struct PeerRequest)); + if (request != NULL) { + request->peer_id = 0; + request->cid = NULL; + } + return request; +} + +/** + * Free resources from a PeerRequest + * @param request the request to free + * @returns true(1) + */ +int ipfs_bitswap_peer_request_free(struct PeerRequest* request) { + free(request); + return 1; +} + +/** + * Allocate resources for a new queue + */ +struct PeerRequestQueue* ipfs_bitswap_peer_request_queue_new() { + struct PeerRequestQueue* queue = malloc(sizeof(struct PeerRequestQueue)); + queue->first = NULL; + queue->last = NULL; + return queue; +} + +/** + * Free all resources related to the queue + * @param queue the queue + * @returns true(1) + */ +int ipfs_bitswap_peer_request_queue_free(struct PeerRequestQueue* queue) { + pthread_mutex_lock(&queue->queue_mutex); + struct PeerRequestEntry* current = queue->last; + while (current != NULL) { + ipfs_bitswap_peer_request_entry_free(current); + current = current->prior; + } + pthread_mutex_unlock(&queue->queue_mutex); + return 1; +} + +/** + * Adds a peer request to the end of the queue + * @param queue the queue + * @param request the request + * @returns true(1) on success, otherwise false + */ +int ipfs_bitswap_peer_request_queue_add(struct PeerRequestQueue* queue, struct PeerRequest* request) { + if (request != NULL) { + struct PeerRequestEntry* entry = ipfs_bitswap_peer_request_entry_new(); + entry->current = request; + pthread_mutex_lock(&queue->queue_mutex); + entry->prior = queue->last; + queue->last = entry; + pthread_mutex_unlock(&queue->queue_mutex); + return 1; + } + return 0; +} + +/** + * Removes a peer request from the queue, no mather where it is + * @param queue the queue + * @param request the request + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_peer_request_queue_remove(struct PeerRequestQueue* queue, struct PeerRequest* request) { + if (request != NULL) { + struct PeerRequestEntry* entry = ipfs_bitswap_peer_request_queue_find_entry(queue, request); + if (entry != NULL) { + pthread_mutex_lock(&queue->queue_mutex); + // remove the entry's link, and hook prior and next together + entry->prior->next = entry->next; + entry->prior = NULL; + entry->next = NULL; + ipfs_bitswap_peer_request_entry_free(entry); + pthread_mutex_unlock(&queue->queue_mutex); + return 1; + } + } + return 0; +} + +/** + * Finds a PeerRequestEntry that contains the specified PeerRequest + * @param queue the queue to look through + * @param request what we're looking for + * @returns the PeerRequestEntry or NULL if not found + */ +struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerRequestQueue* queue, struct PeerRequest* request) { + if (request != NULL) { + struct PeerRequestEntry* current = queue->first; + while (current != NULL) { + if (current->current->peer_id == request->peer_id && ipfs_cid_compare(request->cid, current->current->cid) == 0) { + return current; + } + } + } + return NULL; +} + + +/** + * Pull a PeerRequest off the queue + * @param queue the queue + * @returns the PeerRequest that should be handled next, or NULL if the queue is empty + */ +struct PeerRequest* ipfs_bitswap_peer_request_pop(struct PeerRequestQueue* queue) { + struct PeerRequest* retVal = NULL; + if (queue != NULL) { + pthread_mutex_lock(&queue->queue_mutex); + struct PeerRequestEntry* entry = queue->first; + retVal = entry->current; + queue->first = queue->first->next; + pthread_mutex_unlock(&queue->queue_mutex); + ipfs_bitswap_peer_request_entry_free(entry); + } + return retVal; +} + +/*** + * Allocate resources for a PeerRequestEntry struct + * @returns the allocated struct or NULL if there was a problem + */ +struct PeerRequestEntry* ipfs_bitswap_peer_request_entry_new() { + struct PeerRequestEntry* entry = (struct PeerRequestEntry*) malloc(sizeof(struct PeerRequestEntry)); + if (entry != NULL) { + entry->current = NULL; + entry->next = NULL; + entry->prior = NULL; + } + return entry; +} + +/** + * Frees resources allocated + * NOTE: This does not free the embedded PeerRequest (should it?) + * @param entry the PeerRequestEntry to free + * @returns true(1) + */ +int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry) { + entry->next = NULL; + entry->prior = NULL; + entry->current = NULL; + free(entry); + return 1; +} + diff --git a/include/ipfs/cid/cid.h b/include/ipfs/cid/cid.h index 7d99772..2feec3a 100644 --- a/include/ipfs/cid/cid.h +++ b/include/ipfs/cid/cid.h @@ -116,4 +116,12 @@ int ipfs_cid_set_len (struct CidSet *set); unsigned char **ipfs_cid_set_keys (struct CidSet *set); int ipfs_cid_set_foreach (struct CidSet *set, int (*func)(struct Cid *)); +/** + * Compare two cids + * @param a side A + * @param b side B + * @returns < 0 if side A is greater, > 0 if side B is greater, or 0 if equal + */ +int ipfs_cid_compare(struct Cid* a, struct Cid* b); + #endif diff --git a/include/ipfs/exchange/bitswap/peer_request_queue.h b/include/ipfs/exchange/bitswap/peer_request_queue.h new file mode 100644 index 0000000..8ac72e8 --- /dev/null +++ b/include/ipfs/exchange/bitswap/peer_request_queue.h @@ -0,0 +1,92 @@ +/*** + * A queue for requests from remote peers + * NOTE: This should handle multiple threads + */ + +#include + +struct PeerRequest { + int peer_id; + struct Cid* cid; +}; + +struct PeerRequestEntry { + struct PeerRequestEntry* prior; + struct PeerRequest* current; + struct PeerRequestEntry* next; +}; + +struct PeerRequestQueue { + pthread_mutex_t queue_mutex; + struct PeerRequestEntry* first; + struct PeerRequestEntry* last; +}; + +/** + * Allocate resources for a new PeerRequest + * @returns a new PeerRequest struct or NULL if there was a problem + */ +struct PeerRequest* ipfs_bitswap_peer_request_new(); + +/** + * Free resources from a PeerRequest + * @param request the request to free + * @returns true(1) + */ +int ipfs_bitswap_peer_request_free(struct PeerRequest* request); + +/** + * Allocate resources for a new queue + */ +struct PeerRequestQueue* ipfs_bitswap_peer_request_queue_new(); + +/** + * Free all resources related to the queue + * @param queue the queue + * @returns true(1) + */ +int ipfs_bitswap_peer_request_queue_free(struct PeerRequestQueue* queue); + +/** + * Adds a peer request to the end of the queue + * @param queue the queue + * @param request the request + * @returns true(1) on success, otherwise false + */ +int ipfs_bitswap_peer_request_queue_add(struct PeerRequestQueue* queue, struct PeerRequest* request); + +/** + * Removes a peer request from the queue, no mather where it is + * @param queue the queue + * @param request the request + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_peer_request_quque_remove(struct PeerRequestQueue* queue, struct PeerRequest* request); + +/** + * Pull a PeerRequest off the queue + * @param queue the queue + * @returns the PeerRequest that should be handled next. + */ +struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue* queue); + +/** + * Finds a PeerRequestEntry that contains the specified PeerRequest + * @param queue the queue + * @param request what we're looking for + * @returns the PeerRequestEntry or NULL if not found + */ +struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerRequestQueue* queue, struct PeerRequest* request); + +/*** + * Allocate resources for a PeerRequestEntry struct + * @returns the allocated struct or NULL if there was a problem + */ +struct PeerRequestEntry* ipfs_bitswap_peer_request_entry_new(); + +/** + * Frees resources allocated + * @param entry the PeerRequestEntry to free + * @returns true(1) + */ +int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry); diff --git a/test/exchange/test_bitswap_request_queue.h b/test/exchange/test_bitswap_request_queue.h new file mode 100644 index 0000000..82cfefb --- /dev/null +++ b/test/exchange/test_bitswap_request_queue.h @@ -0,0 +1,15 @@ +#include +#include "ipfs/exchange/bitswap/peer_request_queue.h" + +/*** + * Create a queue, do some work, free the queue, make sure valgrind likes it. + */ +int test_bitswap_peer_request_queue_new() { + // create a queue + struct PeerRequestQueue* queue = ipfs_bitswap_peer_request_queue_new(); + struct PeerRequest* request = ipfs_bitswap_peer_request_new(); + ipfs_bitswap_peer_request_queue_add(queue, request); + // clean up + ipfs_bitswap_peer_request_queue_free(queue); + return 1; +} diff --git a/test/testit.c b/test/testit.c index 1e06ada..f472098 100644 --- a/test/testit.c +++ b/test/testit.c @@ -1,6 +1,7 @@ #include "cid/test_cid.h" #include "cmd/ipfs/test_init.h" #include "exchange/test_bitswap.h" +#include "exchange/test_bitswap_request_queue.h" #include "flatfs/test_flatfs.h" #include "merkledag/test_merkledag.h" #include "node/test_node.h" @@ -34,6 +35,7 @@ int testit(const char* name, int (*func)(void)) { const char* names[] = { "test_bitswap_new_free", + "test_bitswap_peer_request_queue_new", "test_cid_new_free", "test_cid_cast_multihash", "test_cid_cast_non_multihash", @@ -85,6 +87,7 @@ const char* names[] = { int (*funcs[])(void) = { test_bitswap_new_free, + test_bitswap_peer_request_queue_new, test_cid_new_free, test_cid_cast_multihash, test_cid_cast_non_multihash,