More changes for bitswap

This commit is contained in:
John Jones 2017-07-24 17:58:39 -05:00
parent 3a8a85e628
commit 108792ca44
16 changed files with 325 additions and 61 deletions

View file

@ -99,7 +99,7 @@ int ipfs_blocks_block_protobuf_decode(const unsigned char* buffer, const size_t
exit:
if (retVal == 0) {
ipfs_blocks_block_free(*block);
ipfs_block_free(*block);
}
if (temp_buffer != NULL)
free(temp_buffer);
@ -155,7 +155,7 @@ int ipfs_blocks_block_add_data(const unsigned char* data, size_t data_size, stru
* @param block the block to free
* @returns true(1) on success
*/
int ipfs_blocks_block_free(struct Block* block) {
int ipfs_block_free(struct Block* block) {
if (block != NULL) {
ipfs_cid_free(block->cid);
if (block->data != NULL)
@ -164,3 +164,17 @@ int ipfs_blocks_block_free(struct Block* block) {
}
return 1;
}
/***
* Make a copy of a block
* @param original the original
* @returns a new Block that is a copy
*/
struct Block* ipfs_block_copy(struct Block* original) {
struct Block* copy = ipfs_blocks_block_new();
copy->data_length = original->data_length;
copy->data = (unsigned char*) malloc(original->data_length);
memcpy(copy->data, original->data, original->data_length);
copy->cid = ipfs_cid_copy(original->cid);
return copy;
}

View file

@ -137,6 +137,23 @@ int ipfs_cid_free(struct Cid* cid) {
return 1;
}
/***
* Make a copy of a Cid
* @param original the original
* @returns a copy of the original
*/
struct Cid* ipfs_cid_copy(const struct Cid* original) {
struct Cid* copy = (struct Cid*) malloc(sizeof(struct Cid));
if (copy != NULL) {
copy->codec = original->codec;
copy->version = original->version;
copy->hash_length = original->hash_length;
copy->hash = (unsigned char*) malloc(original->hash_length);
memcpy(copy->hash, original->hash, original->hash_length);
}
return copy;
}
/***
* Fill a Cid struct based on a base 58 encoded multihash
* @param incoming the string
@ -259,7 +276,7 @@ int ipfs_cid_cast(const unsigned char* incoming, size_t incoming_size, struct Ci
* @param b side B
* @returns < 0 if side A is greater, > 0 if side B is greater, or 0 if equal
*/
int ipfs_cid_compare(struct Cid* a, struct Cid* b) {
int ipfs_cid_compare(const struct Cid* a, const struct Cid* b) {
if (a == NULL && b == NULL)
return 0;
if (a != NULL && b == NULL)

View file

@ -7,7 +7,7 @@ endif
LFLAGS =
DEPS =
OBJS = bitswap.o message.o network.o peer_request_queue.o want_manager.o
OBJS = bitswap.o message.o network.o peer_request_queue.o want_manager.o wantlist_queue.o
%.o: %.c $(DEPS)
$(CC) -c -o $@ $< $(CFLAGS)

View file

@ -107,15 +107,18 @@ int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block*
int timeout = 10;
int waitSecs = 1;
int timeTaken = 0;
if (ipfs_bitswap_want_manager_add(bitswapContext, cid)) {
struct WantListQueueEntry* want_entry = ipfs_bitswap_want_manager_add(bitswapContext, cid);
if (want_entry != NULL) {
// loop waiting for it to fill
while(1) {
if (ipfs_bitswap_want_manager_received(bitswapContext, cid)) {
if (ipfs_bitswap_want_manager_get_block(bitswapContext, cid, block)) {
// NOTE: this should use reference counting
ipfs_bitswap_want_manager_remove(bitswapContext, cid);
return 1;
if (want_entry->block != NULL) {
*block = ipfs_block_copy(want_entry->block);
// error or not, we no longer need the block (decrement reference count)
ipfs_bitswap_want_manager_remove(bitswapContext, cid);
if (*block == NULL) {
return 0;
}
return 1;
}
//TODO: This is a busy-loop. Find another way.
timeTaken += waitSecs;

View file

@ -439,7 +439,7 @@ int ipfs_bitswap_message_free(struct BitswapMessage* message) {
for(int i = 0; i < message->blocks->total; i++) {
// free each item in the vector
struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->blocks, i);
ipfs_blocks_block_free(entry);
ipfs_block_free(entry);
}
libp2p_utils_vector_free(message->blocks);
}
@ -447,7 +447,7 @@ int ipfs_bitswap_message_free(struct BitswapMessage* message) {
for(int i = 0; i < message->payload->total; i++) {
// free each item in the vector
struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->payload, i);
ipfs_blocks_block_free(entry);
ipfs_block_free(entry);
}
libp2p_utils_vector_free(message->payload);
}

View file

@ -1,16 +1,15 @@
#include "ipfs/exchange/bitswap/want_manager.h"
#include "ipfs/exchange/bitswap/wantlist.h"
#include "ipfs/exchange/bitswap/wantlist_queue.h"
/***
* Add a Cid to the local wantlist
* @param context the context
* @param cid the Cid
* @returns true(1) on success, false(0) otherwise
* @returns the added entry
*/
int ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid) {
// first see if it is already here
// increment the reference count
return 0;
struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid) {
// add if not there, and increment reference count
return ipfs_bitswap_wantlist_queue_add(context->localWantlist, cid);
}
/***
@ -21,7 +20,11 @@ int ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const st
*/
int ipfs_bitswap_want_manager_received(const struct BitswapContext* context, const struct Cid* cid) {
// find the entry
struct WantListQueueEntry* entry = ipfs_bitswap_wantlist_queue_find(context->localWantlist, cid);
// check the status
if (entry != NULL && entry->block != NULL) {
return 1;
}
return 0;
}
@ -33,8 +36,14 @@ int ipfs_bitswap_want_manager_received(const struct BitswapContext* context, con
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_want_manager_get_block(const struct BitswapContext* context, const struct Cid* cid, struct Block** block) {
// find the entry
// return a copy of the block
struct WantListQueueEntry* entry = ipfs_bitswap_wantlist_queue_find(context->localWantlist, cid);
if (entry != NULL && entry->block != NULL) {
// return a copy of the block
*block = ipfs_block_copy(entry->block);
if ( (*block) != NULL) {
return 1;
}
}
return 0;
}
@ -48,5 +57,5 @@ int ipfs_bitswap_want_manager_get_block(const struct BitswapContext* context, co
int ipfs_bitswap_want_manager_remove(const struct BitswapContext* context, const struct Cid* cid) {
// decrement the reference count
// if it is zero, remove the entry (lock first)
return 0;
return ipfs_bitswap_wantlist_queue_remove(context->localWantlist, cid);
}

View file

@ -0,0 +1,140 @@
#include <stdlib.h>
#include "libp2p/utils/vector.h"
#include "ipfs/exchange/bitswap/wantlist_queue.h"
/***
* Initialize a new Wantlist (there should only be 1 per instance)
* @returns a new WantList
*/
struct WantListQueue* ipfs_bitswap_wantlist_queue_new() {
struct WantListQueue* wantlist = (struct WantListQueue*) malloc(sizeof(struct WantListQueue));
if (wantlist != NULL) {
pthread_mutex_init(&wantlist->wantlist_mutex, NULL);
wantlist->queue = NULL;
}
return wantlist;
}
/***
* Deallocate resources of a WantList
* @param wantlist the WantList
* @returns true(1)
*/
int ipfs_bitswap_wantlist_queue_free(struct WantListQueue* wantlist) {
if (wantlist != NULL) {
if (wantlist->queue != NULL) {
libp2p_utils_vector_free(wantlist->queue);
wantlist->queue = NULL;
}
free(wantlist);
}
return 1;
}
/***
* Add a Cid to the WantList
* @param wantlist the WantList to add to
* @param cid the Cid to add
* @returns the correct WantListEntry or NULL if error
*/
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue* wantlist, const struct Cid* cid) {
struct WantListQueueEntry* entry = NULL;
if (wantlist != NULL) {
pthread_mutex_lock(&wantlist->wantlist_mutex);
if (wantlist->queue == NULL) {
wantlist->queue = libp2p_utils_vector_new(1);
}
entry = ipfs_bitswap_wantlist_queue_find(wantlist, cid);
if (entry == NULL) {
// create a new one
entry = ipfs_bitswap_wantlist_queue_entry_new();
entry->cid = ipfs_cid_copy(cid);
entry->priority = 1;
//TODO: find a way to pass session information
//entry->sessionsRequesting;
} else {
//TODO: find a way to pass sessioninformation
//entry->sessionRequesting;
}
libp2p_utils_vector_add(wantlist->queue, entry);
pthread_mutex_unlock(&wantlist->wantlist_mutex);
}
return entry;
}
/***
* Remove (decrement the counter) a Cid from the WantList
* @param wantlist the WantList
* @param cid the Cid
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_bitswap_wantlist_queue_remove(struct WantListQueue* wantlist, const struct Cid* cid) {
//TODO: remove if counter is <= 0
if (wantlist != NULL) {
struct WantListQueueEntry* entry = ipfs_bitswap_wantlist_queue_find(wantlist, cid);
if (entry != NULL) {
//TODO: find a way to decrement
return 1;
}
}
return 0;
}
/***
* Find a Cid in the WantList
* @param wantlist the list
* @param cid the Cid
* @returns the WantListQueueEntry
*/
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue* wantlist, const struct Cid* cid) {
for (size_t i = 0; i < wantlist->queue->total; i++) {
struct WantListQueueEntry* entry = (struct WantListQueueEntry*) libp2p_utils_vector_get(wantlist->queue, i);
if (entry == NULL) {
//TODO: something went wrong. This should be logged.
return NULL;
}
if (ipfs_cid_compare(cid, entry->cid) == 0) {
return entry;
}
}
return NULL;
}
/***
* Initialize a WantListQueueEntry
* @returns a new WantListQueueEntry
*/
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_entry_new() {
struct WantListQueueEntry* entry = (struct WantListQueueEntry*) malloc(sizeof(struct WantListQueueEntry));
if (entry != NULL) {
entry->block = NULL;
entry->cid = NULL;
entry->priority = 0;
entry->sessionsRequesting = NULL;
}
return entry;
}
/***
* Free a WantListQueueENtry struct
* @param entry the struct
* @returns true(1)
*/
int ipfs_bitswap_wantlist_queue_entry_free(struct WantListQueueEntry* entry) {
if (entry != NULL) {
if (entry->block != NULL) {
ipfs_block_free(entry->block);
entry->block = NULL;
}
if (entry->cid != NULL) {
ipfs_cid_free(entry->cid);
entry->cid = NULL;
}
if (entry->sessionsRequesting != NULL) {
libp2p_utils_vector_free(entry->sessionsRequesting);
entry->sessionsRequesting = NULL;
}
free(entry);
}
return 1;
}

View file

@ -27,7 +27,7 @@ int ipfs_blocks_block_add_data(const unsigned char* data, size_t data_size, stru
* @param block the block to free
* @returns true(1) on success
*/
int ipfs_blocks_block_free(struct Block* block);
int ipfs_block_free(struct Block* block);
/**
* Determine the approximate size of an encoded block
@ -55,4 +55,11 @@ int ipfs_blocks_block_protobuf_encode(const struct Block* block, unsigned char*
*/
int ipfs_blocks_block_protobuf_decode(const unsigned char* buffer, const size_t buffer_length, struct Block** block);
/***
* Make a copy of a block
* @param original the original
* @returns a new Block that is a copy
*/
struct Block* ipfs_block_copy(struct Block* original);
#endif

View file

@ -81,6 +81,13 @@ int ipfs_cid_new(int version, const unsigned char* hash, size_t hash_length, con
*/
int ipfs_cid_free(struct Cid* cid);
/***
* Make a copy of a Cid
* @param original the original
* @returns a copy of the original
*/
struct Cid* ipfs_cid_copy(const struct Cid* original);
/***
* Fill a Cid struct based on a base 58 encoded string
* @param incoming the string
@ -122,6 +129,6 @@ int ipfs_cid_set_foreach (struct CidSet *set, int (*func)(struct Cid *));
* @param b side B
* @returns < 0 if side A is greater, > 0 if side B is greater, or 0 if equal
*/
int ipfs_cid_compare(struct Cid* a, struct Cid* b);
int ipfs_cid_compare(const struct Cid* a, const struct Cid* b);
#endif

View file

@ -11,7 +11,7 @@
struct BitswapContext {
struct SessionContext* sessionContext;
struct IpfsNode* ipfsNode;
struct Wantlist* localWantlist;
struct WantListQueue* localWantlist;
};
/**

View file

@ -3,14 +3,15 @@
#include "ipfs/blocks/block.h"
#include "ipfs/cid/cid.h"
#include "ipfs/exchange/bitswap/bitswap.h"
#include "wantlist_queue.h"
/***
* Add a Cid to the local wantlist
* @param context the context
* @param cid the Cid
* @returns true(1) on success, false(0) otherwise
* @returns the added WantListQueueEntry
*/
int ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid);
struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid);
/***
* Checks to see if the requested block has been received

View file

@ -1,17 +0,0 @@
/**
* This is a list of requests from a peer.
* NOTE: This tracks who wants what. If 2 peers want the same file,
* there will be 1 WantListEntry in the WantList. There will be 2 entries in
* WantListEntry.sessionsRequesting.
*/
struct WantListEntry {
unsigned char* cid;
size_t cid_length;
int priority;
struct Libp2pVector* sessionsRequesting;
};
struct WantList {
struct Libp2pVector* set;
};

View file

@ -0,0 +1,83 @@
#pragma once
/**
* This is a list of requests from a peer (including locally).
* NOTE: This tracks who wants what. If 2 peers want the same file,
* there will be 1 WantListEntry in the WantList. There will be 2 entries in
* WantListEntry.sessionsRequesting.
*/
#include <pthread.h>
#include "ipfs/cid/cid.h"
#include "ipfs/blocks/block.h"
enum WantListSessionType { WANTLIST_SESSION_TYPE_LOCAL, WANTLIST_SESSION_TYPE_REMOTE };
struct WantListSession {
enum WantListSessionType type;
void* context;
};
struct WantListQueueEntry {
struct Cid* cid;
int priority;
// a vector of WantListSessions
struct Libp2pVector* sessionsRequesting;
struct Block* block;
};
struct WantListQueue {
pthread_mutex_t wantlist_mutex;
// a vector of WantListEntries
struct Libp2pVector* queue;
};
/***
* Initialize a WantListQueueEntry
* @returns a new WantListQueueEntry
*/
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_entry_new();
/***
* Remove resources, freeing a WantListQueueEntry
* @param entry the WantListQueueEntry
* @returns true(1)
*/
int ipfs_bitswap_wantlist_queue_entry_free(struct WantListQueueEntry* entry);
/***
* Initialize a new Wantlist (there should only be 1 per instance)
* @returns a new WantList
*/
struct WantListQueue* ipfs_bitswap_wantlist_queue_new();
/***
* Deallocate resources of a WantList
* @param wantlist the WantList
* @returns true(1)
*/
int ipfs_bitswap_wantlist_queue_free(struct WantListQueue* wantlist);
/***
* Add a Cid to the WantList
* @param wantlist the WantList to add to
* @param cid the Cid to add
* @returns the correct WantListEntry or NULL if error
*/
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue* wantlist, const struct Cid* cid);
/***
* Remove (decrement the counter) a Cid from the WantList
* @param wantlist the WantList
* @param cid the Cid
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_bitswap_wantlist_queue_remove(struct WantListQueue* wantlist, const struct Cid* cid);
/***
* Find a Cid in the WantList
* @param wantlist the list
* @param cid the Cid
* @returns the WantListQueueEntry
*/
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue* wantlist, const struct Cid* cid);

View file

@ -61,7 +61,7 @@ int test_repo_fsrepo_write_read_block() {
retVal = ipfs_repo_fsrepo_block_write(block, fs_repo);
if (retVal == 0) {
ipfs_repo_fsrepo_free(fs_repo);
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
@ -70,7 +70,7 @@ int test_repo_fsrepo_write_read_block() {
retVal = ipfs_repo_fsrepo_block_read(block->cid->hash, block->cid->hash_length, &results, fs_repo);
if (retVal == 0) {
ipfs_repo_fsrepo_free(fs_repo);
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
@ -90,7 +90,7 @@ int test_repo_fsrepo_write_read_block() {
}
ipfs_repo_fsrepo_free(fs_repo);
ipfs_blocks_block_free(block);
ipfs_blocks_block_free(results);
ipfs_block_free(block);
ipfs_block_free(results);
return retVal;
}

View file

@ -9,45 +9,45 @@ int test_blocks_new() {
retVal = ipfs_blocks_block_add_data(input, strlen((const char*)input) + 1, block);
if (retVal == 0) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
// now examine the block
if (strcmp((const char*)block->data, (const char*)input) != 0) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
if (block->data_length != strlen((const char*)input) + 1) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
if (block->cid->codec != CID_PROTOBUF) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
if (block->cid->version != 0) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
if (block->cid->hash_length != 32) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
unsigned char result_hash[32] = {33, 153, 66, 187, 124, 250, 87, 12, 12, 73, 43, 247, 175, 153, 10, 51, 192, 195, 218, 69, 220, 170, 105, 179, 195, 0, 203, 213, 172, 3, 244, 10 };
for(int i = 0; i < 32; i++) {
if (block->cid->hash[i] != result_hash[i]) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
}
retVal = ipfs_blocks_block_free(block);
retVal = ipfs_block_free(block);
return 1;
}

View file

@ -29,7 +29,7 @@ int test_ipfs_datastore_put() {
retVal = ipfs_blocks_block_add_data(input, strlen((char*)input), block);
if (retVal == 0) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
@ -38,7 +38,7 @@ int test_ipfs_datastore_put() {
unsigned char key[key_length];
retVal = ipfs_datastore_helper_ds_key_from_binary(block->data, block->data_length, &key[0], key_length, &key_length);
if (retVal == 0) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
@ -46,18 +46,18 @@ int test_ipfs_datastore_put() {
struct FSRepo* fs_repo;
retVal = ipfs_repo_fsrepo_new("/tmp/.ipfs", NULL, &fs_repo);
if (retVal == 0) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
retVal = ipfs_repo_fsrepo_open(fs_repo);
if (retVal == 0) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
// send to Put with key
retVal = fs_repo->config->datastore->datastore_put((const unsigned char*)key, key_length, block->data, block->data_length, fs_repo->config->datastore);
if (retVal == 0) {
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 0;
}
@ -67,7 +67,7 @@ int test_ipfs_datastore_put() {
// clean up
ipfs_repo_fsrepo_free(fs_repo);
ipfs_blocks_block_free(block);
ipfs_block_free(block);
return 1;
}