bug fixes to client side bitswap

yamux
John Jones 2017-07-31 10:01:06 -05:00
parent 9bceade4d8
commit e22da601ea
6 changed files with 202 additions and 7 deletions

View File

@ -7,6 +7,7 @@
#include "ipfs/exchange/exchange.h"
#include "ipfs/exchange/bitswap/bitswap.h"
#include "ipfs/exchange/bitswap/message.h"
#include "ipfs/exchange/bitswap/peer_request_queue.h"
#include "ipfs/exchange/bitswap/want_manager.h"
/**
@ -28,8 +29,8 @@ struct Exchange* ipfs_bitswap_new(struct IpfsNode* ipfs_node) {
free(exchange);
return NULL;
}
bitswapContext->localWantlist = NULL;
bitswapContext->peerRequestQueue = NULL;
bitswapContext->localWantlist = ipfs_bitswap_wantlist_queue_new();
bitswapContext->peerRequestQueue = ipfs_bitswap_peer_request_queue_new();
bitswapContext->ipfsNode = ipfs_node;
exchange->exchangeContext = (void*) bitswapContext;
@ -109,7 +110,7 @@ int ipfs_bitswap_get_block(struct Exchange* exchange, struct Cid* cid, struct Bl
return 1;
// now ask the network
//NOTE: this timeout should be configurable
int timeout = 10;
int timeout = 60;
int waitSecs = 1;
int timeTaken = 0;
struct WantListSession wantlist_session;

View File

@ -139,10 +139,13 @@ struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue*
if (queue != NULL) {
pthread_mutex_lock(&queue->queue_mutex);
struct PeerRequestEntry* entry = queue->first;
retVal = entry->current;
queue->first = queue->first->next;
if (entry != NULL) {
retVal = entry->current;
queue->first = queue->first->next;
}
pthread_mutex_unlock(&queue->queue_mutex);
ipfs_bitswap_peer_request_entry_free(entry);
if (entry != NULL)
ipfs_bitswap_peer_request_entry_free(entry);
}
return retVal;
}

View File

@ -147,10 +147,14 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_pop(struct WantListQueue*
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_entry_new() {
struct WantListQueueEntry* entry = (struct WantListQueueEntry*) malloc(sizeof(struct WantListQueueEntry));
if (entry != NULL) {
entry->sessionsRequesting = libp2p_utils_vector_new(1);
if (entry->sessionsRequesting == NULL) {
free(entry);
return NULL;
}
entry->block = NULL;
entry->cid = NULL;
entry->priority = 0;
entry->sessionsRequesting = NULL;
entry->attempts = 0;
}
return entry;

View File

@ -39,6 +39,9 @@ struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionCon
sessionContext->default_stream->write(sessionContext, protobuf, protobuf_size);
sessionContext->default_stream->read(sessionContext, &results, &results_size, 5);
if (results_size == 0)
goto exit;
// see if we can unprotobuf
if (!libp2p_message_protobuf_decode(results, results_size, &return_message))
goto exit;

View File

@ -157,6 +157,186 @@ int test_bitswap_retrieve_file()
return retVal;
}
/***
* Attempt to retrieve a file from a known node
*/
int test_bitswap_retrieve_file_remote() {
int retVal = 0;
/*
libp2p_logger_add_class("dht_protocol");
libp2p_logger_add_class("providerstore");
libp2p_logger_add_class("peerstore");
libp2p_logger_add_class("exporter");
libp2p_logger_add_class("peer");
*/
libp2p_logger_add_class("test_bitswap");
libp2p_logger_add_class("null");
libp2p_logger_add_class("online");
libp2p_logger_add_class("multistream");
// clean out repository
char* ipfs_path = "/tmp/test1";
char* peer_id_1 = NULL, *peer_id_2 = NULL;
struct IpfsNode* ipfs_node1 = NULL, *ipfs_node2 = NULL;
pthread_t thread1;
int thread1_started = 0;
struct MultiAddress* ma_peer1 = NULL;
struct Libp2pVector* ma_vector2 = NULL;
struct HashtableNode* node = NULL;
struct Block* result = NULL;
struct Cid* cid = NULL;
// create peer 1
libp2p_logger_debug("test_bitswap", "Firing up daemon 1.\n");
drop_and_build_repository(ipfs_path, 4001, NULL, &peer_id_1);
char multiaddress_string[255];
sprintf(multiaddress_string, "/ip4/127.0.0.1/tcp/4001/ipfs/%s", peer_id_1);
ma_peer1 = multiaddress_new_from_string(multiaddress_string);
// add a file
size_t bytes_written = 0;
ipfs_node_online_new(ipfs_path, &ipfs_node1);
ipfs_import_file(NULL, "/home/parallels/ipfstest/hello_world.txt", &node, ipfs_node1, &bytes_written, 0);
// start the daemon in a separate thread
if (pthread_create(&thread1, NULL, test_routing_daemon_start, (void*)ipfs_path) < 0) {
libp2p_logger_error("test_bitswap", "Unable to start thread 1\n");
goto exit;
}
thread1_started = 1;
// wait for everything to start up
sleep(3);
// create my peer, peer 2
libp2p_logger_debug("test_routing", "Firing up the client\n");
ipfs_path = "/tmp/test2";
ma_peer1 = multiaddress_new_from_string(multiaddress_string);
ma_vector2 = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(ma_vector2, ma_peer1);
drop_and_build_repository(ipfs_path, 4002, ma_vector2, &peer_id_2);
multiaddress_free(ma_peer1);
ipfs_node_online_new(ipfs_path, &ipfs_node2);
ipfs_node2->routing->Bootstrap(ipfs_node2->routing);
// this does the heavy lifting...
cid = ipfs_cid_new(0, node->hash, node->hash_size, CID_PROTOBUF);
if (!ipfs_node2->exchange->GetBlock(ipfs_node2->exchange, cid, &result)) {
libp2p_logger_error("test_bitswap", "GetBlock returned false\n");
goto exit;
}
if (node->hash_size != result->cid->hash_length) {
libp2p_logger_error("test_bitswap", "Node hash sizes do not match. Should be %lu but is %lu\n", node->hash_size, result->cid->hash_length);
goto exit;
}
if (node->data_size != result->data_length) {
libp2p_logger_error("test_bitswap", "Result sizes do not match. Should be %lu but is %lu\n", node->data_size, result->data_length);
goto exit;
}
retVal = 1;
exit:
ipfs_daemon_stop();
if (thread1_started)
pthread_join(thread1, NULL);
if (peer_id_1 != NULL)
free(peer_id_1);
if (peer_id_2 != NULL)
free(peer_id_2);
if (ma_vector2 != NULL) {
libp2p_utils_vector_free(ma_vector2);
}
if (node != NULL)
ipfs_hashtable_node_free(node);
if (result != NULL)
ipfs_block_free(result);
if (cid != NULL)
ipfs_cid_free(cid);
return retVal;
}
/***
* Attempt to retrieve a file from a known node
*/
int test_bitswap_retrieve_file_known_remote() {
int retVal = 0;
/***
* This assumes a remote server with the hello_world.txt file already in its database
*/
char* remote_ip = "10.211.55.2";
int remote_port = 4001;
char* remote_peer_id = "QmZVoAZGFfinB7MQQiDzB84kWaDPQ95GLuXdemJFM2r9b4";
char* hello_world_hash = "QmTUFTVgkHT3Qdd9ospVjSLi2upd6VdkeNXZQH66cVmzja";
/*
libp2p_logger_add_class("dht_protocol");
libp2p_logger_add_class("providerstore");
libp2p_logger_add_class("peerstore");
libp2p_logger_add_class("exporter");
libp2p_logger_add_class("peer");
*/
libp2p_logger_add_class("test_bitswap");
libp2p_logger_add_class("null");
libp2p_logger_add_class("online");
libp2p_logger_add_class("multistream");
char* ipfs_path = "/tmp/test1";
char* peer_id_1 = NULL, *peer_id_2 = NULL;
struct IpfsNode* ipfs_node2 = NULL;
struct MultiAddress* ma_peer1 = NULL;
struct Libp2pVector* ma_vector2 = NULL;
struct Block* result = NULL;
struct Cid* cid = NULL;
// create peer 1
char multiaddress_string[255];
sprintf(multiaddress_string, "/ip4/%s/tcp/%d/ipfs/%s", remote_ip, remote_port, remote_peer_id);
ma_peer1 = multiaddress_new_from_string(multiaddress_string);
// create my peer, peer 2
libp2p_logger_debug("test_routing", "Firing up the client\n");
ipfs_path = "/tmp/test2";
ma_vector2 = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(ma_vector2, ma_peer1);
drop_and_build_repository(ipfs_path, 4002, ma_vector2, &peer_id_2);
multiaddress_free(ma_peer1);
ipfs_node_online_new(ipfs_path, &ipfs_node2);
ipfs_node2->routing->Bootstrap(ipfs_node2->routing);
// this does the heavy lifting...
cid = ipfs_cid_new(0, (unsigned char*)hello_world_hash, strlen(hello_world_hash), CID_PROTOBUF);
if (!ipfs_node2->exchange->GetBlock(ipfs_node2->exchange, cid, &result)) {
libp2p_logger_error("test_bitswap", "GetBlock returned false\n");
goto exit;
}
if (strlen(hello_world_hash) != result->cid->hash_length) {
libp2p_logger_error("test_bitswap", "Node hash sizes do not match. Should be %lu but is %lu\n", strlen(hello_world_hash), result->cid->hash_length);
goto exit;
}
retVal = 1;
exit:
if (peer_id_1 != NULL)
free(peer_id_1);
if (peer_id_2 != NULL)
free(peer_id_2);
if (ma_vector2 != NULL) {
libp2p_utils_vector_free(ma_vector2);
}
if (result != NULL)
ipfs_block_free(result);
if (cid != NULL)
ipfs_cid_free(cid);
return retVal;
}
/***
* Attempt to retrieve a file from a previously unknown node
*/

View File

@ -37,6 +37,8 @@ const char* names[] = {
"test_bitswap_new_free",
"test_bitswap_peer_request_queue_new",
"test_bitswap_retrieve_file",
"test_bitswap_retrieve_file_known_remote",
"test_bitswap_retrieve_file_remote",
"test_bitswap_retrieve_file_third_party",
"test_cid_new_free",
"test_cid_cast_multihash",
@ -91,6 +93,8 @@ int (*funcs[])(void) = {
test_bitswap_new_free,
test_bitswap_peer_request_queue_new,
test_bitswap_retrieve_file,
test_bitswap_retrieve_file_known_remote,
test_bitswap_retrieve_file_remote,
test_bitswap_retrieve_file_third_party,
test_cid_new_free,
test_cid_cast_multihash,