1
0
Fork 0

Finishing NodeIO

yamux
John Jones 2017-03-19 14:40:16 -05:00
parent 0b238eb5ac
commit cfcabaecd0
8 changed files with 143 additions and 52 deletions

View File

@ -11,6 +11,9 @@
#include "ipfs/routing/routing.h" #include "ipfs/routing/routing.h"
#include "ipfs/core/ipfs_node.h" #include "ipfs/core/ipfs_node.h"
#include "libp2p/secio/secio.h" #include "libp2p/secio/secio.h"
#include "libp2p/nodeio/nodeio.h"
#include "ipfs/merkledag/merkledag.h"
#include "ipfs/merkledag/node.h"
#define BUF_SIZE 4096 #define BUF_SIZE 4096
@ -19,6 +22,11 @@ int ipfs_null_requesting_secio(unsigned char* buffer, size_t buffer_size) {
return 1; return 1;
return 0; return 0;
} }
int ipfs_null_requesting_nodeio(unsigned char* buffer, size_t buffer_size) {
if (buffer_size > 5 && strncmp((char*)buffer, "/nodeio", 7) == 0)
return 1;
return 0;
}
/** /**
* We've received a connection. Find out what they want * We've received a connection. Find out what they want
*/ */
@ -32,26 +40,57 @@ void *ipfs_null_connection (void *ptr)
// TODO: multistream + secio + message. // TODO: multistream + secio + message.
// TODO: when should we exit the for loop and disconnect? // TODO: when should we exit the for loop and disconnect?
struct SessionContext secure_session; struct SessionContext session;
secure_session.insecure_stream = libp2p_net_multistream_stream_new(connection_param->socket); session.insecure_stream = libp2p_net_multistream_stream_new(connection_param->socket);
secure_session.default_stream = secure_session.insecure_stream; session.default_stream = session.insecure_stream;
fprintf(stderr, "Connection %d, count %d\n", connection_param->socket, *(connection_param->count)); fprintf(stderr, "Connection %d, count %d\n", connection_param->socket, *(connection_param->count));
if (libp2p_net_multistream_negotiate(secure_session.insecure_stream)) { if (libp2p_net_multistream_negotiate(session.insecure_stream)) {
routing = ipfs_routing_new_online(connection_param->local_node, &connection_param->local_node->identity->private_key, secure_session.insecure_stream); routing = ipfs_routing_new_online(connection_param->local_node, &connection_param->local_node->identity->private_key, session.insecure_stream);
for(;;) { for(;;) {
// check if they're looking for an upgrade (i.e. secio) // check if they're looking for an upgrade (i.e. secio)
unsigned char* results = NULL; unsigned char* results = NULL;
size_t bytes_read; size_t bytes_read;
secure_session.default_stream->read(&secure_session, &results, &bytes_read); session.default_stream->read(&session, &results, &bytes_read);
if (ipfs_null_requesting_secio(results, bytes_read)) { if (ipfs_null_requesting_secio(results, bytes_read)) {
if (!libp2p_secio_handshake(&secure_session, &connection_param->local_node->identity->private_key, 1)) { if (!libp2p_secio_handshake(&session, &connection_param->local_node->identity->private_key, 1)) {
// rejecting connection // rejecting connection
break; break;
} }
} else { } else if (ipfs_null_requesting_nodeio(results, bytes_read)) {
if (!libp2p_nodeio_handshake(&session))
break;
// loop through file requests
int _continue = 1;
while(_continue) {
unsigned char* hash;
size_t hash_length = 0;
_continue = session.default_stream->read(&session, &hash, &hash_length);
if (hash_length < 20) {
_continue = 0;
continue;
}
else {
// try to get the Node
struct Node* node = NULL;
if (!ipfs_merkledag_get(hash, hash_length, &node, connection_param->local_node->repo)) {
_continue = 0;
continue;
}
size_t results_size = ipfs_node_protobuf_encode_size(node);
unsigned char results[results_size];
if (!ipfs_node_protobuf_encode(node, results, results_size, &results_size)) {
_continue = 0;
continue;
}
// send it to the requestor
session.default_stream->write(&session, results, results_size);
}
}
}
else {
struct Libp2pMessage* msg = NULL; struct Libp2pMessage* msg = NULL;
libp2p_message_protobuf_decode(results, bytes_read, &msg); libp2p_message_protobuf_decode(results, bytes_read, &msg);
if (msg != NULL) { if (msg != NULL) {
@ -65,9 +104,9 @@ void *ipfs_null_connection (void *ptr)
routing->GetValue(routing, msg->key, msg->key_size, (void**)&val, &val_size); routing->GetValue(routing, msg->key, msg->key_size, (void**)&val, &val_size);
if (val == NULL) { if (val == NULL) {
// write a 0 to the stream to tell the client we couldn't find it. // write a 0 to the stream to tell the client we couldn't find it.
secure_session.default_stream->write(&secure_session, 0, 1); session.default_stream->write(&session, 0, 1);
} else { } else {
secure_session.default_stream->write(&secure_session, val, val_size); session.default_stream->write(&session, val, val_size);
} }
break; break;
} }
@ -94,8 +133,8 @@ void *ipfs_null_connection (void *ptr)
} }
*/ */
if (secure_session.default_stream != NULL) { if (session.default_stream != NULL) {
secure_session.default_stream->close(&secure_session); session.default_stream->close(&session);
} }
(*(connection_param->count))--; // update counter. (*(connection_param->count))--; // update counter.
free (connection_param); free (connection_param);

View File

@ -3,6 +3,7 @@
#include "ipfs/repo/config/identity.h" #include "ipfs/repo/config/identity.h"
#include "ipfs/repo/fsrepo/fs_repo.h" #include "ipfs/repo/fsrepo/fs_repo.h"
#include "libp2p/peer/peerstore.h" #include "libp2p/peer/peerstore.h"
#include "libp2p/peer/providerstore.h"
enum NodeMode { MODE_OFFLINE, MODE_ONLINE }; enum NodeMode { MODE_OFFLINE, MODE_ONLINE };
@ -11,6 +12,7 @@ struct IpfsNode {
struct Identity* identity; struct Identity* identity;
struct FSRepo* repo; struct FSRepo* repo;
struct Peerstore* peerstore; struct Peerstore* peerstore;
struct ProviderStore* providerstore;
struct s_ipfs_routing* routing; struct s_ipfs_routing* routing;
//struct Pinner pinning; // an interface //struct Pinner pinning; // an interface
//struct Mount** mounts; //struct Mount** mounts;

View File

@ -40,7 +40,7 @@ struct s_ipfs_routing {
* @param 4 the information found * @param 4 the information found
* @param 5 the size of the information found * @param 5 the size of the information found
*/ */
int (*FindProviders) (struct s_ipfs_routing*, char*, size_t, void*, size_t*); int (*FindProviders) (struct s_ipfs_routing*, char*, size_t, struct Libp2pVector** multiaddresses);
/** /**
* Find a peer * Find a peer
* @param 1 the context * @param 1 the context

View File

@ -1,5 +1,7 @@
#include "ipfs/routing/routing.h" #include "ipfs/routing/routing.h"
#include "libp2p/routing/kademlia.h" #include "libp2p/routing/kademlia.h"
#include "libp2p/peer/providerstore.h"
#include "libp2p/utils/vector.h"
/** /**
* Routing using Kademlia and DHT * Routing using Kademlia and DHT
@ -41,11 +43,30 @@ int ipfs_routing_kademlia_get_value(struct s_ipfs_routing* routing, char* key, s
* @param results_size the size of the results buffer * @param results_size the size of the results buffer
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int ipfs_routing_kademlia_find_providers(struct s_ipfs_routing* routing, char* key, size_t key_size, void* results, size_t* results_size) { int ipfs_routing_kademlia_find_providers(struct s_ipfs_routing* routing, char* key, size_t key_size, struct Libp2pVector** results) {
*results = libp2p_utils_vector_new(1);
struct Libp2pVector* vector = *results;
// see if I can provide it // see if I can provide it
// add my multiaddress if I can unsigned char* peer_id = NULL;
// get a list of providers that are closer int peer_id_size = 0;
return 0; if (libp2p_providerstore_get(routing->local_node->providerstore, (unsigned char*)key, key_size, &peer_id, &peer_id_size)) {
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(routing->local_node->peerstore, peer_id, peer_id_size);
struct Libp2pLinkedList* current = peer->addr_head;
while (current != NULL) {
struct MultiAddress* ma = (struct MultiAddress*)current->item;
if (multiaddress_is_ip(ma)) {
libp2p_utils_vector_add(vector, ma);
}
current = current->next;
}
}
//TODO: get a list of providers that are closer
if (vector->total == 0) {
libp2p_utils_vector_free(vector);
vector = NULL;
return 0;
}
return 1;
} }
/** /**
@ -54,8 +75,13 @@ int ipfs_routing_kademlia_find_providers(struct s_ipfs_routing* routing, char* k
int ipfs_routing_kademlia_find_peer(struct s_ipfs_routing* routing, char* param1, size_t param2, void* param3, size_t* param4) { int ipfs_routing_kademlia_find_peer(struct s_ipfs_routing* routing, char* param1, size_t param2, void* param3, size_t* param4) {
return 0; return 0;
} }
int ipfs_routing_kademlia_provide(struct s_ipfs_routing* routing, char* param1, size_t param2) { int ipfs_routing_kademlia_provide(struct s_ipfs_routing* routing, char* key, size_t key_size) {
return 0; //TODO: Announce to the network that I can provide this file
// save in a cache
// store key and address in cache. Key is the hash, peer id is the value
libp2p_providerstore_add(routing->local_node->providerstore, (unsigned char*)key, key_size, (unsigned char*)routing->local_node->identity->peer_id, strlen(routing->local_node->identity->peer_id));
return 1;
} }
// declared here so as to have the code in 1 place // declared here so as to have the code in 1 place

View File

@ -54,7 +54,7 @@ int ipfs_routing_generic_get_value (ipfs_routing* routing, char *key, size_t key
return 0; return 0;
} }
int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, char *key, size_t key_size, void *ret, size_t *rlen) int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, char *key, size_t key_size, struct Libp2pVector** multiaddresses)
{ {
return ErrOffline; return ErrOffline;
} }

View File

@ -8,7 +8,7 @@
* Implements the routing interface for network clients * Implements the routing interface for network clients
*/ */
int ipfs_routing_online_find_providers(struct s_ipfs_routing* routing, char* val1, size_t val2, void* val3, size_t* val4) { int ipfs_routing_online_find_providers(struct s_ipfs_routing* routing, char* val1, size_t val2, struct Libp2pVector** multiaddresses) {
return 0; return 0;
} }
int ipfs_routing_online_find_peer(struct s_ipfs_routing* routing, char* val1, size_t val2, void* val3, size_t* val4) { int ipfs_routing_online_find_peer(struct s_ipfs_routing* routing, char* val1, size_t val2, void* val3, size_t* val4) {

View File

@ -3,6 +3,12 @@
#include "../test_helper.h" #include "../test_helper.h"
#include "ipfs/routing/routing.h" #include "ipfs/routing/routing.h"
#include "ipfs/repo/fsrepo/fs_repo.h" #include "ipfs/repo/fsrepo/fs_repo.h"
#include "libp2p/net/multistream.h"
#include "libp2p/nodeio/nodeio.h"
#include "libp2p/utils/vector.h"
#include "libp2p/utils/linked_list.h"
#include "libp2p/peer/peerstore.h"
#include "libp2p/peer/providerstore.h"
void stop_kademlia(void); void stop_kademlia(void);
@ -33,6 +39,12 @@ int test_routing_supernode_start() {
return retVal; return retVal;
} }
void* start_daemon(void* path) {
char* repo_path = (char*)path;
ipfs_daemon_start(repo_path);
return NULL;
}
int test_routing_supernode_get_value() { int test_routing_supernode_get_value() {
int retVal = 0; int retVal = 0;
struct FSRepo* fs_repo = NULL; struct FSRepo* fs_repo = NULL;
@ -40,84 +52,94 @@ int test_routing_supernode_get_value() {
struct Stream* stream = NULL; struct Stream* stream = NULL;
int file_size = 1000; int file_size = 1000;
unsigned char bytes[file_size]; unsigned char bytes[file_size];
char* fileName = "temp_file.bin"; //char* fileName = "temp_file.bin";
char* fullFileName = "/tmp/temp_file.bin"; char* fullFileName = "/tmp/temp_file.bin";
struct Node* write_node = NULL; struct Node* write_node = NULL;
size_t bytes_written = 0; size_t bytes_written = 0;
unsigned char base58Hash[100]; struct Libp2pVector* multiaddresses;
size_t results_size = 2048; unsigned char* results;
char results_buffer[results_size]; size_t results_size = 0;
struct Node* node = NULL;
char* ip = NULL;
if (!drop_build_and_open_repo("/tmp/.ipfs", &fs_repo)) if (!drop_build_and_open_repo("/tmp/.ipfs", &fs_repo))
goto exit; goto exit;
// start daemon
pthread_t thread;
pthread_create(&thread, NULL, start_daemon, (void*)"/tmp/.ipfs");
ipfs_node = (struct IpfsNode*)malloc(sizeof(struct IpfsNode)); ipfs_node = (struct IpfsNode*)malloc(sizeof(struct IpfsNode));
ipfs_node->mode = MODE_ONLINE; ipfs_node->mode = MODE_ONLINE;
ipfs_node->identity = fs_repo->config->identity; ipfs_node->identity = fs_repo->config->identity;
ipfs_node->repo = fs_repo; ipfs_node->repo = fs_repo;
ipfs_node->providerstore = libp2p_providerstore_new();
ipfs_node->peerstore = libp2p_peerstore_new();
struct Libp2pPeer this_peer;
this_peer.id = fs_repo->config->identity->peer_id;
this_peer.id_size = strlen(fs_repo->config->identity->peer_id);
this_peer.addr_head = libp2p_utils_linked_list_new();
this_peer.addr_head->item = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/4001");
libp2p_peerstore_add_peer(ipfs_node->peerstore, &this_peer);
ipfs_node->routing = ipfs_routing_new_kademlia(ipfs_node, &fs_repo->config->identity->private_key, stream); ipfs_node->routing = ipfs_routing_new_kademlia(ipfs_node, &fs_repo->config->identity->private_key, stream);
if (ipfs_node->routing == NULL) if (ipfs_node->routing == NULL)
goto exit; goto exit;
// start listening
// create a file // create a file
create_bytes(&bytes[0], file_size); create_bytes(&bytes[0], file_size);
create_file(fullFileName, bytes, file_size); create_file(fullFileName, bytes, file_size);
// write to ipfs // write to ipfs
if (ipfs_import_file("/tmp", fileName, &write_node, fs_repo, &bytes_written, 1) == 0) { if (ipfs_import_file("/tmp", fullFileName, &write_node, fs_repo, &bytes_written, 1) == 0) {
goto exit; goto exit;
} }
if (!ipfs_node->routing->Provide(ipfs_node->routing, (char*)write_node->data, write_node->data_size)) // TODO: announce to network that this can be provided
goto exit; if (!ipfs_node->routing->Provide(ipfs_node->routing, (char*)write_node->hash, write_node->hash_size))
// write_node->hash has the base32 key of the file. Convert this to a base58.
if (!ipfs_cid_hash_to_base58(write_node->hash, write_node->hash_size, base58Hash, 100))
goto exit; goto exit;
// ask the network who can provide this // ask the network who can provide this
if (!ipfs_node->routing->FindProviders(ipfs_node->routing, (char*)base58Hash, 100, &results_buffer[0], &results_size)) if (!ipfs_node->routing->FindProviders(ipfs_node->routing, (char*)write_node->hash, write_node->hash_size, &multiaddresses))
goto exit; goto exit;
// Q: What should FindProviders have in the results buffer? A: A struct of:
// 20 byte id
// 4 byte (or 16 byte for ip6) ip address
// 2 byte port number
// that means we have to attempt a connection and ask for peer ID
// TODO: Q: How do we determine ip4 vs ip6?
struct Libp2pLinkedList* multiaddress_head;
// get an IP4 ip and port
if (!ipfs_routing_supernode_parse_provider(results_buffer, results_size, &multiaddress_head))
goto exit;
struct Libp2pLinkedList* current_address = multiaddress_head;
struct MultiAddress* addr = NULL; struct MultiAddress* addr = NULL;
while (current_address != NULL) { for(int i = 0; i < multiaddresses->total; i++) {
addr = (struct Multiaddress*)current_address->item; addr = (struct MultiAddress*) libp2p_utils_vector_get(multiaddresses, i);
if (multiaddress_is_ip4(addr)) if (multiaddress_is_ip4(addr)) {
break; break;
}
addr = NULL; addr = NULL;
current_address = current_address->next;
} }
if (addr == NULL) if (addr == NULL)
goto exit; goto exit;
// Connect to server // Connect to server
char* ip;
multiaddress_get_ip_address(addr, &ip); multiaddress_get_ip_address(addr, &ip);
struct Stream* file_stream = libp2p_net_multistream_connect(ip, multiaddress_get_ip_port(addr)); struct Stream* file_stream = libp2p_net_multistream_connect(ip, multiaddress_get_ip_port(addr));
if (file_stream == NULL)
goto exit;
struct SessionContext context;
context.insecure_stream = file_stream;
context.default_stream = file_stream;
// Switch from multistream to NodeIO // Switch from multistream to NodeIO
if (!libp2p_nodeio_upgrade_stream(file_stream)) if (!libp2p_nodeio_upgrade_stream(&context))
goto exit; goto exit;
// Ask for file // Ask for file
struct Node* node = libp2p_nodeio_get(file_stream, base58Hash, 100); if (!libp2p_nodeio_get(&context, write_node->hash, write_node->hash_size, &results, &results_size))
if (node == NULL)
goto exit; goto exit;
if (!ipfs_node_protobuf_decode(results, results_size, &node))
goto exit;
//TODO: see if we got it
retVal = 1; retVal = 1;
exit: exit:
if (ipfs_node->routing != NULL) if (ipfs_node->routing != NULL)

View File

@ -57,6 +57,7 @@ const char* names[] = {
"test_merkledag_add_node", "test_merkledag_add_node",
"test_merkledag_add_node_with_links", "test_merkledag_add_node_with_links",
"test_resolver_get", "test_resolver_get",
"test_routing_supernode_get_value",
"test_unixfs_encode_decode", "test_unixfs_encode_decode",
"test_unixfs_encode_smallfile", "test_unixfs_encode_smallfile",
"test_ping", "test_ping",
@ -94,6 +95,7 @@ int (*funcs[])(void) = {
test_merkledag_add_node, test_merkledag_add_node,
test_merkledag_add_node_with_links, test_merkledag_add_node_with_links,
test_resolver_get, test_resolver_get,
test_routing_supernode_get_value,
test_unixfs_encode_decode, test_unixfs_encode_decode,
test_unixfs_encode_smallfile, test_unixfs_encode_smallfile,
test_ping, test_ping,