From cfcabaecd0a25f815f9cc1f5095edf9f52b094c7 Mon Sep 17 00:00:00 2001 From: John Jones Date: Sun, 19 Mar 2017 14:40:16 -0500 Subject: [PATCH] Finishing NodeIO --- core/null.c | 63 ++++++++++++++++++++----- include/ipfs/core/ipfs_node.h | 2 + include/ipfs/routing/routing.h | 2 +- routing/k_routing.c | 38 ++++++++++++--- routing/offline.c | 2 +- routing/online.c | 2 +- test/routing/test_supernode.h | 84 +++++++++++++++++++++------------- test/testit.c | 2 + 8 files changed, 143 insertions(+), 52 deletions(-) diff --git a/core/null.c b/core/null.c index 75f8e64..489bd93 100644 --- a/core/null.c +++ b/core/null.c @@ -11,6 +11,9 @@ #include "ipfs/routing/routing.h" #include "ipfs/core/ipfs_node.h" #include "libp2p/secio/secio.h" +#include "libp2p/nodeio/nodeio.h" +#include "ipfs/merkledag/merkledag.h" +#include "ipfs/merkledag/node.h" #define BUF_SIZE 4096 @@ -19,6 +22,11 @@ int ipfs_null_requesting_secio(unsigned char* buffer, size_t buffer_size) { return 1; return 0; } +int ipfs_null_requesting_nodeio(unsigned char* buffer, size_t buffer_size) { + if (buffer_size > 5 && strncmp((char*)buffer, "/nodeio", 7) == 0) + return 1; + return 0; +} /** * We've received a connection. Find out what they want */ @@ -32,26 +40,57 @@ void *ipfs_null_connection (void *ptr) // TODO: multistream + secio + message. // TODO: when should we exit the for loop and disconnect? - struct SessionContext secure_session; - secure_session.insecure_stream = libp2p_net_multistream_stream_new(connection_param->socket); - secure_session.default_stream = secure_session.insecure_stream; + struct SessionContext session; + session.insecure_stream = libp2p_net_multistream_stream_new(connection_param->socket); + session.default_stream = session.insecure_stream; fprintf(stderr, "Connection %d, count %d\n", connection_param->socket, *(connection_param->count)); - if (libp2p_net_multistream_negotiate(secure_session.insecure_stream)) { - routing = ipfs_routing_new_online(connection_param->local_node, &connection_param->local_node->identity->private_key, secure_session.insecure_stream); + if (libp2p_net_multistream_negotiate(session.insecure_stream)) { + routing = ipfs_routing_new_online(connection_param->local_node, &connection_param->local_node->identity->private_key, session.insecure_stream); for(;;) { // check if they're looking for an upgrade (i.e. secio) unsigned char* results = NULL; size_t bytes_read; - secure_session.default_stream->read(&secure_session, &results, &bytes_read); + session.default_stream->read(&session, &results, &bytes_read); if (ipfs_null_requesting_secio(results, bytes_read)) { - if (!libp2p_secio_handshake(&secure_session, &connection_param->local_node->identity->private_key, 1)) { + if (!libp2p_secio_handshake(&session, &connection_param->local_node->identity->private_key, 1)) { // rejecting connection break; } - } else { + } else if (ipfs_null_requesting_nodeio(results, bytes_read)) { + if (!libp2p_nodeio_handshake(&session)) + break; + // loop through file requests + int _continue = 1; + while(_continue) { + unsigned char* hash; + size_t hash_length = 0; + _continue = session.default_stream->read(&session, &hash, &hash_length); + if (hash_length < 20) { + _continue = 0; + continue; + } + else { + // try to get the Node + struct Node* node = NULL; + if (!ipfs_merkledag_get(hash, hash_length, &node, connection_param->local_node->repo)) { + _continue = 0; + continue; + } + size_t results_size = ipfs_node_protobuf_encode_size(node); + unsigned char results[results_size]; + if (!ipfs_node_protobuf_encode(node, results, results_size, &results_size)) { + _continue = 0; + continue; + } + // send it to the requestor + session.default_stream->write(&session, results, results_size); + } + } + } + else { struct Libp2pMessage* msg = NULL; libp2p_message_protobuf_decode(results, bytes_read, &msg); if (msg != NULL) { @@ -65,9 +104,9 @@ void *ipfs_null_connection (void *ptr) routing->GetValue(routing, msg->key, msg->key_size, (void**)&val, &val_size); if (val == NULL) { // write a 0 to the stream to tell the client we couldn't find it. - secure_session.default_stream->write(&secure_session, 0, 1); + session.default_stream->write(&session, 0, 1); } else { - secure_session.default_stream->write(&secure_session, val, val_size); + session.default_stream->write(&session, val, val_size); } break; } @@ -94,8 +133,8 @@ void *ipfs_null_connection (void *ptr) } */ - if (secure_session.default_stream != NULL) { - secure_session.default_stream->close(&secure_session); + if (session.default_stream != NULL) { + session.default_stream->close(&session); } (*(connection_param->count))--; // update counter. free (connection_param); diff --git a/include/ipfs/core/ipfs_node.h b/include/ipfs/core/ipfs_node.h index 7d190f4..6255234 100644 --- a/include/ipfs/core/ipfs_node.h +++ b/include/ipfs/core/ipfs_node.h @@ -3,6 +3,7 @@ #include "ipfs/repo/config/identity.h" #include "ipfs/repo/fsrepo/fs_repo.h" #include "libp2p/peer/peerstore.h" +#include "libp2p/peer/providerstore.h" enum NodeMode { MODE_OFFLINE, MODE_ONLINE }; @@ -11,6 +12,7 @@ struct IpfsNode { struct Identity* identity; struct FSRepo* repo; struct Peerstore* peerstore; + struct ProviderStore* providerstore; struct s_ipfs_routing* routing; //struct Pinner pinning; // an interface //struct Mount** mounts; diff --git a/include/ipfs/routing/routing.h b/include/ipfs/routing/routing.h index 1351036..cf848bc 100644 --- a/include/ipfs/routing/routing.h +++ b/include/ipfs/routing/routing.h @@ -40,7 +40,7 @@ struct s_ipfs_routing { * @param 4 the information found * @param 5 the size of the information found */ - int (*FindProviders) (struct s_ipfs_routing*, char*, size_t, void*, size_t*); + int (*FindProviders) (struct s_ipfs_routing*, char*, size_t, struct Libp2pVector** multiaddresses); /** * Find a peer * @param 1 the context diff --git a/routing/k_routing.c b/routing/k_routing.c index f9e7f24..bec9a78 100644 --- a/routing/k_routing.c +++ b/routing/k_routing.c @@ -1,5 +1,7 @@ #include "ipfs/routing/routing.h" #include "libp2p/routing/kademlia.h" +#include "libp2p/peer/providerstore.h" +#include "libp2p/utils/vector.h" /** * Routing using Kademlia and DHT @@ -41,11 +43,30 @@ int ipfs_routing_kademlia_get_value(struct s_ipfs_routing* routing, char* key, s * @param results_size the size of the results buffer * @returns true(1) on success, otherwise false(0) */ -int ipfs_routing_kademlia_find_providers(struct s_ipfs_routing* routing, char* key, size_t key_size, void* results, size_t* results_size) { +int ipfs_routing_kademlia_find_providers(struct s_ipfs_routing* routing, char* key, size_t key_size, struct Libp2pVector** results) { + *results = libp2p_utils_vector_new(1); + struct Libp2pVector* vector = *results; // see if I can provide it - // add my multiaddress if I can - // get a list of providers that are closer - return 0; + unsigned char* peer_id = NULL; + int peer_id_size = 0; + if (libp2p_providerstore_get(routing->local_node->providerstore, (unsigned char*)key, key_size, &peer_id, &peer_id_size)) { + struct Libp2pPeer* peer = libp2p_peerstore_get_peer(routing->local_node->peerstore, peer_id, peer_id_size); + struct Libp2pLinkedList* current = peer->addr_head; + while (current != NULL) { + struct MultiAddress* ma = (struct MultiAddress*)current->item; + if (multiaddress_is_ip(ma)) { + libp2p_utils_vector_add(vector, ma); + } + current = current->next; + } + } + //TODO: get a list of providers that are closer + if (vector->total == 0) { + libp2p_utils_vector_free(vector); + vector = NULL; + return 0; + } + return 1; } /** @@ -54,8 +75,13 @@ int ipfs_routing_kademlia_find_providers(struct s_ipfs_routing* routing, char* k int ipfs_routing_kademlia_find_peer(struct s_ipfs_routing* routing, char* param1, size_t param2, void* param3, size_t* param4) { return 0; } -int ipfs_routing_kademlia_provide(struct s_ipfs_routing* routing, char* param1, size_t param2) { - return 0; +int ipfs_routing_kademlia_provide(struct s_ipfs_routing* routing, char* key, size_t key_size) { + //TODO: Announce to the network that I can provide this file + // save in a cache + // store key and address in cache. Key is the hash, peer id is the value + libp2p_providerstore_add(routing->local_node->providerstore, (unsigned char*)key, key_size, (unsigned char*)routing->local_node->identity->peer_id, strlen(routing->local_node->identity->peer_id)); + + return 1; } // declared here so as to have the code in 1 place diff --git a/routing/offline.c b/routing/offline.c index 7e83e9e..a98626f 100644 --- a/routing/offline.c +++ b/routing/offline.c @@ -54,7 +54,7 @@ int ipfs_routing_generic_get_value (ipfs_routing* routing, char *key, size_t key return 0; } -int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, char *key, size_t key_size, void *ret, size_t *rlen) +int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, char *key, size_t key_size, struct Libp2pVector** multiaddresses) { return ErrOffline; } diff --git a/routing/online.c b/routing/online.c index 9d30a7d..6ae24df 100644 --- a/routing/online.c +++ b/routing/online.c @@ -8,7 +8,7 @@ * Implements the routing interface for network clients */ -int ipfs_routing_online_find_providers(struct s_ipfs_routing* routing, char* val1, size_t val2, void* val3, size_t* val4) { +int ipfs_routing_online_find_providers(struct s_ipfs_routing* routing, char* val1, size_t val2, struct Libp2pVector** multiaddresses) { return 0; } int ipfs_routing_online_find_peer(struct s_ipfs_routing* routing, char* val1, size_t val2, void* val3, size_t* val4) { diff --git a/test/routing/test_supernode.h b/test/routing/test_supernode.h index ec7b6d2..3680294 100644 --- a/test/routing/test_supernode.h +++ b/test/routing/test_supernode.h @@ -3,6 +3,12 @@ #include "../test_helper.h" #include "ipfs/routing/routing.h" #include "ipfs/repo/fsrepo/fs_repo.h" +#include "libp2p/net/multistream.h" +#include "libp2p/nodeio/nodeio.h" +#include "libp2p/utils/vector.h" +#include "libp2p/utils/linked_list.h" +#include "libp2p/peer/peerstore.h" +#include "libp2p/peer/providerstore.h" void stop_kademlia(void); @@ -33,6 +39,12 @@ int test_routing_supernode_start() { return retVal; } +void* start_daemon(void* path) { + char* repo_path = (char*)path; + ipfs_daemon_start(repo_path); + return NULL; +} + int test_routing_supernode_get_value() { int retVal = 0; struct FSRepo* fs_repo = NULL; @@ -40,84 +52,94 @@ int test_routing_supernode_get_value() { struct Stream* stream = NULL; int file_size = 1000; unsigned char bytes[file_size]; - char* fileName = "temp_file.bin"; + //char* fileName = "temp_file.bin"; char* fullFileName = "/tmp/temp_file.bin"; struct Node* write_node = NULL; size_t bytes_written = 0; - unsigned char base58Hash[100]; - size_t results_size = 2048; - char results_buffer[results_size]; + struct Libp2pVector* multiaddresses; + unsigned char* results; + size_t results_size = 0; + struct Node* node = NULL; + char* ip = NULL; if (!drop_build_and_open_repo("/tmp/.ipfs", &fs_repo)) goto exit; + // start daemon + pthread_t thread; + pthread_create(&thread, NULL, start_daemon, (void*)"/tmp/.ipfs"); + ipfs_node = (struct IpfsNode*)malloc(sizeof(struct IpfsNode)); ipfs_node->mode = MODE_ONLINE; ipfs_node->identity = fs_repo->config->identity; ipfs_node->repo = fs_repo; + ipfs_node->providerstore = libp2p_providerstore_new(); + ipfs_node->peerstore = libp2p_peerstore_new(); + struct Libp2pPeer this_peer; + this_peer.id = fs_repo->config->identity->peer_id; + this_peer.id_size = strlen(fs_repo->config->identity->peer_id); + this_peer.addr_head = libp2p_utils_linked_list_new(); + this_peer.addr_head->item = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/4001"); + libp2p_peerstore_add_peer(ipfs_node->peerstore, &this_peer); ipfs_node->routing = ipfs_routing_new_kademlia(ipfs_node, &fs_repo->config->identity->private_key, stream); if (ipfs_node->routing == NULL) goto exit; + // start listening + // create a file create_bytes(&bytes[0], file_size); create_file(fullFileName, bytes, file_size); // write to ipfs - if (ipfs_import_file("/tmp", fileName, &write_node, fs_repo, &bytes_written, 1) == 0) { + if (ipfs_import_file("/tmp", fullFileName, &write_node, fs_repo, &bytes_written, 1) == 0) { goto exit; } - if (!ipfs_node->routing->Provide(ipfs_node->routing, (char*)write_node->data, write_node->data_size)) - goto exit; - // write_node->hash has the base32 key of the file. Convert this to a base58. - if (!ipfs_cid_hash_to_base58(write_node->hash, write_node->hash_size, base58Hash, 100)) + // TODO: announce to network that this can be provided + if (!ipfs_node->routing->Provide(ipfs_node->routing, (char*)write_node->hash, write_node->hash_size)) goto exit; // ask the network who can provide this - if (!ipfs_node->routing->FindProviders(ipfs_node->routing, (char*)base58Hash, 100, &results_buffer[0], &results_size)) + if (!ipfs_node->routing->FindProviders(ipfs_node->routing, (char*)write_node->hash, write_node->hash_size, &multiaddresses)) goto exit; - // Q: What should FindProviders have in the results buffer? A: A struct of: - // 20 byte id - // 4 byte (or 16 byte for ip6) ip address - // 2 byte port number - // that means we have to attempt a connection and ask for peer ID - // TODO: Q: How do we determine ip4 vs ip6? - - struct Libp2pLinkedList* multiaddress_head; - // get an IP4 ip and port - if (!ipfs_routing_supernode_parse_provider(results_buffer, results_size, &multiaddress_head)) - goto exit; - - struct Libp2pLinkedList* current_address = multiaddress_head; struct MultiAddress* addr = NULL; - while (current_address != NULL) { - addr = (struct Multiaddress*)current_address->item; - if (multiaddress_is_ip4(addr)) + for(int i = 0; i < multiaddresses->total; i++) { + addr = (struct MultiAddress*) libp2p_utils_vector_get(multiaddresses, i); + if (multiaddress_is_ip4(addr)) { break; + } addr = NULL; - current_address = current_address->next; } if (addr == NULL) goto exit; // Connect to server - char* ip; multiaddress_get_ip_address(addr, &ip); struct Stream* file_stream = libp2p_net_multistream_connect(ip, multiaddress_get_ip_port(addr)); + if (file_stream == NULL) + goto exit; + + struct SessionContext context; + context.insecure_stream = file_stream; + context.default_stream = file_stream; // Switch from multistream to NodeIO - if (!libp2p_nodeio_upgrade_stream(file_stream)) + if (!libp2p_nodeio_upgrade_stream(&context)) goto exit; // Ask for file - struct Node* node = libp2p_nodeio_get(file_stream, base58Hash, 100); - if (node == NULL) + if (!libp2p_nodeio_get(&context, write_node->hash, write_node->hash_size, &results, &results_size)) goto exit; + if (!ipfs_node_protobuf_decode(results, results_size, &node)) + goto exit; + + //TODO: see if we got it + retVal = 1; exit: if (ipfs_node->routing != NULL) diff --git a/test/testit.c b/test/testit.c index 1a2a7b2..a38e283 100644 --- a/test/testit.c +++ b/test/testit.c @@ -57,6 +57,7 @@ const char* names[] = { "test_merkledag_add_node", "test_merkledag_add_node_with_links", "test_resolver_get", + "test_routing_supernode_get_value", "test_unixfs_encode_decode", "test_unixfs_encode_smallfile", "test_ping", @@ -94,6 +95,7 @@ int (*funcs[])(void) = { test_merkledag_add_node, test_merkledag_add_node_with_links, test_resolver_get, + test_routing_supernode_get_value, test_unixfs_encode_decode, test_unixfs_encode_smallfile, test_ping,