diff --git a/blocks/block.c b/blocks/block.c index 832a6a9..ec8faef 100644 --- a/blocks/block.c +++ b/blocks/block.c @@ -99,7 +99,7 @@ int ipfs_blocks_block_protobuf_decode(const unsigned char* buffer, const size_t exit: if (retVal == 0) { - ipfs_blocks_block_free(*block); + ipfs_block_free(*block); } if (temp_buffer != NULL) free(temp_buffer); @@ -155,7 +155,7 @@ int ipfs_blocks_block_add_data(const unsigned char* data, size_t data_size, stru * @param block the block to free * @returns true(1) on success */ -int ipfs_blocks_block_free(struct Block* block) { +int ipfs_block_free(struct Block* block) { if (block != NULL) { ipfs_cid_free(block->cid); if (block->data != NULL) @@ -164,3 +164,17 @@ int ipfs_blocks_block_free(struct Block* block) { } return 1; } + +/*** + * Make a copy of a block + * @param original the original + * @returns a new Block that is a copy + */ +struct Block* ipfs_block_copy(struct Block* original) { + struct Block* copy = ipfs_blocks_block_new(); + copy->data_length = original->data_length; + copy->data = (unsigned char*) malloc(original->data_length); + memcpy(copy->data, original->data, original->data_length); + copy->cid = ipfs_cid_copy(original->cid); + return copy; +} diff --git a/cid/cid.c b/cid/cid.c index 3e4126b..622cf3e 100644 --- a/cid/cid.c +++ b/cid/cid.c @@ -137,6 +137,23 @@ int ipfs_cid_free(struct Cid* cid) { return 1; } +/*** + * Make a copy of a Cid + * @param original the original + * @returns a copy of the original + */ +struct Cid* ipfs_cid_copy(const struct Cid* original) { + struct Cid* copy = (struct Cid*) malloc(sizeof(struct Cid)); + if (copy != NULL) { + copy->codec = original->codec; + copy->version = original->version; + copy->hash_length = original->hash_length; + copy->hash = (unsigned char*) malloc(original->hash_length); + memcpy(copy->hash, original->hash, original->hash_length); + } + return copy; +} + /*** * Fill a Cid struct based on a base 58 encoded multihash * @param incoming the string @@ -259,7 +276,7 @@ int ipfs_cid_cast(const unsigned char* incoming, size_t incoming_size, struct Ci * @param b side B * @returns < 0 if side A is greater, > 0 if side B is greater, or 0 if equal */ -int ipfs_cid_compare(struct Cid* a, struct Cid* b) { +int ipfs_cid_compare(const struct Cid* a, const struct Cid* b) { if (a == NULL && b == NULL) return 0; if (a != NULL && b == NULL) diff --git a/exchange/bitswap/Makefile b/exchange/bitswap/Makefile index 01337f0..e4c3441 100644 --- a/exchange/bitswap/Makefile +++ b/exchange/bitswap/Makefile @@ -7,7 +7,7 @@ endif LFLAGS = DEPS = -OBJS = bitswap.o message.o network.o peer_request_queue.o want_manager.o +OBJS = bitswap.o message.o network.o peer_request_queue.o want_manager.o wantlist_queue.o %.o: %.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) diff --git a/exchange/bitswap/bitswap.c b/exchange/bitswap/bitswap.c index 8e637e6..9456901 100644 --- a/exchange/bitswap/bitswap.c +++ b/exchange/bitswap/bitswap.c @@ -107,15 +107,18 @@ int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block* int timeout = 10; int waitSecs = 1; int timeTaken = 0; - if (ipfs_bitswap_want_manager_add(bitswapContext, cid)) { + struct WantListQueueEntry* want_entry = ipfs_bitswap_want_manager_add(bitswapContext, cid); + if (want_entry != NULL) { // loop waiting for it to fill while(1) { - if (ipfs_bitswap_want_manager_received(bitswapContext, cid)) { - if (ipfs_bitswap_want_manager_get_block(bitswapContext, cid, block)) { - // NOTE: this should use reference counting - ipfs_bitswap_want_manager_remove(bitswapContext, cid); - return 1; + if (want_entry->block != NULL) { + *block = ipfs_block_copy(want_entry->block); + // error or not, we no longer need the block (decrement reference count) + ipfs_bitswap_want_manager_remove(bitswapContext, cid); + if (*block == NULL) { + return 0; } + return 1; } //TODO: This is a busy-loop. Find another way. timeTaken += waitSecs; diff --git a/exchange/bitswap/message.c b/exchange/bitswap/message.c index ee9b979..3982e3f 100644 --- a/exchange/bitswap/message.c +++ b/exchange/bitswap/message.c @@ -439,7 +439,7 @@ int ipfs_bitswap_message_free(struct BitswapMessage* message) { for(int i = 0; i < message->blocks->total; i++) { // free each item in the vector struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->blocks, i); - ipfs_blocks_block_free(entry); + ipfs_block_free(entry); } libp2p_utils_vector_free(message->blocks); } @@ -447,7 +447,7 @@ int ipfs_bitswap_message_free(struct BitswapMessage* message) { for(int i = 0; i < message->payload->total; i++) { // free each item in the vector struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->payload, i); - ipfs_blocks_block_free(entry); + ipfs_block_free(entry); } libp2p_utils_vector_free(message->payload); } diff --git a/exchange/bitswap/want_manager.c b/exchange/bitswap/want_manager.c index 0ad334b..3fe8952 100644 --- a/exchange/bitswap/want_manager.c +++ b/exchange/bitswap/want_manager.c @@ -1,16 +1,15 @@ #include "ipfs/exchange/bitswap/want_manager.h" -#include "ipfs/exchange/bitswap/wantlist.h" +#include "ipfs/exchange/bitswap/wantlist_queue.h" /*** * Add a Cid to the local wantlist * @param context the context * @param cid the Cid - * @returns true(1) on success, false(0) otherwise + * @returns the added entry */ -int ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid) { - // first see if it is already here - // increment the reference count - return 0; +struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid) { + // add if not there, and increment reference count + return ipfs_bitswap_wantlist_queue_add(context->localWantlist, cid); } /*** @@ -21,7 +20,11 @@ int ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const st */ int ipfs_bitswap_want_manager_received(const struct BitswapContext* context, const struct Cid* cid) { // find the entry + struct WantListQueueEntry* entry = ipfs_bitswap_wantlist_queue_find(context->localWantlist, cid); // check the status + if (entry != NULL && entry->block != NULL) { + return 1; + } return 0; } @@ -33,8 +36,14 @@ int ipfs_bitswap_want_manager_received(const struct BitswapContext* context, con * @returns true(1) on success, false(0) otherwise */ int ipfs_bitswap_want_manager_get_block(const struct BitswapContext* context, const struct Cid* cid, struct Block** block) { - // find the entry - // return a copy of the block + struct WantListQueueEntry* entry = ipfs_bitswap_wantlist_queue_find(context->localWantlist, cid); + if (entry != NULL && entry->block != NULL) { + // return a copy of the block + *block = ipfs_block_copy(entry->block); + if ( (*block) != NULL) { + return 1; + } + } return 0; } @@ -48,5 +57,5 @@ int ipfs_bitswap_want_manager_get_block(const struct BitswapContext* context, co int ipfs_bitswap_want_manager_remove(const struct BitswapContext* context, const struct Cid* cid) { // decrement the reference count // if it is zero, remove the entry (lock first) - return 0; + return ipfs_bitswap_wantlist_queue_remove(context->localWantlist, cid); } diff --git a/exchange/bitswap/wantlist_queue.c b/exchange/bitswap/wantlist_queue.c new file mode 100644 index 0000000..9bf931a --- /dev/null +++ b/exchange/bitswap/wantlist_queue.c @@ -0,0 +1,140 @@ +#include +#include "libp2p/utils/vector.h" +#include "ipfs/exchange/bitswap/wantlist_queue.h" + +/*** + * Initialize a new Wantlist (there should only be 1 per instance) + * @returns a new WantList + */ +struct WantListQueue* ipfs_bitswap_wantlist_queue_new() { + struct WantListQueue* wantlist = (struct WantListQueue*) malloc(sizeof(struct WantListQueue)); + if (wantlist != NULL) { + pthread_mutex_init(&wantlist->wantlist_mutex, NULL); + wantlist->queue = NULL; + } + return wantlist; +} + +/*** + * Deallocate resources of a WantList + * @param wantlist the WantList + * @returns true(1) + */ +int ipfs_bitswap_wantlist_queue_free(struct WantListQueue* wantlist) { + if (wantlist != NULL) { + if (wantlist->queue != NULL) { + libp2p_utils_vector_free(wantlist->queue); + wantlist->queue = NULL; + } + free(wantlist); + } + return 1; +} + +/*** + * Add a Cid to the WantList + * @param wantlist the WantList to add to + * @param cid the Cid to add + * @returns the correct WantListEntry or NULL if error + */ +struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue* wantlist, const struct Cid* cid) { + struct WantListQueueEntry* entry = NULL; + if (wantlist != NULL) { + pthread_mutex_lock(&wantlist->wantlist_mutex); + if (wantlist->queue == NULL) { + wantlist->queue = libp2p_utils_vector_new(1); + } + entry = ipfs_bitswap_wantlist_queue_find(wantlist, cid); + if (entry == NULL) { + // create a new one + entry = ipfs_bitswap_wantlist_queue_entry_new(); + entry->cid = ipfs_cid_copy(cid); + entry->priority = 1; + //TODO: find a way to pass session information + //entry->sessionsRequesting; + } else { + //TODO: find a way to pass sessioninformation + //entry->sessionRequesting; + } + libp2p_utils_vector_add(wantlist->queue, entry); + pthread_mutex_unlock(&wantlist->wantlist_mutex); + } + return entry; +} + +/*** + * Remove (decrement the counter) a Cid from the WantList + * @param wantlist the WantList + * @param cid the Cid + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_wantlist_queue_remove(struct WantListQueue* wantlist, const struct Cid* cid) { + //TODO: remove if counter is <= 0 + if (wantlist != NULL) { + struct WantListQueueEntry* entry = ipfs_bitswap_wantlist_queue_find(wantlist, cid); + if (entry != NULL) { + //TODO: find a way to decrement + return 1; + } + } + return 0; +} + +/*** + * Find a Cid in the WantList + * @param wantlist the list + * @param cid the Cid + * @returns the WantListQueueEntry + */ +struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue* wantlist, const struct Cid* cid) { + for (size_t i = 0; i < wantlist->queue->total; i++) { + struct WantListQueueEntry* entry = (struct WantListQueueEntry*) libp2p_utils_vector_get(wantlist->queue, i); + if (entry == NULL) { + //TODO: something went wrong. This should be logged. + return NULL; + } + if (ipfs_cid_compare(cid, entry->cid) == 0) { + return entry; + } + } + return NULL; +} + +/*** + * Initialize a WantListQueueEntry + * @returns a new WantListQueueEntry + */ +struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_entry_new() { + struct WantListQueueEntry* entry = (struct WantListQueueEntry*) malloc(sizeof(struct WantListQueueEntry)); + if (entry != NULL) { + entry->block = NULL; + entry->cid = NULL; + entry->priority = 0; + entry->sessionsRequesting = NULL; + } + return entry; +} + +/*** + * Free a WantListQueueENtry struct + * @param entry the struct + * @returns true(1) + */ +int ipfs_bitswap_wantlist_queue_entry_free(struct WantListQueueEntry* entry) { + if (entry != NULL) { + if (entry->block != NULL) { + ipfs_block_free(entry->block); + entry->block = NULL; + } + if (entry->cid != NULL) { + ipfs_cid_free(entry->cid); + entry->cid = NULL; + } + if (entry->sessionsRequesting != NULL) { + libp2p_utils_vector_free(entry->sessionsRequesting); + entry->sessionsRequesting = NULL; + } + free(entry); + } + return 1; +} diff --git a/include/ipfs/blocks/block.h b/include/ipfs/blocks/block.h index c5cc679..2ff176b 100644 --- a/include/ipfs/blocks/block.h +++ b/include/ipfs/blocks/block.h @@ -27,7 +27,7 @@ int ipfs_blocks_block_add_data(const unsigned char* data, size_t data_size, stru * @param block the block to free * @returns true(1) on success */ -int ipfs_blocks_block_free(struct Block* block); +int ipfs_block_free(struct Block* block); /** * Determine the approximate size of an encoded block @@ -55,4 +55,11 @@ int ipfs_blocks_block_protobuf_encode(const struct Block* block, unsigned char* */ int ipfs_blocks_block_protobuf_decode(const unsigned char* buffer, const size_t buffer_length, struct Block** block); +/*** + * Make a copy of a block + * @param original the original + * @returns a new Block that is a copy + */ +struct Block* ipfs_block_copy(struct Block* original); + #endif diff --git a/include/ipfs/cid/cid.h b/include/ipfs/cid/cid.h index e8bc96a..e17f86b 100644 --- a/include/ipfs/cid/cid.h +++ b/include/ipfs/cid/cid.h @@ -81,6 +81,13 @@ int ipfs_cid_new(int version, const unsigned char* hash, size_t hash_length, con */ int ipfs_cid_free(struct Cid* cid); +/*** + * Make a copy of a Cid + * @param original the original + * @returns a copy of the original + */ +struct Cid* ipfs_cid_copy(const struct Cid* original); + /*** * Fill a Cid struct based on a base 58 encoded string * @param incoming the string @@ -122,6 +129,6 @@ int ipfs_cid_set_foreach (struct CidSet *set, int (*func)(struct Cid *)); * @param b side B * @returns < 0 if side A is greater, > 0 if side B is greater, or 0 if equal */ -int ipfs_cid_compare(struct Cid* a, struct Cid* b); +int ipfs_cid_compare(const struct Cid* a, const struct Cid* b); #endif diff --git a/include/ipfs/exchange/bitswap/bitswap.h b/include/ipfs/exchange/bitswap/bitswap.h index 8667f2f..6400b9a 100644 --- a/include/ipfs/exchange/bitswap/bitswap.h +++ b/include/ipfs/exchange/bitswap/bitswap.h @@ -11,7 +11,7 @@ struct BitswapContext { struct SessionContext* sessionContext; struct IpfsNode* ipfsNode; - struct Wantlist* localWantlist; + struct WantListQueue* localWantlist; }; /** diff --git a/include/ipfs/exchange/bitswap/want_manager.h b/include/ipfs/exchange/bitswap/want_manager.h index 96faace..bd2520a 100644 --- a/include/ipfs/exchange/bitswap/want_manager.h +++ b/include/ipfs/exchange/bitswap/want_manager.h @@ -3,14 +3,15 @@ #include "ipfs/blocks/block.h" #include "ipfs/cid/cid.h" #include "ipfs/exchange/bitswap/bitswap.h" +#include "wantlist_queue.h" /*** * Add a Cid to the local wantlist * @param context the context * @param cid the Cid - * @returns true(1) on success, false(0) otherwise + * @returns the added WantListQueueEntry */ -int ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid); +struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid); /*** * Checks to see if the requested block has been received diff --git a/include/ipfs/exchange/bitswap/wantlist.h b/include/ipfs/exchange/bitswap/wantlist.h deleted file mode 100644 index 5d1e4f8..0000000 --- a/include/ipfs/exchange/bitswap/wantlist.h +++ /dev/null @@ -1,17 +0,0 @@ -/** - * This is a list of requests from a peer. - * NOTE: This tracks who wants what. If 2 peers want the same file, - * there will be 1 WantListEntry in the WantList. There will be 2 entries in - * WantListEntry.sessionsRequesting. - */ - -struct WantListEntry { - unsigned char* cid; - size_t cid_length; - int priority; - struct Libp2pVector* sessionsRequesting; -}; - -struct WantList { - struct Libp2pVector* set; -}; diff --git a/include/ipfs/exchange/bitswap/wantlist_queue.h b/include/ipfs/exchange/bitswap/wantlist_queue.h new file mode 100644 index 0000000..810f234 --- /dev/null +++ b/include/ipfs/exchange/bitswap/wantlist_queue.h @@ -0,0 +1,83 @@ +#pragma once +/** + * This is a list of requests from a peer (including locally). + * NOTE: This tracks who wants what. If 2 peers want the same file, + * there will be 1 WantListEntry in the WantList. There will be 2 entries in + * WantListEntry.sessionsRequesting. + */ +#include +#include "ipfs/cid/cid.h" +#include "ipfs/blocks/block.h" + +enum WantListSessionType { WANTLIST_SESSION_TYPE_LOCAL, WANTLIST_SESSION_TYPE_REMOTE }; + +struct WantListSession { + enum WantListSessionType type; + void* context; +}; + +struct WantListQueueEntry { + struct Cid* cid; + int priority; + // a vector of WantListSessions + struct Libp2pVector* sessionsRequesting; + struct Block* block; +}; + +struct WantListQueue { + pthread_mutex_t wantlist_mutex; + // a vector of WantListEntries + struct Libp2pVector* queue; +}; + +/*** + * Initialize a WantListQueueEntry + * @returns a new WantListQueueEntry + */ +struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_entry_new(); + +/*** + * Remove resources, freeing a WantListQueueEntry + * @param entry the WantListQueueEntry + * @returns true(1) + */ +int ipfs_bitswap_wantlist_queue_entry_free(struct WantListQueueEntry* entry); + +/*** + * Initialize a new Wantlist (there should only be 1 per instance) + * @returns a new WantList + */ +struct WantListQueue* ipfs_bitswap_wantlist_queue_new(); + +/*** + * Deallocate resources of a WantList + * @param wantlist the WantList + * @returns true(1) + */ +int ipfs_bitswap_wantlist_queue_free(struct WantListQueue* wantlist); + +/*** + * Add a Cid to the WantList + * @param wantlist the WantList to add to + * @param cid the Cid to add + * @returns the correct WantListEntry or NULL if error + */ +struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue* wantlist, const struct Cid* cid); + +/*** + * Remove (decrement the counter) a Cid from the WantList + * @param wantlist the WantList + * @param cid the Cid + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_wantlist_queue_remove(struct WantListQueue* wantlist, const struct Cid* cid); + +/*** + * Find a Cid in the WantList + * @param wantlist the list + * @param cid the Cid + * @returns the WantListQueueEntry + */ +struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue* wantlist, const struct Cid* cid); + + diff --git a/test/repo/test_repo_fsrepo.h b/test/repo/test_repo_fsrepo.h index eee35a8..e588e8a 100644 --- a/test/repo/test_repo_fsrepo.h +++ b/test/repo/test_repo_fsrepo.h @@ -61,7 +61,7 @@ int test_repo_fsrepo_write_read_block() { retVal = ipfs_repo_fsrepo_block_write(block, fs_repo); if (retVal == 0) { ipfs_repo_fsrepo_free(fs_repo); - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } @@ -70,7 +70,7 @@ int test_repo_fsrepo_write_read_block() { retVal = ipfs_repo_fsrepo_block_read(block->cid->hash, block->cid->hash_length, &results, fs_repo); if (retVal == 0) { ipfs_repo_fsrepo_free(fs_repo); - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } @@ -90,7 +90,7 @@ int test_repo_fsrepo_write_read_block() { } ipfs_repo_fsrepo_free(fs_repo); - ipfs_blocks_block_free(block); - ipfs_blocks_block_free(results); + ipfs_block_free(block); + ipfs_block_free(results); return retVal; } diff --git a/test/storage/test_blocks.h b/test/storage/test_blocks.h index 6d9f5b8..9df126e 100644 --- a/test/storage/test_blocks.h +++ b/test/storage/test_blocks.h @@ -9,45 +9,45 @@ int test_blocks_new() { retVal = ipfs_blocks_block_add_data(input, strlen((const char*)input) + 1, block); if (retVal == 0) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } // now examine the block if (strcmp((const char*)block->data, (const char*)input) != 0) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } if (block->data_length != strlen((const char*)input) + 1) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } if (block->cid->codec != CID_PROTOBUF) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } if (block->cid->version != 0) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } if (block->cid->hash_length != 32) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } unsigned char result_hash[32] = {33, 153, 66, 187, 124, 250, 87, 12, 12, 73, 43, 247, 175, 153, 10, 51, 192, 195, 218, 69, 220, 170, 105, 179, 195, 0, 203, 213, 172, 3, 244, 10 }; for(int i = 0; i < 32; i++) { if (block->cid->hash[i] != result_hash[i]) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } } - retVal = ipfs_blocks_block_free(block); + retVal = ipfs_block_free(block); return 1; } diff --git a/test/storage/test_datastore.h b/test/storage/test_datastore.h index d46139f..74e8f13 100644 --- a/test/storage/test_datastore.h +++ b/test/storage/test_datastore.h @@ -29,7 +29,7 @@ int test_ipfs_datastore_put() { retVal = ipfs_blocks_block_add_data(input, strlen((char*)input), block); if (retVal == 0) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } @@ -38,7 +38,7 @@ int test_ipfs_datastore_put() { unsigned char key[key_length]; retVal = ipfs_datastore_helper_ds_key_from_binary(block->data, block->data_length, &key[0], key_length, &key_length); if (retVal == 0) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } @@ -46,18 +46,18 @@ int test_ipfs_datastore_put() { struct FSRepo* fs_repo; retVal = ipfs_repo_fsrepo_new("/tmp/.ipfs", NULL, &fs_repo); if (retVal == 0) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } retVal = ipfs_repo_fsrepo_open(fs_repo); if (retVal == 0) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } // send to Put with key retVal = fs_repo->config->datastore->datastore_put((const unsigned char*)key, key_length, block->data, block->data_length, fs_repo->config->datastore); if (retVal == 0) { - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 0; } @@ -67,7 +67,7 @@ int test_ipfs_datastore_put() { // clean up ipfs_repo_fsrepo_free(fs_repo); - ipfs_blocks_block_free(block); + ipfs_block_free(block); return 1; }