From 87cf77970492b337316089137e69fc826800945c Mon Sep 17 00:00:00 2001 From: John Jones Date: Mon, 3 Apr 2017 17:26:33 -0500 Subject: [PATCH] handling add provider --- core/bootstrap.c | 9 ++- core/null.c | 11 ++- core/ping.c | 115 +++++++------------------- importer/resolver.c | 16 +--- include/ipfs/routing/routing.h | 7 +- routing/k_routing.c | 8 +- routing/offline.c | 4 +- routing/online.c | 142 ++++++++++++++++++++++++++------- test/core/test_ping.h | 2 +- 9 files changed, 163 insertions(+), 151 deletions(-) diff --git a/core/bootstrap.c b/core/bootstrap.c index 49f9bd7..47b87fb 100644 --- a/core/bootstrap.c +++ b/core/bootstrap.c @@ -1,5 +1,6 @@ #include "libp2p/peer/peer.h" +#include "libp2p/utils/logger.h" #include "ipfs/routing/routing.h" #include "ipfs/core/ipfs_node.h" #include "ipfs/thirdparty/ipfsaddr/ipfs_addr.h" @@ -45,8 +46,8 @@ void ipfs_bootstrap_announce_files(struct IpfsNode* local_node) { int key_size = 0; enum DatastoreCursorOp op = CURSOR_FIRST; while (db->datastore_cursor_get(&key, &key_size, NULL, 0, op, db)) { + libp2p_logger_debug("bootstrap", "Announcing a file to the world.\n"); local_node->routing->Provide(local_node->routing, (char*)key, key_size); - // TODO announce the file op = CURSOR_NEXT; free(key); } @@ -55,15 +56,17 @@ void ipfs_bootstrap_announce_files(struct IpfsNode* local_node) { } /*** - * Listen for connections on the API port (usually 5001) + * connect to the swarm * NOTE: This fills in the IpfsNode->routing struct * * @param param the IpfsNode information * @returns nothing useful */ void *ipfs_bootstrap_routing(void* param) { + libp2p_logger_add_class("bootstrap"); struct IpfsNode* local_node = (struct IpfsNode*)param; - local_node->routing = ipfs_routing_new_kademlia(local_node, &local_node->identity->private_key, NULL); + local_node->routing = ipfs_routing_new_online(local_node, &local_node->identity->private_key, NULL); + local_node->routing->Bootstrap(local_node->routing); ipfs_bootstrap_announce_files(local_node); return (void*)2; } diff --git a/core/null.c b/core/null.c index ac7eaff..2c13703 100644 --- a/core/null.c +++ b/core/null.c @@ -55,13 +55,12 @@ void *ipfs_null_connection (void *ptr) for(;;) { // check if they're looking for an upgrade (i.e. secio) unsigned char* results = NULL; - size_t bytes_read; - session.default_stream->read(&session, &results, &bytes_read); - libp2p_logger_debug("null", "Read %lu bytes from a stream tranaction\n", bytes_read); - for(int i = 0; i < bytes_read; i++) { - libp2p_logger_debug("null", "%02x ", results[i]); + size_t bytes_read = 0; + if (!session.default_stream->read(&session, &results, &bytes_read) ) { + libp2p_logger_debug("null", "stream transaction read returned false\n"); + break; } - libp2p_logger_debug("null", "\n"); + libp2p_logger_debug("null", "Read %lu bytes from a stream tranaction\n", bytes_read); if (protocol_compare(results, bytes_read, "/secio")) { libp2p_logger_debug("null", "Attempting secure io connection...\n"); if (!libp2p_secio_handshake(&session, &connection_param->local_node->identity->private_key, 1)) { diff --git a/core/ping.c b/core/ping.c index 3cb4184..dd80b8d 100644 --- a/core/ping.c +++ b/core/ping.c @@ -19,26 +19,20 @@ int ipfs_ping (int argc, char **argv) { - unsigned char* results = NULL; - size_t results_size = 0; - int port = 0; - char* ip = NULL; - struct SessionContext session; + int retVal = 0; struct MultiAddress* address; int addressAllocated = 0; - int retVal = 0; - struct Libp2pMessage *msg = NULL, *msg_returned = NULL; struct IpfsNode local_node; - unsigned char* protobuf = NULL; - size_t protobuf_size = 0; struct Stream* stream = NULL; + struct Libp2pPeer* peer_to_ping = NULL; + char* id = NULL; + struct FSRepo* fs_repo = NULL; // sanity check if (argc < 3) - return 0; + goto exit; // read the configuration - struct FSRepo* fs_repo; if (!ipfs_repo_fsrepo_new(NULL, NULL, &fs_repo)) goto exit; @@ -56,102 +50,49 @@ int ipfs_ping (int argc, char **argv) if (local_node.routing->Bootstrap(local_node.routing) != 0) goto exit; - if (strstr(argv[2], "/ipfs/") != NULL) { + if (strstr(argv[2], "Qm") == &argv[2][0]) { // resolve the peer id - struct Libp2pPeer *peer = ipfs_resolver_find_peer(argv[2], &local_node); - struct Libp2pLinkedList* current = peer->addr_head; - // try to find an IP version of the multiaddress - while (current != NULL) { - address = (struct MultiAddress*)current->item; - if (multiaddress_is_ip(address)) - break; - address = NULL; - } + peer_to_ping = ipfs_resolver_find_peer(argv[2], &local_node); } else { // perhaps they passed an IP and port if (argc >= 3) { char* str = malloc(strlen(argv[2]) + strlen(argv[3]) + 100); sprintf(str, "/ip4/%s/tcp/%s", argv[2], argv[3]); address = multiaddress_new_from_string(str); - free(str); if (address != NULL) addressAllocated = 1; + peer_to_ping = libp2p_peer_new(); + peer_to_ping->addr_head = libp2p_utils_linked_list_new(); + peer_to_ping->addr_head->item = address; + peer_to_ping->id = str; + peer_to_ping->id_size = strlen(str); + free(str); } //TODO: Error checking } - if (address == NULL || !multiaddress_is_ip(address)) { - fprintf(stderr, "Unable to find address\n"); + if (peer_to_ping == NULL) + goto exit; + + if (!local_node.routing->Ping(local_node.routing, peer_to_ping)) { + id = malloc(peer_to_ping->id_size + 1); + memcpy(id, peer_to_ping->id, peer_to_ping->id_size); + id[peer_to_ping->id_size] = 0; + fprintf(stderr, "Unable to ping %s\n", id); + free(id); goto exit; } - if (!multiaddress_get_ip_address(address, &ip)) { - fprintf(stderr, "Could not convert IP address %s\n", address->string); - goto exit; - } - port = multiaddress_get_ip_port(address); - - session.insecure_stream = libp2p_net_multistream_connect(ip, port); - session.default_stream = session.insecure_stream; - if (session.insecure_stream == NULL) { - fprintf(stderr, "Unable to connect to %s on port %d", ip, port); - goto exit; - } - - //TODO: Fix mac problem, then use this to try to switch to secio - /* - if (!libp2p_secio_handshake(&session, &fs_repo->config->identity->private_key, 0)) { - fprintf(stderr, "Unable to switch to secure connection. Attempting insecure ping...\n"); - } - */ - - // prepare the PING message - msg = libp2p_message_new(); - msg->message_type = MESSAGE_TYPE_PING; - - protobuf_size = libp2p_message_protobuf_encode_size(msg); - protobuf = (unsigned char*)malloc(protobuf_size); - libp2p_message_protobuf_encode(msg, &protobuf[0], protobuf_size, &protobuf_size); - if (!libp2p_routing_dht_upgrade_stream(&session)) { - fprintf(stderr, "PING unsuccessful. Unable to switch to kademlia protocol\n"); - goto exit; - } - session.default_stream->write(&session, protobuf, protobuf_size); - session.default_stream->read(&session, &results, &results_size); - - // see if we can unprotobuf - libp2p_message_protobuf_decode(results, results_size, &msg_returned); - if (msg_returned->message_type != MESSAGE_TYPE_PING) { - fprintf(stderr, "Ping unsuccessful. Returned message was not a PING"); - goto exit; - } - - if (results_size != protobuf_size) { - fprintf(stderr, "PING unsuccessful. Original size: %lu, returned size: %lu\n", protobuf_size, results_size); - goto exit; - } - if (memcmp(results, protobuf, protobuf_size) != 0) { - fprintf(stderr, "PING unsuccessful. Results do not match.\n"); - goto exit; - } - - fprintf(stdout, "Ping of %s:%d successful.\n", ip, port); - retVal = 1; exit: - if (fs_repo != NULL) - ipfs_repo_fsrepo_free(fs_repo); if (addressAllocated) multiaddress_free(address); - if (ip != NULL) - free(ip); - if (msg != NULL) - libp2p_message_free(msg); - if (msg_returned != NULL) - libp2p_message_free(msg_returned); - if (protobuf != NULL) - free(protobuf); - + if (fs_repo != NULL) + ipfs_repo_fsrepo_free(fs_repo); + if (local_node.peerstore != NULL) + libp2p_peerstore_free(local_node.peerstore); + if (local_node.providerstore != NULL) + libp2p_providerstore_free(local_node.providerstore); return retVal; } diff --git a/importer/resolver.c b/importer/resolver.c index 36273e3..c264784 100644 --- a/importer/resolver.c +++ b/importer/resolver.c @@ -267,8 +267,6 @@ struct Node* ipfs_resolver_get(const char* path, struct Node* from, const struct */ struct Libp2pPeer* ipfs_resolver_find_peer(const char* path, const struct IpfsNode* ipfs_node) { struct FSRepo* fs_repo = ipfs_node->repo; - unsigned char* results = NULL; - size_t results_size = 0; struct Libp2pLinkedList *addresses = NULL; struct Libp2pPeer* peer = NULL; @@ -292,20 +290,8 @@ struct Libp2pPeer* ipfs_resolver_find_peer(const char* path, const struct IpfsNo // ask the swarm for the peer const char* address_string = ipfs_resolver_remove_path_prefix(path, fs_repo); - if (ipfs_node->routing->FindPeer(ipfs_node->routing, address_string, strlen(address_string), (void**)&results, &results_size) != 0) - goto exit; + ipfs_node->routing->FindPeer(ipfs_node->routing, address_string, strlen(address_string), &peer); - // we should have gotten a protobuf'd peer - if (!libp2p_peer_protobuf_decode(results, results_size, &peer)) - goto exit; - - if (peer == NULL) - goto exit; - - - exit: - if (results != NULL) - free(results); return peer; } diff --git a/include/ipfs/routing/routing.h b/include/ipfs/routing/routing.h index 527a422..528d47c 100644 --- a/include/ipfs/routing/routing.h +++ b/include/ipfs/routing/routing.h @@ -1,5 +1,6 @@ #pragma once +#include "libp2p/peer/peer.h" #include "libp2p/crypto/rsa.h" #include "libp2p/record/message.h" #include "ipfs/core/ipfs_node.h" @@ -50,7 +51,7 @@ struct IpfsRouting { * @param 5 the size of the results * @returns 0 or error code */ - int (*FindPeer) (struct IpfsRouting*, const char*, size_t, void**, size_t*); + int (*FindPeer) (struct IpfsRouting*, const char*, size_t, struct Libp2pPeer** result); /** * Announce to the network that this host can provide this key * @param 1 the context @@ -62,10 +63,10 @@ struct IpfsRouting { /** * Ping * @param routing the context - * @param message the message + * @param peer the peer * @returns true(1) on success, otherwise false(0) */ - int (*Ping) (struct IpfsRouting*, struct Libp2pMessage*); + int (*Ping) (struct IpfsRouting* routing, struct Libp2pPeer* peer); /** * Get everything going * @param routing the context diff --git a/routing/k_routing.c b/routing/k_routing.c index f0e808d..428ee35 100644 --- a/routing/k_routing.c +++ b/routing/k_routing.c @@ -92,7 +92,7 @@ int ipfs_routing_kademlia_find_providers(struct IpfsRouting* routing, char* key, /** * Find a peer */ -int ipfs_routing_kademlia_find_peer(struct IpfsRouting* routing, const char* param1, size_t param2, void** param3, size_t* param4) { +int ipfs_routing_kademlia_find_peer(struct IpfsRouting* routing, const char* param1, size_t param2, struct Libp2pPeer **result) { return 0; } @@ -113,12 +113,12 @@ int ipfs_routing_kademlia_provide(struct IpfsRouting* routing, char* key, size_t } // declared here so as to have the code in 1 place -int ipfs_routing_online_ping(struct IpfsRouting*, struct Libp2pMessage*); +int ipfs_routing_online_ping(struct IpfsRouting*, struct Libp2pPeer*); /** * Ping this instance */ -int ipfs_routing_kademlia_ping(struct IpfsRouting* routing, struct Libp2pMessage* message) { - return ipfs_routing_online_ping(routing, message); +int ipfs_routing_kademlia_ping(struct IpfsRouting* routing, struct Libp2pPeer* peer) { + return ipfs_routing_online_ping(routing, peer); } int ipfs_routing_kademlia_bootstrap(struct IpfsRouting* routing) { diff --git a/routing/offline.c b/routing/offline.c index cb05a70..ee02dde 100644 --- a/routing/offline.c +++ b/routing/offline.c @@ -59,7 +59,7 @@ int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, char *key return ErrOffline; } -int ipfs_routing_offline_find_peer (ipfs_routing* offlineRouting, const char *peer_id, size_t pid_size, void **ret, size_t *rlen) +int ipfs_routing_offline_find_peer (ipfs_routing* offlineRouting, const char *peer_id, size_t pid_size, struct Libp2pPeer **result) { return ErrOffline; } @@ -69,7 +69,7 @@ int ipfs_routing_offline_provide (ipfs_routing* offlineRouting, char *cid, size_ return ErrOffline; } -int ipfs_routing_offline_ping (ipfs_routing* offlineRouting, struct Libp2pMessage* message) +int ipfs_routing_offline_ping (ipfs_routing* offlineRouting, struct Libp2pPeer* peer) { return ErrOffline; } diff --git a/routing/online.c b/routing/online.c index efaef80..9e568c6 100644 --- a/routing/online.c +++ b/routing/online.c @@ -3,11 +3,49 @@ #include "ipfs/routing/routing.h" #include "libp2p/record/message.h" #include "libp2p/net/stream.h" +#include "libp2p/conn/session.h" +#include "libp2p/routing/dht_protocol.h" /** * Implements the routing interface for communicating with network clients */ +/** + * Helper method to send and receive a kademlia message + * @param session_context the session context + * @param message what to send + * @returns what was received + */ +struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct Stream* stream, struct Libp2pMessage* message) { + size_t protobuf_size = 0, results_size = 0; + unsigned char* protobuf = NULL, *results = NULL; + struct Libp2pMessage* return_message = NULL; + struct SessionContext session_context; + + protobuf_size = libp2p_message_protobuf_encode_size(message); + protobuf = (unsigned char*)malloc(protobuf_size); + libp2p_message_protobuf_encode(message, &protobuf[0], protobuf_size, &protobuf_size); + + session_context.default_stream = stream; + session_context.insecure_stream = stream; + + // upgrade to kademlia protocol + if (!libp2p_routing_dht_upgrade_stream(&session_context)) { + goto exit; + } + + // send the message, and expect the same back + session_context.default_stream->write(&session_context, protobuf, protobuf_size); + session_context.default_stream->read(&session_context, &results, &results_size); + + // see if we can unprotobuf + if (!libp2p_message_protobuf_decode(results, results_size, &return_message)) + goto exit; + + exit: + return return_message; +} + int ipfs_routing_online_find_providers(struct IpfsRouting* routing, char* val1, size_t val2, struct Libp2pVector** multiaddresses) { return 0; } @@ -17,51 +55,88 @@ int ipfs_routing_online_find_providers(struct IpfsRouting* routing, char* val1, * @param routing the context * @param peer_id the id to look for * @param peer_id_size the size of the id - * @param results the results of the search - * @param results_size the size of results - * @returns 0 on success, otherwise -1 + * @param peer the result of the search + * @returns true(1) on success, otherwise false(0) */ -int ipfs_routing_online_find_peer(struct IpfsRouting* routing, const char* peer_id, size_t peer_id_size, void** results, size_t* results_size) { +int ipfs_routing_online_find_peer(struct IpfsRouting* routing, const char* peer_id, size_t peer_id_size, struct Libp2pPeer **result) { // first look to see if we have it in the peerstore struct Peerstore* peerstore = routing->local_node->peerstore; - struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, (unsigned char*)peer_id, peer_id_size); - if (peer != NULL) { - //we found it. Return it - *results_size = libp2p_peer_protobuf_encode_size(peer); - *results = malloc(*results_size); - if (!libp2p_peer_protobuf_encode(peer, *results, *results_size, results_size)) { - free(results); - *results = NULL; - *results_size = 0; - return -1; - } - return 0; + *result = libp2p_peerstore_get_peer(peerstore, (unsigned char*)peer_id, peer_id_size); + if (*result != NULL) { + return 1; } //TODO: ask the swarm to find the peer - return -1; + return 0; } /** * Notify the network that this host can provide this key * @param routing information about this host - * @param val1 the key (hash) of the data + * @param key the key (hash) of the data + * @param key_size the length of the key * @returns true(1) on success, otherwise false */ -int ipfs_routing_online_provide(struct IpfsRouting* routing, char* val1, size_t val2) { - return 0; -} -int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pMessage* message) { - // send the same stuff back - size_t protobuf_size = libp2p_message_protobuf_encode_size(message); - unsigned char protobuf[protobuf_size]; +int ipfs_routing_online_provide(struct IpfsRouting* routing, char* key, size_t key_size) { + struct Libp2pPeer local_peer; + local_peer.id_size = strlen(routing->local_node->identity->peer_id); + local_peer.id = routing->local_node->identity->peer_id; + local_peer.connection_type = CONNECTION_TYPE_CONNECTED; + local_peer.addr_head = NULL; - if (!libp2p_message_protobuf_encode(message, protobuf, protobuf_size, &protobuf_size)) { - return -1; + struct Libp2pMessage* msg = libp2p_message_new(); + msg->key_size = key_size; + msg->key = malloc(msg->key_size); + memcpy(msg->key, key, msg->key_size); + msg->message_type = MESSAGE_TYPE_ADD_PROVIDER; + msg->provider_peer_head = libp2p_utils_linked_list_new(); + msg->provider_peer_head->item = &local_peer; + + struct Libp2pLinkedList *current = routing->local_node->peerstore->head_entry; + while (current != NULL) { + struct PeerEntry* current_peer_entry = (struct PeerEntry*)current->item; + struct Libp2pPeer* current_peer = current_peer_entry->peer; + if (current_peer->connection_type == CONNECTION_TYPE_CONNECTED) { + // ignoring results is okay this time + ipfs_routing_online_send_receive_message(current_peer->connection, msg); + } + current = current->next; } - int retVal = routing->stream->write(routing->stream, protobuf, protobuf_size); - if (retVal == 0) - retVal = -1; + libp2p_message_free(msg); + + return 1; +} + +/** + * Ping a remote + * @param routing the context + * @param message the message that we want to send + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* peer) { + int retVal = 0; + struct Libp2pMessage* msg = NULL, *msg_returned = NULL; + + if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { + if (!libp2p_peer_connect(peer)) + return 0; + } + if (peer->connection_type == CONNECTION_TYPE_CONNECTED) { + + // build the message + msg = libp2p_message_new(); + msg->message_type = MESSAGE_TYPE_PING; + + msg_returned = ipfs_routing_online_send_receive_message(peer->connection, msg); + + if (msg_returned == NULL) + goto exit; + if (msg_returned->message_type != msg->message_type) + goto exit; + } + + retVal = 1; + exit: return retVal; } @@ -94,6 +169,13 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) { peer->addr_head->item = address; libp2p_peerstore_add_peer(routing->local_node->peerstore, peer); libp2p_peer_free(peer); + // now find it and attempt to connect + peer = libp2p_peerstore_get_peer(routing->local_node->peerstore, (const unsigned char*)peer_id, strlen(peer_id)); + if (peer == NULL) + return -1; // this should never happen + if (peer->connection == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier) + libp2p_peer_connect(peer); + } } } diff --git a/test/core/test_ping.h b/test/core/test_ping.h index 97ffdc2..d8a5bc8 100644 --- a/test/core/test_ping.h +++ b/test/core/test_ping.h @@ -78,7 +78,7 @@ int test_ping() { } int test_ping_remote() { - char* argv[] = { "ipfs", "ping", "/ipfs/QmTjg669YQemhffXLrkA3as9jT8SzyRtWaLXHKwYN6wCBd" }; + char* argv[] = { "ipfs", "ping", "QmTjg669YQemhffXLrkA3as9jT8SzyRtWaLXHKwYN6wCBd" }; int argc = 3; return ipfs_ping(argc, argv);