Implemented find providers from remote peers

Now, you can ask a known node for a key, and it can pass a list of peers
that are perhaps unknownn to the calling peer.
This commit is contained in:
John Jones 2017-04-16 23:47:53 -05:00
parent 62096ffc1c
commit 2b0a29a06b
9 changed files with 263 additions and 19 deletions

View file

@ -35,13 +35,13 @@ struct IpfsRouting {
int (*GetValue) (struct IpfsRouting*, char*, size_t, void**, size_t*); int (*GetValue) (struct IpfsRouting*, char*, size_t, void**, size_t*);
/** /**
* Find a provider * Find a provider
* @param 1 the context * @param routing the context
* @param 2 the information that is being looked for * @param key the information that is being looked for
* @param 3 the size of param 2 * @param key_size the size of param 2
* @param 4 the information found * @param peers a vector of peers found that can provide the value for the key
* @param 5 the size of the information found * @returns true(1) on success, otherwise false(0)
*/ */
int (*FindProviders) (struct IpfsRouting*, char*, size_t, struct Libp2pVector** multiaddresses); int (*FindProviders) (struct IpfsRouting* routing, unsigned char* key, size_t key_size, struct Libp2pVector** peers);
/** /**
* Find a peer * Find a peer
* @param 1 the context * @param 1 the context

View file

@ -46,7 +46,7 @@ int ipfs_routing_kademlia_get_value(struct IpfsRouting* routing, char* key, size
* @param results_size the size of the results buffer * @param results_size the size of the results buffer
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int ipfs_routing_kademlia_find_providers(struct IpfsRouting* routing, char* key, size_t key_size, struct Libp2pVector** results) { int ipfs_routing_kademlia_find_providers(struct IpfsRouting* routing, unsigned char* key, size_t key_size, struct Libp2pVector** results) {
*results = libp2p_utils_vector_new(1); *results = libp2p_utils_vector_new(1);
struct Libp2pVector* vector = *results; struct Libp2pVector* vector = *results;
// see if I can provide it // see if I can provide it
@ -67,7 +67,7 @@ int ipfs_routing_kademlia_find_providers(struct IpfsRouting* routing, char* key,
if (vector->total == 0) { if (vector->total == 0) {
// search requires null terminated key // search requires null terminated key
char* key_nt = malloc(key_size + 1); char* key_nt = malloc(key_size + 1);
strncpy(key_nt, key, key_size); strncpy(key_nt, (char*)key, key_size);
key_nt[key_size] = 0; key_nt[key_size] = 0;
struct MultiAddress** list = search_kademlia(key_nt, 3); struct MultiAddress** list = search_kademlia(key_nt, 3);
free(key_nt); free(key_nt);

View file

@ -54,7 +54,7 @@ int ipfs_routing_generic_get_value (ipfs_routing* routing, char *key, size_t key
return 0; return 0;
} }
int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, char *key, size_t key_size, struct Libp2pVector** multiaddresses) int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, unsigned char *key, size_t key_size, struct Libp2pVector** peers)
{ {
return ErrOffline; return ErrOffline;
} }

View file

@ -47,8 +47,80 @@ struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct Stream* st
return return_message; return return_message;
} }
int ipfs_routing_online_find_providers(struct IpfsRouting* routing, char* val1, size_t val2, struct Libp2pVector** multiaddresses) { /***
return 0; * Ask the network for anyone that can provide a hash
* @param routing the context
* @param key the hash to look for
* @param key_size the size of the hash
* @param peers an array of Peer structs that can provide the hash
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_routing_online_find_remote_providers(struct IpfsRouting* routing, unsigned char* key, size_t key_size, struct Libp2pVector** peers) {
int found = 0;
// build the message to be transmitted
struct Libp2pMessage* message = libp2p_message_new();
message->message_type = MESSAGE_TYPE_GET_PROVIDERS;
message->key_size = key_size;
message->key = malloc(message->key_size);
memcpy(message->key, key, message->key_size);
// loop through the connected peers, asking for the hash
struct Libp2pLinkedList* current_entry = routing->local_node->peerstore->head_entry;
while (current_entry != NULL) {
struct Libp2pPeer* peer = ((struct PeerEntry*)current_entry->item)->peer;
if (peer->connection_type == CONNECTION_TYPE_CONNECTED) {
// Ask for hash, if it has it, break out of the loop and stop looking
struct Libp2pMessage* return_message = ipfs_routing_online_send_receive_message(peer->connection, message);
if (return_message != NULL && return_message->provider_peer_head != NULL) {
found = 1;
*peers = libp2p_utils_vector_new(1);
struct Libp2pLinkedList * current_provider_peer_list_item = return_message->provider_peer_head;
while (current_provider_peer_list_item != NULL) {
struct Libp2pPeer *current_peer = current_provider_peer_list_item->item;
libp2p_utils_vector_add(*peers, libp2p_peer_copy(current_peer));
current_provider_peer_list_item = current_provider_peer_list_item->next;
}
libp2p_message_free(return_message);
break;
}
// TODO: Make this multithreaded
}
if (found)
current_entry = NULL;
else
current_entry = current_entry->next;
}
// clean up
libp2p_message_free(message);
return found;
}
/**
* Looking for a provider of a hash. This first looks locally, then asks the network
* @param routing the context
* @param key the hash to look for
* @param key_size the size of the hash
* @param multiaddresses the results
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_routing_online_find_providers(struct IpfsRouting* routing, unsigned char* key, size_t key_size, struct Libp2pVector** peers) {
unsigned char* peer_id;
int peer_id_size;
struct Libp2pPeer *peer;
// see if we can find the key, and retrieve the peer who has it
if (!libp2p_providerstore_get(routing->local_node->providerstore, key, key_size, &peer_id, &peer_id_size)) {
// we need to look remotely
return ipfs_routing_online_find_remote_providers(routing, key, key_size, peers);
}
// now translate the peer id into a peer to get the multiaddresses
peer = libp2p_peerstore_get_peer(routing->local_node->peerstore, peer_id, peer_id_size);
if (peer == NULL)
return 0;
*peers = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(*peers, peer);
return 1;
} }
/*** /***
@ -119,7 +191,7 @@ int ipfs_routing_online_provide(struct IpfsRouting* routing, char* key, size_t k
int port = multiaddress_get_ip_port(temp_ma); int port = multiaddress_get_ip_port(temp_ma);
multiaddress_free(temp_ma); multiaddress_free(temp_ma);
char str[255]; char str[255];
sprintf(str, "/ip4/127.1.2.3/tcp/%d", port); sprintf(str, "/ip4/127.0.0.1/tcp/%d/ipfs/%s/", port, routing->local_node->identity->peer_id);
struct MultiAddress* ma = multiaddress_new_from_string(str); struct MultiAddress* ma = multiaddress_new_from_string(str);
libp2p_logger_debug("online", "Adding local MultiAddress %s to peer.\n", ma->string); libp2p_logger_debug("online", "Adding local MultiAddress %s to peer.\n", ma->string);
local_peer->addr_head->item = ma; local_peer->addr_head->item = ma;

56
test/core/test_null.h Normal file
View file

@ -0,0 +1,56 @@
#include <stdlib.h>
int test_null_add_provider() {
int retVal = 0;
char* peer_id_1;
char* peer_id_2;
struct FSRepo *fs_repo_2 = NULL;
pthread_t thread1;
pthread_t thread2;
struct MultiAddress* ma_peer1;
char* ipfs_path = "/tmp/test1";
// create peer 1 that will be the "server" for this test
os_utils_setenv("IPFS_PATH", ipfs_path, 1);
drop_and_build_repository(ipfs_path, 4001, NULL, &peer_id_1);
char multiaddress_string[255];
sprintf(multiaddress_string, "/ip4/127.0.0.1/tcp/4001/ipfs/%s", peer_id_1);
ma_peer1 = multiaddress_new_from_string(multiaddress_string);
// start the daemon in a separate thread
if (pthread_create(&thread1, NULL, test_routing_daemon_start, (void*)ipfs_path) < 0)
goto exit;
// create peer 2 that will be the "client" for this test
ipfs_path = "/tmp/test2";
os_utils_setenv("IPFS_PATH", ipfs_path, 1);
struct Libp2pVector* ma_vector = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(ma_vector, ma_peer1);
drop_and_build_repository(ipfs_path, 4002, ma_vector, &peer_id_2);
// add a file, to prime the connection to peer 1
//TODO: Find a better way to do this...
size_t bytes_written = 0;
ipfs_repo_fsrepo_new(ipfs_path, NULL, &fs_repo_2);
ipfs_repo_fsrepo_open(fs_repo_2);
struct Node* node = NULL;
ipfs_import_file(NULL, "/home/parallels/ipfstest/hello_world.txt", &node, fs_repo_2, &bytes_written, 0);
ipfs_repo_fsrepo_free(fs_repo_2);
// start the daemon in a separate thread
if (pthread_create(&thread2, NULL, test_routing_daemon_start, (void*)ipfs_path) < 0)
goto exit;
// wait for everything to start up
// JMJ debugging
sleep(60);
//TODO: verify that the server (peer 1) has the client and his file
retVal = 1;
exit:
if (fs_repo_2 != NULL)
ipfs_repo_fsrepo_free(fs_repo_2);
if (ma_peer1 != NULL)
multiaddress_free(ma_peer1);
pthread_cancel(thread1);
pthread_cancel(thread2);
return retVal;
}

View file

@ -111,9 +111,11 @@ int test_resolver_remote_get() {
// put the server in the peer store and change our peer id so we think it is remote (hack for now) // put the server in the peer store and change our peer id so we think it is remote (hack for now)
strcpy(remote_peer_id, fs_repo->config->identity->peer_id); strcpy(remote_peer_id, fs_repo->config->identity->peer_id);
struct MultiAddress* remote_addr = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/4001"); char multiaddress_string[100];
sprintf(multiaddress_string, "/ip4/127.0.0.1/tcp/4001/ipfs/%s", remote_peer_id);
struct MultiAddress* remote_addr = multiaddress_new_from_string(multiaddress_string);
struct Peerstore* peerstore = libp2p_peerstore_new(); struct Peerstore* peerstore = libp2p_peerstore_new();
struct Libp2pPeer* peer = libp2p_peer_new_from_data(remote_peer_id, strlen(remote_peer_id), remote_addr); struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(remote_addr);
libp2p_peerstore_add_peer(peerstore, peer); libp2p_peerstore_add_peer(peerstore, peer);
strcpy(fs_repo->config->identity->peer_id, "QmABCD"); strcpy(fs_repo->config->identity->peer_id, "QmABCD");

View file

@ -70,7 +70,6 @@ int test_routing_find_peer() {
local_node.routing->Bootstrap(local_node.routing); local_node.routing->Bootstrap(local_node.routing);
struct Libp2pPeer* result; struct Libp2pPeer* result;
struct Libp2pPeer* first_peer = ((struct PeerEntry*)local_node.peerstore->head_entry->item)->peer;
if (!local_node.routing->FindPeer(local_node.routing, peer_id_2, strlen(peer_id_2), &result)) if (!local_node.routing->FindPeer(local_node.routing, peer_id_2, strlen(peer_id_2), &result))
return 0; return 0;
@ -90,3 +89,113 @@ int test_routing_find_peer() {
return 1; return 1;
} }
int test_routing_find_providers() {
int retVal = 0;
// clean out repository
char* ipfs_path = "/tmp/test1";
os_utils_setenv("IPFS_PATH", ipfs_path, 1);
char* peer_id_1;
char* peer_id_2;
struct FSRepo* fs_repo_2;
char* peer_id_3;
pthread_t thread1, thread2;
int thread1_started = 0, thread2_started = 0;
struct MultiAddress* ma_peer1;
// create peer 1
drop_and_build_repository(ipfs_path, 4001, NULL, &peer_id_1);
char multiaddress_string[255];
sprintf(multiaddress_string, "/ip4/127.0.0.1/tcp/4001/ipfs/%s", peer_id_1);
ma_peer1 = multiaddress_new_from_string(multiaddress_string);
// start the daemon in a separate thread
if (pthread_create(&thread1, NULL, test_routing_daemon_start, (void*)ipfs_path) < 0) {
fprintf(stderr, "Unable to start thread 1\n");
goto exit;
}
thread1_started = 1;
// create peer 2
ipfs_path = "/tmp/test2";
os_utils_setenv("IPFS_PATH", ipfs_path, 1);
struct Libp2pVector* ma_vector = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(ma_vector, ma_peer1);
drop_and_build_repository(ipfs_path, 4002, ma_vector, &peer_id_2);
// add a file, to prime the connection to peer 1
//TODO: Find a better way to do this...
size_t bytes_written = 0;
ipfs_repo_fsrepo_new(ipfs_path, NULL, &fs_repo_2);
ipfs_repo_fsrepo_open(fs_repo_2);
struct Node* node = NULL;
ipfs_import_file(NULL, "/home/parallels/ipfstest/hello_world.txt", &node, fs_repo_2, &bytes_written, 0);
ipfs_repo_fsrepo_free(fs_repo_2);
// start the daemon in a separate thread
if (pthread_create(&thread2, NULL, test_routing_daemon_start, (void*)ipfs_path) < 0) {
fprintf(stderr, "Unable to start thread 2\n");
goto exit;
}
thread2_started = 1;
// wait for everything to start up
// JMJ debugging =
sleep(3);
// create my peer, peer 3
ipfs_path = "/tmp/test3";
os_utils_setenv("IPFS_PATH", ipfs_path, 1);
drop_and_build_repository(ipfs_path, 4003, ma_vector, &peer_id_3);
struct FSRepo* fs_repo;
ipfs_repo_fsrepo_new(ipfs_path, NULL, &fs_repo);
ipfs_repo_fsrepo_open(fs_repo);
// We know peer 1, try to find peer 2
struct IpfsNode local_node;
local_node.mode = MODE_ONLINE;
local_node.peerstore = libp2p_peerstore_new();
local_node.providerstore = libp2p_providerstore_new();
local_node.repo = fs_repo;
local_node.identity = fs_repo->config->identity;
local_node.routing = ipfs_routing_new_online(&local_node, &fs_repo->config->identity->private_key, NULL);
local_node.routing->Bootstrap(local_node.routing);
struct Libp2pVector* result;
if (!local_node.routing->FindProviders(local_node.routing, node->hash, node->hash_size, &result)) {
fprintf(stderr, "Unable to find a provider\n");
goto exit;
}
if (result == NULL) {
fprintf(stderr, "Provider array is NULL\n");
goto exit;
}
// connect to peer 2
struct Libp2pPeer *remote_peer = NULL;
for(int i = 0; i < result->total; i++) {
remote_peer = libp2p_utils_vector_get(result, i);
if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED || libp2p_peer_connect(remote_peer)) {
break;
}
remote_peer = NULL;
}
if (remote_peer == NULL) {
fprintf(stderr, "Remote Peer is NULL\n");
goto exit;
}
fprintf(stderr, "Remote address is: %s\n", remote_peer->id);
retVal = 1;
exit:
if (fs_repo != NULL)
ipfs_repo_fsrepo_free(fs_repo);
if (thread1_started)
pthread_cancel(thread1);
if (thread2_started)
pthread_cancel(thread2);
return retVal;
}

View file

@ -98,7 +98,7 @@ int test_routing_supernode_get_remote_value() {
goto exit; goto exit;
// ask the network who can provide this // ask the network who can provide this
if (!ipfs_node->routing->FindProviders(ipfs_node->routing, (char*)hash, hash_size, &multiaddresses)) if (!ipfs_node->routing->FindProviders(ipfs_node->routing, hash, hash_size, &multiaddresses))
goto exit; goto exit;
// get the file // get the file
@ -203,7 +203,7 @@ int test_routing_supernode_get_value() {
goto exit; goto exit;
// ask the network who can provide this // ask the network who can provide this
if (!ipfs_node->routing->FindProviders(ipfs_node->routing, (char*)write_node->hash, write_node->hash_size, &multiaddresses)) if (!ipfs_node->routing->FindProviders(ipfs_node->routing, write_node->hash, write_node->hash_size, &multiaddresses))
goto exit; goto exit;
struct MultiAddress* addr = NULL; struct MultiAddress* addr = NULL;

View file

@ -16,6 +16,7 @@
#include "storage/test_blocks.h" #include "storage/test_blocks.h"
#include "storage/test_unixfs.h" #include "storage/test_unixfs.h"
#include "core/test_ping.h" #include "core/test_ping.h"
#include "core/test_null.h"
int testit(const char* name, int (*func)(void)) { int testit(const char* name, int (*func)(void)) {
printf("Testing %s...\n", name); printf("Testing %s...\n", name);
@ -58,13 +59,15 @@ const char* names[] = {
"test_merkledag_add_node", "test_merkledag_add_node",
"test_merkledag_add_node_with_links", "test_merkledag_add_node_with_links",
"test_resolver_get", "test_resolver_get",
"test_routing_find_peer",
"test_routing_find_providers",
"test_routing_supernode_get_value", "test_routing_supernode_get_value",
"test_routing_supernode_get_remote_value", "test_routing_supernode_get_remote_value",
"test_routing_find_peer",
"test_unixfs_encode_decode", "test_unixfs_encode_decode",
"test_unixfs_encode_smallfile", "test_unixfs_encode_smallfile",
"test_ping", "test_ping",
"test_ping_remote", "test_ping_remote",
"test_null_add_provider",
"test_resolver_remote_get" "test_resolver_remote_get"
}; };
@ -99,13 +102,15 @@ int (*funcs[])(void) = {
test_merkledag_add_node, test_merkledag_add_node,
test_merkledag_add_node_with_links, test_merkledag_add_node_with_links,
test_resolver_get, test_resolver_get,
test_routing_find_peer,
test_routing_find_providers,
test_routing_supernode_get_value, test_routing_supernode_get_value,
test_routing_supernode_get_remote_value, test_routing_supernode_get_remote_value,
test_routing_find_peer,
test_unixfs_encode_decode, test_unixfs_encode_decode,
test_unixfs_encode_smallfile, test_unixfs_encode_smallfile,
test_ping, test_ping,
test_ping_remote, test_ping_remote,
test_null_add_provider,
test_resolver_remote_get test_resolver_remote_get
}; };