Attempting to bitswap from previously unknown node

This commit is contained in:
jmjatlanta 2017-09-27 10:05:17 -05:00
parent f0a53f2753
commit 76b860c06f
7 changed files with 203 additions and 110 deletions

View file

@ -271,64 +271,6 @@ size_t boundary_size(char *str, char *boundary, size_t limit)
return 0; return 0;
} }
/**
* function to find and read the object.
* @param path is the ipfs address, obj is a pointer to be allocated and will be the return of the data, size is a pointer to return the data length.
* @returns 1 when success is 0 when failure.
*/
int get_object(struct IpfsNode* local_node, char *path, unsigned char **obj, size_t *size)
{
FILE* memstream_file = NULL;
char* memstream_char = NULL;
size_t memstream_size = 0;
struct Cid* cid = NULL;
// convert hash to cid
if ( ipfs_cid_decode_hash_from_base58((unsigned char*)path, strlen(path), &cid) == 0) {
return 0;
}
// find block
struct HashtableNode* read_node = NULL;
if (!ipfs_exporter_get_node(local_node, cid->hash, cid->hash_length, &read_node)) {
ipfs_cid_free(cid);
return 0;
}
// open a memory stream
memstream_file = open_memstream(&memstream_char, &memstream_size);
if (memstream_file == NULL) {
libp2p_logger_error("api", "get_object: Unable to open a memory stream.\n");
ipfs_cid_free(cid);
return 0;
}
// throw everything (including links) into the memory stream
ipfs_exporter_cat_node(read_node, local_node, memstream_file);
fclose(memstream_file);
// no longer need these
ipfs_cid_free(cid);
ipfs_hashtable_node_free(read_node);
*size = memstream_size;
*obj = (unsigned char*)memstream_char;
return 1;
}
/**
* send object data as an http response.
* @param socket, object pointer and size.
* @returns 1 when success is 0 when failure.
*/
int send_object(int socket, unsigned char *obj, size_t size)
{
// TODO: implement.
return 0; // fail.
}
struct ApiConnectionParam { struct ApiConnectionParam {
int index; int index;
struct IpfsNode* this_node; struct IpfsNode* this_node;

View file

@ -142,6 +142,68 @@ int ipfs_core_http_process_object(struct IpfsNode* local_node, struct HttpReques
return retVal; return retVal;
} }
/***
* process dht commands
* @param local_node the context
* @param request the request
* @param response where to put the results
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_core_http_process_dht(struct IpfsNode* local_node, struct HttpRequest* request, char** response) {
int failedCount = 0;
if (strcmp(request->sub_command, "provide") == 0) {
// do a dht provide
for (int i = 0; i < request->arguments->total; i++) {
char* hash = (char*)libp2p_utils_vector_get(request->arguments, i);
struct Cid* cid;
if (!ipfs_cid_decode_hash_from_base58(hash, strlen(hash), &cid)) {
failedCount++;
continue;
}
if (!ipfs_routing_online_provide(local_node->routing, cid->hash, cid->hash_length)) {
failedCount++;
continue;
}
}
if (!failedCount) {
// complete success
// TODO: do the right thing
*response = (char*) malloc(1024);
snprintf(*response, 1024, "{\n\t\"ID\": \"<string>\"\n" \
"\t\"Type\": \"<int>\"\n"
"\t\"Responses\": [\n"
"\t\t{\n"
"\t\t\t\"ID\": \"<string>\"\n"
"\t\t\t\"Addrs\": [\n"
"\t\t\t\t\"<object>\"\n"
"\t\t\t]\n"
"\t\t}\n"
"\t]\n"
"\t\"Extra\": \"<string>\"\n"
"}\n"
);
} else {
// at least some failed
// TODO: do the right thing
*response = (char*) malloc(1024);
snprintf(*response, 1024, "{\n\t\"ID\": \"<string>\",\n" \
"\t\"Type\": \"<int>\",\n"
"\t\"Responses\": [\n"
"\t\t{\n"
"\t\t\t\"ID\": \"<string>\",\n"
"\t\t\t\"Addrs\": [\n"
"\t\t\t\t\"<object>\"\n"
"\t\t\t]\n"
"\t\t}\n"
"\t],\n"
"\t\"Extra\": \"<string>\"\n"
"}\n"
);
}
}
return failedCount < request->arguments->total;
}
/*** /***
* Process the parameters passed in from an http request * Process the parameters passed in from an http request
* @param local_node the context * @param local_node the context
@ -158,6 +220,8 @@ int ipfs_core_http_request_process(struct IpfsNode* local_node, struct HttpReque
retVal = ipfs_core_http_process_name(local_node, request, response); retVal = ipfs_core_http_process_name(local_node, request, response);
} else if (strcmp(request->command, "object") == 0) { } else if (strcmp(request->command, "object") == 0) {
retVal = ipfs_core_http_process_object(local_node, request, response); retVal = ipfs_core_http_process_object(local_node, request, response);
} else if (strcmp(request->command, "dht") == 0) {
retVal = ipfs_core_http_process_dht(local_node, request, response);
} }
return retVal; return retVal;
} }

View file

@ -3,7 +3,9 @@
#include <ipfs/routing/routing.h> #include <ipfs/routing/routing.h>
#include <ipfs/util/errs.h> #include <ipfs/util/errs.h>
#include "libp2p/crypto/rsa.h" #include "libp2p/crypto/rsa.h"
#include "libp2p/utils/logger.h"
#include "libp2p/record/record.h" #include "libp2p/record/record.h"
#include "ipfs/core/http_request.h"
#include "ipfs/datastore/ds_helper.h" #include "ipfs/datastore/ds_helper.h"
#include "ipfs/merkledag/merkledag.h" #include "ipfs/merkledag/merkledag.h"
#include "ipfs/routing/routing.h" #include "ipfs/routing/routing.h"
@ -86,9 +88,38 @@ int ipfs_routing_offline_find_peer (ipfs_routing* offlineRouting, const unsigned
return ErrOffline; return ErrOffline;
} }
int ipfs_routing_offline_provide (ipfs_routing* offlineRouting, const unsigned char *cid, size_t cid_size) /**
* Attempt to publish that this node can provide a value
* @param offlineRouting the context
* @param incoming_hash the hash (in binary form)
* @param incoming_hash_size the length of the hash array
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_routing_offline_provide (ipfs_routing* offlineRouting, const unsigned char *incoming_hash, size_t incoming_hash_size)
{ {
return ErrOffline; if (offlineRouting->local_node->mode == MODE_API_AVAILABLE) {
//TODO: publish this through the api
unsigned char buffer[256];
if (!ipfs_cid_hash_to_base58(incoming_hash, incoming_hash_size, &buffer[0], 256)) {
libp2p_logger_error("offline", "Unable to convert hash to its Base58 representation.\n");
return 0;
}
char* response;
struct HttpRequest* request = ipfs_core_http_request_new();
request->command = "dht";
request->sub_command = "provide";
request->arguments = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(request->arguments, buffer);
if (!ipfs_core_http_request_get(offlineRouting->local_node, request, &response)) {
libp2p_logger_error("offline", "Unable to call API for dht publish.\n");
return 0;
}
fprintf(stdout, "%s", response);
return 1;
}
return 0;
} }
int ipfs_routing_offline_ping (ipfs_routing* offlineRouting, struct Libp2pPeer* peer) int ipfs_routing_offline_ping (ipfs_routing* offlineRouting, struct Libp2pPeer* peer)

View file

@ -448,8 +448,17 @@ int test_routing_provide() {
*/ */
int test_routing_retrieve_file_third_party() { int test_routing_retrieve_file_third_party() {
int retVal = 0; int retVal = 0;
char* ipfs_path_1 = "/tmp/ipfs_1", *ipfs_path_2 = "/tmp/ipfs_2", *ipfs_path_3 = "/tmp/ipfs_3";
char* peer_id_1 = NULL, *peer_id_2 = NULL, *peer_id_3 = NULL;
struct IpfsNode* ipfs_node2 = NULL, *ipfs_node3 = NULL;
pthread_t thread1, thread2, thread3;
int thread1_started = 0, thread2_started = 0, thread3_started = 0;
struct MultiAddress* ma_peer1 = NULL;
struct Libp2pVector* ma_vector2 = NULL, *ma_vector3 = NULL;
struct HashtableNode* node = NULL, *result_node = NULL;
char multiaddress_string[255] = "";
char hash[256] = "";
/*
libp2p_logger_add_class("online"); libp2p_logger_add_class("online");
libp2p_logger_add_class("multistream"); libp2p_logger_add_class("multistream");
libp2p_logger_add_class("null"); libp2p_logger_add_class("null");
@ -459,74 +468,70 @@ int test_routing_retrieve_file_third_party() {
libp2p_logger_add_class("exporter"); libp2p_logger_add_class("exporter");
libp2p_logger_add_class("peer"); libp2p_logger_add_class("peer");
libp2p_logger_add_class("test_routing"); libp2p_logger_add_class("test_routing");
*/
// clean out repository // clean out repository
char* ipfs_path = "/tmp/test1";
char* peer_id_1 = NULL, *peer_id_2 = NULL, *peer_id_3 = NULL;
struct IpfsNode* ipfs_node2 = NULL, *ipfs_node3 = NULL;
pthread_t thread1, thread2;
int thread1_started = 0, thread2_started = 0;
struct MultiAddress* ma_peer1 = NULL;
struct Libp2pVector* ma_vector2 = NULL, *ma_vector3 = NULL;
struct HashtableNode* node = NULL, *result_node = NULL;
// create peer 1 // create peer 1
drop_and_build_repository(ipfs_path, 4001, NULL, &peer_id_1); if (!drop_and_build_repository(ipfs_path_1, 4001, NULL, &peer_id_1))
char multiaddress_string[255]; goto exit;
sprintf(multiaddress_string, "/ip4/127.0.0.1/tcp/4001/ipfs/%s", peer_id_1); sprintf(multiaddress_string, "/ip4/127.0.0.1/tcp/4001/ipfs/%s", peer_id_1);
ma_peer1 = multiaddress_new_from_string(multiaddress_string); ma_peer1 = multiaddress_new_from_string(multiaddress_string);
// start the daemon in a separate thread // start the daemon in a separate thread
libp2p_logger_debug("test_routing", "Firing up daemon 1.\n"); libp2p_logger_debug("test_routing", "Firing up daemon 1.\n");
if (pthread_create(&thread1, NULL, test_daemon_start, (void*)ipfs_path) < 0) { if (pthread_create(&thread1, NULL, test_daemon_start, (void*)ipfs_path_1) < 0) {
fprintf(stderr, "Unable to start thread 1\n"); libp2p_logger_error("test_routing", "Unable to start thread 1\n");
goto exit; goto exit;
} }
thread1_started = 1; thread1_started = 1;
// wait for everything to start up // wait for everything to start up
// JMJ debugging =
sleep(3); sleep(3);
// create peer 2 // create peer 2, that will host the file
ipfs_path = "/tmp/test2";
// create a vector to hold peer1's multiaddress so we can connect as a peer // create a vector to hold peer1's multiaddress so we can connect as a peer
ma_vector2 = libp2p_utils_vector_new(1); ma_vector2 = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(ma_vector2, ma_peer1); libp2p_utils_vector_add(ma_vector2, ma_peer1);
// note: this destroys some things, as it frees the fs_repo_3: // note: this destroys some things, as it frees the fs_repo:
drop_and_build_repository(ipfs_path, 4002, ma_vector2, &peer_id_2); drop_and_build_repository(ipfs_path_2, 4002, ma_vector2, &peer_id_2);
multiaddress_free(ma_peer1); multiaddress_free(ma_peer1);
// add a file, to prime the connection to peer 1
//TODO: Find a better way to do this...
size_t bytes_written = 0;
if (!ipfs_node_online_new(ipfs_path, &ipfs_node2))
goto exit;
ipfs_node2->routing->Bootstrap(ipfs_node2->routing);
ipfs_import_file(NULL, "/home/parallels/ipfstest/hello_world.txt", &node, ipfs_node2, &bytes_written, 0);
ipfs_node_free(ipfs_node2);
// start the daemon in a separate thread // start the daemon in a separate thread
libp2p_logger_debug("test_routing", "Firing up daemon 2.\n"); libp2p_logger_debug("test_routing", "Firing up daemon 2.\n");
if (pthread_create(&thread2, NULL, test_daemon_start, (void*)ipfs_path) < 0) { if (pthread_create(&thread2, NULL, test_daemon_start, (void*)ipfs_path_2) < 0) {
fprintf(stderr, "Unable to start thread 2\n"); libp2p_logger_error("test_routing", "Unable to start thread 2.\n");
goto exit; goto exit;
} }
thread2_started = 1; thread2_started = 1;
//TODO: add a file to server 2
uint8_t *bytes = (unsigned char*)"hello, world!\n";
char* filename = "test1.txt";
create_file(filename, bytes, strlen((char*)bytes));
size_t bytes_written;
ipfs_node_offline_new(ipfs_path_2, &ipfs_node2);
ipfs_import_file(NULL, filename, &node, ipfs_node2, &bytes_written, 0);
memset(hash, 0, 256);
ipfs_cid_hash_to_base58(node->hash, node->hash_size, (unsigned char*)hash, 256);
libp2p_logger_debug("test_api", "Inserted file with hash %s.\n", hash);
ipfs_node_free(ipfs_node2);
// wait for everything to start up // wait for everything to start up
// JMJ debugging =
sleep(3); sleep(3);
libp2p_logger_debug("test_routing", "Firing up the 3rd client\n");
// create my peer, peer 3 // create my peer, peer 3
ipfs_path = "/tmp/test3";
ma_peer1 = multiaddress_new_from_string(multiaddress_string); ma_peer1 = multiaddress_new_from_string(multiaddress_string);
ma_vector3 = libp2p_utils_vector_new(1); ma_vector3 = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(ma_vector3, ma_peer1); libp2p_utils_vector_add(ma_vector3, ma_peer1);
drop_and_build_repository(ipfs_path, 4003, ma_vector3, &peer_id_3); drop_and_build_repository(ipfs_path_3, 4003, ma_vector3, &peer_id_3);
multiaddress_free(ma_peer1); multiaddress_free(ma_peer1);
ipfs_node_online_new(ipfs_path, &ipfs_node3); libp2p_logger_debug("test_routing", "Firing up daemon 2.\n");
if (pthread_create(&thread3, NULL, test_daemon_start, (void*)ipfs_path_3) < 0) {
libp2p_logger_error("test_routing", "Unable to start thread 3.\n");
goto exit;
}
thread3_started = 1;
ipfs_node3->routing->Bootstrap(ipfs_node3->routing); //now have peer 3 ask for a file that is on peer 2, but peer 3 only knows of peer 1
ipfs_node_offline_new(ipfs_path_3, &ipfs_node3);
if (!ipfs_exporter_get_node(ipfs_node3, node->hash, node->hash_size, &result_node)) { if (!ipfs_exporter_get_node(ipfs_node3, node->hash, node->hash_size, &result_node)) {
fprintf(stderr, "Get_Node returned false\n"); fprintf(stderr, "Get_Node returned false\n");
@ -550,6 +555,8 @@ int test_routing_retrieve_file_third_party() {
pthread_join(thread1, NULL); pthread_join(thread1, NULL);
if (thread2_started) if (thread2_started)
pthread_join(thread2, NULL); pthread_join(thread2, NULL);
if (thread3_started)
pthread_join(thread3, NULL);
if (ipfs_node3 != NULL) if (ipfs_node3 != NULL)
ipfs_node_free(ipfs_node3); ipfs_node_free(ipfs_node3);
if (peer_id_1 != NULL) if (peer_id_1 != NULL)
@ -558,12 +565,10 @@ int test_routing_retrieve_file_third_party() {
free(peer_id_2); free(peer_id_2);
if (peer_id_3 != NULL) if (peer_id_3 != NULL)
free(peer_id_3); free(peer_id_3);
if (ma_vector2 != NULL) { if (ma_vector2 != NULL)
libp2p_utils_vector_free(ma_vector2); libp2p_utils_vector_free(ma_vector2);
} if (ma_vector3 != NULL)
if (ma_vector3 != NULL) {
libp2p_utils_vector_free(ma_vector3); libp2p_utils_vector_free(ma_vector3);
}
if (node != NULL) if (node != NULL)
ipfs_hashtable_node_free(node); ipfs_hashtable_node_free(node);
if (result_node != NULL) if (result_node != NULL)

View file

@ -4,3 +4,4 @@
./run_test.sh test_2.sh ./run_test.sh test_2.sh
./run_test.sh test_3.sh ./run_test.sh test_3.sh
./run_test.sh test_4.sh ./run_test.sh test_4.sh
./run_test.sh test_5.sh

49
test/scripts/test_5.sh Executable file
View file

@ -0,0 +1,49 @@
#!/bin/bash
####
# Attempt to start 2 deamons and have an api client of server A ask for a file from server B
####
source ./test_helpers.sh
IPFS1="../../main/ipfs --config /tmp/ipfs_1"
IPFS2="../../main/ipfs --config /tmp/ipfs_2"
function pre {
post
eval "$IPFS1" init;
check_failure "pre" $?
cp ../config.test1.wo_journal /tmp/ipfs_1/config
eval "$IPFS2" init;
check_failure "pre ipfs2" $?
cp ../config.test2.wo_journal /tmp/ipfs_2/config
}
function post {
rm -Rf /tmp/ipfs_1;
rm -Rf /tmp/ipfs_2;
rm hello.txt;
}
function body {
create_hello_world;
eval "$IPFS1" add hello.txt
check_failure "add hello.txt" $?
#start the daemons
eval "../../main/ipfs --config /tmp/ipfs_1 daemon &"
daemon_id_1=$!
eval "../../main/ipfs --config /tmp/ipfs_2 daemon &"
daemon_id_2=$!
sleep 5
#A client of server 2 wants the file at server 1
eval "$IPFS2" cat QmYAXgX8ARiriupMQsbGXtKdDyGzWry1YV3sycKw1qqmgH
check_failure "cat" $?
kill -9 $daemon_id_1
kill -9 $daemon_id_2
}

View file

@ -88,23 +88,24 @@ const char* names[] = {
"test_merkledag_get_data", "test_merkledag_get_data",
"test_merkledag_add_node", "test_merkledag_add_node",
"test_merkledag_add_node_with_links", "test_merkledag_add_node_with_links",
// 50 below
"test_namesys_publisher_publish", "test_namesys_publisher_publish",
"test_namesys_resolver_resolve", "test_namesys_resolver_resolve",
"test_resolver_get", "test_resolver_get",
"test_routing_find_peer", "test_routing_find_peer",
"test_routing_provide" /*, "test_routing_provide",
"test_routing_find_providers", "test_routing_find_providers",
"test_routing_supernode_get_value", "test_routing_supernode_get_value",
"test_routing_supernode_get_remote_value", "test_routing_supernode_get_remote_value",
"test_routing_retrieve_file_third_party", "test_routing_retrieve_file_third_party",
"test_routing_retrieve_large_file", "test_routing_retrieve_large_file",
// 60 below
"test_unixfs_encode_decode", "test_unixfs_encode_decode",
"test_unixfs_encode_smallfile", "test_unixfs_encode_smallfile",
"test_ping", "test_ping",
"test_ping_remote", "test_ping_remote",
"test_null_add_provider", "test_null_add_provider",
"test_resolver_remote_get" "test_resolver_remote_get"
*/
}; };
int (*funcs[])(void) = { int (*funcs[])(void) = {
@ -157,24 +158,24 @@ int (*funcs[])(void) = {
test_merkledag_get_data, test_merkledag_get_data,
test_merkledag_add_node, test_merkledag_add_node,
test_merkledag_add_node_with_links, test_merkledag_add_node_with_links,
// 50 below
test_namesys_publisher_publish, test_namesys_publisher_publish,
test_namesys_resolver_resolve, test_namesys_resolver_resolve,
test_resolver_get, test_resolver_get,
test_routing_find_peer, test_routing_find_peer,
test_routing_provide /*,
test_routing_find_providers,
test_routing_provide, test_routing_provide,
test_routing_find_providers,
test_routing_supernode_get_value, test_routing_supernode_get_value,
test_routing_supernode_get_remote_value, test_routing_supernode_get_remote_value,
test_routing_retrieve_file_third_party, test_routing_retrieve_file_third_party,
test_routing_retrieve_large_file, test_routing_retrieve_large_file,
// 60 below
test_unixfs_encode_decode, test_unixfs_encode_decode,
test_unixfs_encode_smallfile, test_unixfs_encode_smallfile,
test_ping, test_ping,
test_ping_remote, test_ping_remote,
test_null_add_provider, test_null_add_provider,
test_resolver_remote_get test_resolver_remote_get
*/
}; };
/** /**