Implementation of a universal bitswap queue

This queue stores both local and remote requests for blocks
This commit is contained in:
John Jones 2017-07-26 07:38:47 -05:00
parent 108792ca44
commit 692d3406c8
14 changed files with 76 additions and 28 deletions

View file

@ -107,7 +107,10 @@ int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block*
int timeout = 10; int timeout = 10;
int waitSecs = 1; int waitSecs = 1;
int timeTaken = 0; int timeTaken = 0;
struct WantListQueueEntry* want_entry = ipfs_bitswap_want_manager_add(bitswapContext, cid); struct WantListSession wantlist_session;
wantlist_session.type = WANTLIST_SESSION_TYPE_LOCAL;
wantlist_session.context = (void*)bitswapContext->ipfsNode;
struct WantListQueueEntry* want_entry = ipfs_bitswap_want_manager_add(bitswapContext, cid, &wantlist_session);
if (want_entry != NULL) { if (want_entry != NULL) {
// loop waiting for it to fill // loop waiting for it to fill
while(1) { while(1) {

View file

@ -2,14 +2,14 @@
#include "ipfs/exchange/bitswap/wantlist_queue.h" #include "ipfs/exchange/bitswap/wantlist_queue.h"
/*** /***
* Add a Cid to the local wantlist * Add a Cid to the wantlist
* @param context the context * @param context the context
* @param cid the Cid * @param cid the Cid
* @returns the added entry * @returns the added entry
*/ */
struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid) { struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid, const struct WantListSession* session) {
// add if not there, and increment reference count // add if not there, and increment reference count
return ipfs_bitswap_wantlist_queue_add(context->localWantlist, cid); return ipfs_bitswap_wantlist_queue_add(context->localWantlist, cid, session);
} }
/*** /***
@ -57,5 +57,8 @@ int ipfs_bitswap_want_manager_get_block(const struct BitswapContext* context, co
int ipfs_bitswap_want_manager_remove(const struct BitswapContext* context, const struct Cid* cid) { int ipfs_bitswap_want_manager_remove(const struct BitswapContext* context, const struct Cid* cid) {
// decrement the reference count // decrement the reference count
// if it is zero, remove the entry (lock first) // if it is zero, remove the entry (lock first)
return ipfs_bitswap_wantlist_queue_remove(context->localWantlist, cid); struct WantListSession session;
session.type = WANTLIST_SESSION_TYPE_LOCAL;
session.context = (void*) context->ipfsNode;
return ipfs_bitswap_wantlist_queue_remove(context->localWantlist, cid, &session);
} }

View file

@ -1,7 +1,26 @@
#include <stdlib.h> #include <stdlib.h>
#include "libp2p/conn/session.h"
#include "libp2p/utils/vector.h" #include "libp2p/utils/vector.h"
#include "ipfs/exchange/bitswap/wantlist_queue.h" #include "ipfs/exchange/bitswap/wantlist_queue.h"
/**
* remove this session from the lists of sessions that are looking for this WantListQueueEntry
* @param entry the entry
* @param session who was looking for it
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_wantlist_queue_entry_decrement(struct WantListQueueEntry* entry, const struct WantListSession* session) {
for(size_t i = 0; i < entry->sessionsRequesting->total; i++) {
const struct WantListSession* current = (const struct WantListSession*)libp2p_utils_vector_get(entry->sessionsRequesting, i);
if (ipfs_bitswap_wantlist_session_compare(session, current) == 0) {
libp2p_utils_vector_delete(entry->sessionsRequesting, i);
return 1;
}
}
return 0;
}
/*** /***
* Initialize a new Wantlist (there should only be 1 per instance) * Initialize a new Wantlist (there should only be 1 per instance)
* @returns a new WantList * @returns a new WantList
@ -37,7 +56,7 @@ int ipfs_bitswap_wantlist_queue_free(struct WantListQueue* wantlist) {
* @param cid the Cid to add * @param cid the Cid to add
* @returns the correct WantListEntry or NULL if error * @returns the correct WantListEntry or NULL if error
*/ */
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue* wantlist, const struct Cid* cid) { struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue* wantlist, const struct Cid* cid, const struct WantListSession* session) {
struct WantListQueueEntry* entry = NULL; struct WantListQueueEntry* entry = NULL;
if (wantlist != NULL) { if (wantlist != NULL) {
pthread_mutex_lock(&wantlist->wantlist_mutex); pthread_mutex_lock(&wantlist->wantlist_mutex);
@ -50,13 +69,9 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue*
entry = ipfs_bitswap_wantlist_queue_entry_new(); entry = ipfs_bitswap_wantlist_queue_entry_new();
entry->cid = ipfs_cid_copy(cid); entry->cid = ipfs_cid_copy(cid);
entry->priority = 1; entry->priority = 1;
//TODO: find a way to pass session information libp2p_utils_vector_add(wantlist->queue, entry);
//entry->sessionsRequesting;
} else {
//TODO: find a way to pass sessioninformation
//entry->sessionRequesting;
} }
libp2p_utils_vector_add(wantlist->queue, entry); libp2p_utils_vector_add(entry->sessionsRequesting, session);
pthread_mutex_unlock(&wantlist->wantlist_mutex); pthread_mutex_unlock(&wantlist->wantlist_mutex);
} }
return entry; return entry;
@ -68,12 +83,12 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue*
* @param cid the Cid * @param cid the Cid
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int ipfs_bitswap_wantlist_queue_remove(struct WantListQueue* wantlist, const struct Cid* cid) { int ipfs_bitswap_wantlist_queue_remove(struct WantListQueue* wantlist, const struct Cid* cid, const struct WantListSession* session) {
//TODO: remove if counter is <= 0 //TODO: remove if counter is <= 0
if (wantlist != NULL) { if (wantlist != NULL) {
struct WantListQueueEntry* entry = ipfs_bitswap_wantlist_queue_find(wantlist, cid); struct WantListQueueEntry* entry = ipfs_bitswap_wantlist_queue_find(wantlist, cid);
if (entry != NULL) { if (entry != NULL) {
//TODO: find a way to decrement ipfs_bitswap_wantlist_queue_entry_decrement(entry, session);
return 1; return 1;
} }
} }
@ -138,3 +153,23 @@ int ipfs_bitswap_wantlist_queue_entry_free(struct WantListQueueEntry* entry) {
} }
return 1; return 1;
} }
int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const struct WantListSession* b) {
if (a == NULL && b == NULL)
return 0;
if (a == NULL && b != NULL)
return -1;
if (a != NULL && b == NULL)
return 1;
if (a->type != b->type)
return b->type - a->type;
if (a->type == WANTLIST_SESSION_TYPE_LOCAL) {
// it's local, there should be only 1
return 0;
} else {
struct SessionContext* contextA = (struct SessionContext*)a->context;
struct SessionContext* contextB = (struct SessionContext*)b->context;
return libp2p_session_context_compare(contextA, contextB);
}
}

View file

@ -11,7 +11,7 @@
* @param cid the Cid * @param cid the Cid
* @returns the added WantListQueueEntry * @returns the added WantListQueueEntry
*/ */
struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid); struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapContext* context, const struct Cid* cid, const struct WantListSession* session);
/*** /***
* Checks to see if the requested block has been received * Checks to see if the requested block has been received

View file

@ -13,7 +13,7 @@ enum WantListSessionType { WANTLIST_SESSION_TYPE_LOCAL, WANTLIST_SESSION_TYPE_RE
struct WantListSession { struct WantListSession {
enum WantListSessionType type; enum WantListSessionType type;
void* context; void* context; // either an IpfsNode (local) or a SessionContext (remote)
}; };
struct WantListQueueEntry { struct WantListQueueEntry {
@ -62,7 +62,7 @@ int ipfs_bitswap_wantlist_queue_free(struct WantListQueue* wantlist);
* @param cid the Cid to add * @param cid the Cid to add
* @returns the correct WantListEntry or NULL if error * @returns the correct WantListEntry or NULL if error
*/ */
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue* wantlist, const struct Cid* cid); struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue* wantlist, const struct Cid* cid, const struct WantListSession* session);
/*** /***
* Remove (decrement the counter) a Cid from the WantList * Remove (decrement the counter) a Cid from the WantList
@ -70,7 +70,7 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue*
* @param cid the Cid * @param cid the Cid
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int ipfs_bitswap_wantlist_queue_remove(struct WantListQueue* wantlist, const struct Cid* cid); int ipfs_bitswap_wantlist_queue_remove(struct WantListQueue* wantlist, const struct Cid* cid, const struct WantListSession* session);
/*** /***
* Find a Cid in the WantList * Find a Cid in the WantList
@ -80,4 +80,11 @@ int ipfs_bitswap_wantlist_queue_remove(struct WantListQueue* wantlist, const str
*/ */
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue* wantlist, const struct Cid* cid); struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue* wantlist, const struct Cid* cid);
/***
* compare 2 sessions for equality
* @param a side a
* @param b side b
* @returns 0 if equal, <0 if A wins, >0 if b wins
*/
int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const struct WantListSession* b);

View file

@ -33,7 +33,7 @@ int repo_config_bootstrap_peers_retrieve(struct Libp2pVector** list) {
int repo_config_bootstrap_peers_free(struct Libp2pVector* list) { int repo_config_bootstrap_peers_free(struct Libp2pVector* list) {
if (list != NULL) { if (list != NULL) {
for(int i = 0; i < list->total; i++) { for(int i = 0; i < list->total; i++) {
struct MultiAddress* currAddr = libp2p_utils_vector_get(list, i); struct MultiAddress* currAddr = (struct MultiAddress*)libp2p_utils_vector_get(list, i);
multiaddress_free(currAddr); multiaddress_free(currAddr);
} }
libp2p_utils_vector_free(list); libp2p_utils_vector_free(list);

View file

@ -119,7 +119,7 @@ int ipfs_repo_config_init(struct RepoConfig* config, unsigned int num_bits_for_k
if (bootstrap_peers != NULL) { if (bootstrap_peers != NULL) {
config->bootstrap_peers = libp2p_utils_vector_new(bootstrap_peers->total); config->bootstrap_peers = libp2p_utils_vector_new(bootstrap_peers->total);
for(int i = 0; i < bootstrap_peers->total; i++) { for(int i = 0; i < bootstrap_peers->total; i++) {
struct MultiAddress* orig = libp2p_utils_vector_get(bootstrap_peers, i); const struct MultiAddress* orig = (const struct MultiAddress*) libp2p_utils_vector_get(bootstrap_peers, i);
libp2p_utils_vector_add(config->bootstrap_peers, multiaddress_copy(orig)); libp2p_utils_vector_add(config->bootstrap_peers, multiaddress_copy(orig));
} }
} }

View file

@ -27,7 +27,7 @@ int repo_config_replication_free(struct Replication* replication) {
// free the vector // free the vector
if (replication->nodes != NULL) { if (replication->nodes != NULL) {
for(int i = 0; i < replication->nodes->total; i++) { for(int i = 0; i < replication->nodes->total; i++) {
struct MultiAddress* currAddr = libp2p_utils_vector_get(replication->nodes, i); struct MultiAddress* currAddr = (struct MultiAddress*)libp2p_utils_vector_get(replication->nodes, i);
multiaddress_free(currAddr); multiaddress_free(currAddr);
} }
libp2p_utils_vector_free(replication->nodes); libp2p_utils_vector_free(replication->nodes);

View file

@ -95,7 +95,7 @@ int repo_config_write_config_file(char* full_filename, struct RepoConfig* config
fprintf(out_file, " \"ResolveCacheSize\": %d\n", config->ipns.resolve_cache_size); fprintf(out_file, " \"ResolveCacheSize\": %d\n", config->ipns.resolve_cache_size);
fprintf(out_file, " },\n \"Bootstrap\": [\n"); fprintf(out_file, " },\n \"Bootstrap\": [\n");
for(int i = 0; i < config->bootstrap_peers->total; i++) { for(int i = 0; i < config->bootstrap_peers->total; i++) {
struct MultiAddress* peer = libp2p_utils_vector_get(config->bootstrap_peers, i); const struct MultiAddress* peer = (const struct MultiAddress*)libp2p_utils_vector_get(config->bootstrap_peers, i);
fprintf(out_file, " \"%s\"", peer->string); fprintf(out_file, " \"%s\"", peer->string);
if (i < config->bootstrap_peers->total - 1) if (i < config->bootstrap_peers->total - 1)
fprintf(out_file, ",\n"); fprintf(out_file, ",\n");

View file

@ -126,7 +126,7 @@ int ipfs_routing_kademlia_bootstrap(struct IpfsRouting* routing) {
struct IpfsNode *local_node = routing->local_node; struct IpfsNode *local_node = routing->local_node;
// read the config file and get the bootstrap peers // read the config file and get the bootstrap peers
for(int i = 0; i < local_node->repo->config->bootstrap_peers->total; i++) { // loop through the peers for(int i = 0; i < local_node->repo->config->bootstrap_peers->total; i++) { // loop through the peers
struct IPFSAddr* ipfs_addr = local_node->repo->config->bootstrap_peers->items[i]; struct IPFSAddr* ipfs_addr = (struct IPFSAddr*) libp2p_utils_vector_get(local_node->repo->config->bootstrap_peers, i);
struct MultiAddress* ma = multiaddress_new_from_string(ipfs_addr->entire_string); struct MultiAddress* ma = multiaddress_new_from_string(ipfs_addr->entire_string);
// get the id // get the id
char* ptr; char* ptr;

View file

@ -406,7 +406,7 @@ int ipfs_routing_online_get_value (ipfs_routing* routing, const unsigned char *k
if (!retVal) { if (!retVal) {
// we didn't get the file. Try to connect to the peers we're not connected to, and ask for the file // we didn't get the file. Try to connect to the peers we're not connected to, and ask for the file
for(int i = 0; i < peers->total; i++) { for(int i = 0; i < peers->total; i++) {
struct Libp2pPeer* current_peer = libp2p_utils_vector_get(peers, i); struct Libp2pPeer* current_peer = (struct Libp2pPeer*)libp2p_utils_vector_get(peers, i);
if (libp2p_peer_matches_id(current_peer, (unsigned char*)routing->local_node->identity->peer_id)) { if (libp2p_peer_matches_id(current_peer, (unsigned char*)routing->local_node->identity->peer_id)) {
// we tried this once, it didn't work. Skip it. // we tried this once, it didn't work. Skip it.
continue; continue;

View file

@ -49,7 +49,7 @@ int test_bitswap_new_free() {
message->wantlist->entries = libp2p_utils_vector_new(1); message->wantlist->entries = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(message->wantlist->entries, wantlist_entry); libp2p_utils_vector_add(message->wantlist->entries, wantlist_entry);
wantlist_entry = NULL; wantlist_entry = NULL;
wantlist_entry = libp2p_utils_vector_get(message->wantlist->entries, 0); wantlist_entry = (struct WantlistEntry*)libp2p_utils_vector_get(message->wantlist->entries, 0);
if (wantlist_entry == NULL) { if (wantlist_entry == NULL) {
fprintf(stderr, "Vector didn't have entry\n"); fprintf(stderr, "Vector didn't have entry\n");
goto exit; goto exit;
@ -63,7 +63,7 @@ int test_bitswap_new_free() {
block = ipfs_blocks_block_new(); block = ipfs_blocks_block_new();
block->data_length = 25; block->data_length = 25;
libp2p_utils_vector_add(message->payload, block); libp2p_utils_vector_add(message->payload, block);
block = libp2p_utils_vector_get(message->payload, 0); block = (struct Block*)libp2p_utils_vector_get(message->payload, 0);
if (block == NULL) { if (block == NULL) {
fprintf(stderr, "Vector didn't have payload entry\n"); fprintf(stderr, "Vector didn't have payload entry\n");
goto exit; goto exit;

View file

@ -30,7 +30,7 @@ int test_repo_bootstrap_peers_init() {
*/ */
for(int i = 0; i < list->total; i++) { for(int i = 0; i < list->total; i++) {
unsigned long strLen = strlen(default_bootstrap_addresses[i]); unsigned long strLen = strlen(default_bootstrap_addresses[i]);
struct MultiAddress* currAddr = libp2p_utils_vector_get(list, i); struct MultiAddress* currAddr = (struct MultiAddress*)libp2p_utils_vector_get(list, i);
if (strncmp(currAddr->string, default_bootstrap_addresses[i], strLen) != 0) if (strncmp(currAddr->string, default_bootstrap_addresses[i], strLen) != 0)
printf("The value of element %d is: %s\n", i, currAddr->string); printf("The value of element %d is: %s\n", i, currAddr->string);
} }

View file

@ -229,7 +229,7 @@ int test_routing_find_providers() {
// connect to peer 2 // connect to peer 2
struct Libp2pPeer *remote_peer = NULL; struct Libp2pPeer *remote_peer = NULL;
for(int i = 0; i < result->total; i++) { for(int i = 0; i < result->total; i++) {
remote_peer = libp2p_utils_vector_get(result, i); remote_peer = (struct Libp2pPeer*)libp2p_utils_vector_get(result, i);
if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED || libp2p_peer_connect(remote_peer, 5)) { if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED || libp2p_peer_connect(remote_peer, 5)) {
break; break;
} }