handling add provider

yamux
John Jones 2017-04-03 17:26:33 -05:00
parent 58b5bc8cdd
commit 87cf779704
9 changed files with 163 additions and 151 deletions

View File

@ -1,5 +1,6 @@
#include "libp2p/peer/peer.h"
#include "libp2p/utils/logger.h"
#include "ipfs/routing/routing.h"
#include "ipfs/core/ipfs_node.h"
#include "ipfs/thirdparty/ipfsaddr/ipfs_addr.h"
@ -45,8 +46,8 @@ void ipfs_bootstrap_announce_files(struct IpfsNode* local_node) {
int key_size = 0;
enum DatastoreCursorOp op = CURSOR_FIRST;
while (db->datastore_cursor_get(&key, &key_size, NULL, 0, op, db)) {
libp2p_logger_debug("bootstrap", "Announcing a file to the world.\n");
local_node->routing->Provide(local_node->routing, (char*)key, key_size);
// TODO announce the file
op = CURSOR_NEXT;
free(key);
}
@ -55,15 +56,17 @@ void ipfs_bootstrap_announce_files(struct IpfsNode* local_node) {
}
/***
* Listen for connections on the API port (usually 5001)
* connect to the swarm
* NOTE: This fills in the IpfsNode->routing struct
*
* @param param the IpfsNode information
* @returns nothing useful
*/
void *ipfs_bootstrap_routing(void* param) {
libp2p_logger_add_class("bootstrap");
struct IpfsNode* local_node = (struct IpfsNode*)param;
local_node->routing = ipfs_routing_new_kademlia(local_node, &local_node->identity->private_key, NULL);
local_node->routing = ipfs_routing_new_online(local_node, &local_node->identity->private_key, NULL);
local_node->routing->Bootstrap(local_node->routing);
ipfs_bootstrap_announce_files(local_node);
return (void*)2;
}

View File

@ -55,13 +55,12 @@ void *ipfs_null_connection (void *ptr)
for(;;) {
// check if they're looking for an upgrade (i.e. secio)
unsigned char* results = NULL;
size_t bytes_read;
session.default_stream->read(&session, &results, &bytes_read);
libp2p_logger_debug("null", "Read %lu bytes from a stream tranaction\n", bytes_read);
for(int i = 0; i < bytes_read; i++) {
libp2p_logger_debug("null", "%02x ", results[i]);
size_t bytes_read = 0;
if (!session.default_stream->read(&session, &results, &bytes_read) ) {
libp2p_logger_debug("null", "stream transaction read returned false\n");
break;
}
libp2p_logger_debug("null", "\n");
libp2p_logger_debug("null", "Read %lu bytes from a stream tranaction\n", bytes_read);
if (protocol_compare(results, bytes_read, "/secio")) {
libp2p_logger_debug("null", "Attempting secure io connection...\n");
if (!libp2p_secio_handshake(&session, &connection_param->local_node->identity->private_key, 1)) {

View File

@ -19,26 +19,20 @@
int ipfs_ping (int argc, char **argv)
{
unsigned char* results = NULL;
size_t results_size = 0;
int port = 0;
char* ip = NULL;
struct SessionContext session;
int retVal = 0;
struct MultiAddress* address;
int addressAllocated = 0;
int retVal = 0;
struct Libp2pMessage *msg = NULL, *msg_returned = NULL;
struct IpfsNode local_node;
unsigned char* protobuf = NULL;
size_t protobuf_size = 0;
struct Stream* stream = NULL;
struct Libp2pPeer* peer_to_ping = NULL;
char* id = NULL;
struct FSRepo* fs_repo = NULL;
// sanity check
if (argc < 3)
return 0;
goto exit;
// read the configuration
struct FSRepo* fs_repo;
if (!ipfs_repo_fsrepo_new(NULL, NULL, &fs_repo))
goto exit;
@ -56,102 +50,49 @@ int ipfs_ping (int argc, char **argv)
if (local_node.routing->Bootstrap(local_node.routing) != 0)
goto exit;
if (strstr(argv[2], "/ipfs/") != NULL) {
if (strstr(argv[2], "Qm") == &argv[2][0]) {
// resolve the peer id
struct Libp2pPeer *peer = ipfs_resolver_find_peer(argv[2], &local_node);
struct Libp2pLinkedList* current = peer->addr_head;
// try to find an IP version of the multiaddress
while (current != NULL) {
address = (struct MultiAddress*)current->item;
if (multiaddress_is_ip(address))
break;
address = NULL;
}
peer_to_ping = ipfs_resolver_find_peer(argv[2], &local_node);
} else {
// perhaps they passed an IP and port
if (argc >= 3) {
char* str = malloc(strlen(argv[2]) + strlen(argv[3]) + 100);
sprintf(str, "/ip4/%s/tcp/%s", argv[2], argv[3]);
address = multiaddress_new_from_string(str);
free(str);
if (address != NULL)
addressAllocated = 1;
peer_to_ping = libp2p_peer_new();
peer_to_ping->addr_head = libp2p_utils_linked_list_new();
peer_to_ping->addr_head->item = address;
peer_to_ping->id = str;
peer_to_ping->id_size = strlen(str);
free(str);
}
//TODO: Error checking
}
if (address == NULL || !multiaddress_is_ip(address)) {
fprintf(stderr, "Unable to find address\n");
if (peer_to_ping == NULL)
goto exit;
if (!local_node.routing->Ping(local_node.routing, peer_to_ping)) {
id = malloc(peer_to_ping->id_size + 1);
memcpy(id, peer_to_ping->id, peer_to_ping->id_size);
id[peer_to_ping->id_size] = 0;
fprintf(stderr, "Unable to ping %s\n", id);
free(id);
goto exit;
}
if (!multiaddress_get_ip_address(address, &ip)) {
fprintf(stderr, "Could not convert IP address %s\n", address->string);
goto exit;
}
port = multiaddress_get_ip_port(address);
session.insecure_stream = libp2p_net_multistream_connect(ip, port);
session.default_stream = session.insecure_stream;
if (session.insecure_stream == NULL) {
fprintf(stderr, "Unable to connect to %s on port %d", ip, port);
goto exit;
}
//TODO: Fix mac problem, then use this to try to switch to secio
/*
if (!libp2p_secio_handshake(&session, &fs_repo->config->identity->private_key, 0)) {
fprintf(stderr, "Unable to switch to secure connection. Attempting insecure ping...\n");
}
*/
// prepare the PING message
msg = libp2p_message_new();
msg->message_type = MESSAGE_TYPE_PING;
protobuf_size = libp2p_message_protobuf_encode_size(msg);
protobuf = (unsigned char*)malloc(protobuf_size);
libp2p_message_protobuf_encode(msg, &protobuf[0], protobuf_size, &protobuf_size);
if (!libp2p_routing_dht_upgrade_stream(&session)) {
fprintf(stderr, "PING unsuccessful. Unable to switch to kademlia protocol\n");
goto exit;
}
session.default_stream->write(&session, protobuf, protobuf_size);
session.default_stream->read(&session, &results, &results_size);
// see if we can unprotobuf
libp2p_message_protobuf_decode(results, results_size, &msg_returned);
if (msg_returned->message_type != MESSAGE_TYPE_PING) {
fprintf(stderr, "Ping unsuccessful. Returned message was not a PING");
goto exit;
}
if (results_size != protobuf_size) {
fprintf(stderr, "PING unsuccessful. Original size: %lu, returned size: %lu\n", protobuf_size, results_size);
goto exit;
}
if (memcmp(results, protobuf, protobuf_size) != 0) {
fprintf(stderr, "PING unsuccessful. Results do not match.\n");
goto exit;
}
fprintf(stdout, "Ping of %s:%d successful.\n", ip, port);
retVal = 1;
exit:
if (fs_repo != NULL)
ipfs_repo_fsrepo_free(fs_repo);
if (addressAllocated)
multiaddress_free(address);
if (ip != NULL)
free(ip);
if (msg != NULL)
libp2p_message_free(msg);
if (msg_returned != NULL)
libp2p_message_free(msg_returned);
if (protobuf != NULL)
free(protobuf);
if (fs_repo != NULL)
ipfs_repo_fsrepo_free(fs_repo);
if (local_node.peerstore != NULL)
libp2p_peerstore_free(local_node.peerstore);
if (local_node.providerstore != NULL)
libp2p_providerstore_free(local_node.providerstore);
return retVal;
}

View File

@ -267,8 +267,6 @@ struct Node* ipfs_resolver_get(const char* path, struct Node* from, const struct
*/
struct Libp2pPeer* ipfs_resolver_find_peer(const char* path, const struct IpfsNode* ipfs_node) {
struct FSRepo* fs_repo = ipfs_node->repo;
unsigned char* results = NULL;
size_t results_size = 0;
struct Libp2pLinkedList *addresses = NULL;
struct Libp2pPeer* peer = NULL;
@ -292,20 +290,8 @@ struct Libp2pPeer* ipfs_resolver_find_peer(const char* path, const struct IpfsNo
// ask the swarm for the peer
const char* address_string = ipfs_resolver_remove_path_prefix(path, fs_repo);
if (ipfs_node->routing->FindPeer(ipfs_node->routing, address_string, strlen(address_string), (void**)&results, &results_size) != 0)
goto exit;
ipfs_node->routing->FindPeer(ipfs_node->routing, address_string, strlen(address_string), &peer);
// we should have gotten a protobuf'd peer
if (!libp2p_peer_protobuf_decode(results, results_size, &peer))
goto exit;
if (peer == NULL)
goto exit;
exit:
if (results != NULL)
free(results);
return peer;
}

View File

@ -1,5 +1,6 @@
#pragma once
#include "libp2p/peer/peer.h"
#include "libp2p/crypto/rsa.h"
#include "libp2p/record/message.h"
#include "ipfs/core/ipfs_node.h"
@ -50,7 +51,7 @@ struct IpfsRouting {
* @param 5 the size of the results
* @returns 0 or error code
*/
int (*FindPeer) (struct IpfsRouting*, const char*, size_t, void**, size_t*);
int (*FindPeer) (struct IpfsRouting*, const char*, size_t, struct Libp2pPeer** result);
/**
* Announce to the network that this host can provide this key
* @param 1 the context
@ -62,10 +63,10 @@ struct IpfsRouting {
/**
* Ping
* @param routing the context
* @param message the message
* @param peer the peer
* @returns true(1) on success, otherwise false(0)
*/
int (*Ping) (struct IpfsRouting*, struct Libp2pMessage*);
int (*Ping) (struct IpfsRouting* routing, struct Libp2pPeer* peer);
/**
* Get everything going
* @param routing the context

View File

@ -92,7 +92,7 @@ int ipfs_routing_kademlia_find_providers(struct IpfsRouting* routing, char* key,
/**
* Find a peer
*/
int ipfs_routing_kademlia_find_peer(struct IpfsRouting* routing, const char* param1, size_t param2, void** param3, size_t* param4) {
int ipfs_routing_kademlia_find_peer(struct IpfsRouting* routing, const char* param1, size_t param2, struct Libp2pPeer **result) {
return 0;
}
@ -113,12 +113,12 @@ int ipfs_routing_kademlia_provide(struct IpfsRouting* routing, char* key, size_t
}
// declared here so as to have the code in 1 place
int ipfs_routing_online_ping(struct IpfsRouting*, struct Libp2pMessage*);
int ipfs_routing_online_ping(struct IpfsRouting*, struct Libp2pPeer*);
/**
* Ping this instance
*/
int ipfs_routing_kademlia_ping(struct IpfsRouting* routing, struct Libp2pMessage* message) {
return ipfs_routing_online_ping(routing, message);
int ipfs_routing_kademlia_ping(struct IpfsRouting* routing, struct Libp2pPeer* peer) {
return ipfs_routing_online_ping(routing, peer);
}
int ipfs_routing_kademlia_bootstrap(struct IpfsRouting* routing) {

View File

@ -59,7 +59,7 @@ int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, char *key
return ErrOffline;
}
int ipfs_routing_offline_find_peer (ipfs_routing* offlineRouting, const char *peer_id, size_t pid_size, void **ret, size_t *rlen)
int ipfs_routing_offline_find_peer (ipfs_routing* offlineRouting, const char *peer_id, size_t pid_size, struct Libp2pPeer **result)
{
return ErrOffline;
}
@ -69,7 +69,7 @@ int ipfs_routing_offline_provide (ipfs_routing* offlineRouting, char *cid, size_
return ErrOffline;
}
int ipfs_routing_offline_ping (ipfs_routing* offlineRouting, struct Libp2pMessage* message)
int ipfs_routing_offline_ping (ipfs_routing* offlineRouting, struct Libp2pPeer* peer)
{
return ErrOffline;
}

View File

@ -3,11 +3,49 @@
#include "ipfs/routing/routing.h"
#include "libp2p/record/message.h"
#include "libp2p/net/stream.h"
#include "libp2p/conn/session.h"
#include "libp2p/routing/dht_protocol.h"
/**
* Implements the routing interface for communicating with network clients
*/
/**
* Helper method to send and receive a kademlia message
* @param session_context the session context
* @param message what to send
* @returns what was received
*/
struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct Stream* stream, struct Libp2pMessage* message) {
size_t protobuf_size = 0, results_size = 0;
unsigned char* protobuf = NULL, *results = NULL;
struct Libp2pMessage* return_message = NULL;
struct SessionContext session_context;
protobuf_size = libp2p_message_protobuf_encode_size(message);
protobuf = (unsigned char*)malloc(protobuf_size);
libp2p_message_protobuf_encode(message, &protobuf[0], protobuf_size, &protobuf_size);
session_context.default_stream = stream;
session_context.insecure_stream = stream;
// upgrade to kademlia protocol
if (!libp2p_routing_dht_upgrade_stream(&session_context)) {
goto exit;
}
// send the message, and expect the same back
session_context.default_stream->write(&session_context, protobuf, protobuf_size);
session_context.default_stream->read(&session_context, &results, &results_size);
// see if we can unprotobuf
if (!libp2p_message_protobuf_decode(results, results_size, &return_message))
goto exit;
exit:
return return_message;
}
int ipfs_routing_online_find_providers(struct IpfsRouting* routing, char* val1, size_t val2, struct Libp2pVector** multiaddresses) {
return 0;
}
@ -17,51 +55,88 @@ int ipfs_routing_online_find_providers(struct IpfsRouting* routing, char* val1,
* @param routing the context
* @param peer_id the id to look for
* @param peer_id_size the size of the id
* @param results the results of the search
* @param results_size the size of results
* @returns 0 on success, otherwise -1
* @param peer the result of the search
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_routing_online_find_peer(struct IpfsRouting* routing, const char* peer_id, size_t peer_id_size, void** results, size_t* results_size) {
int ipfs_routing_online_find_peer(struct IpfsRouting* routing, const char* peer_id, size_t peer_id_size, struct Libp2pPeer **result) {
// first look to see if we have it in the peerstore
struct Peerstore* peerstore = routing->local_node->peerstore;
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, (unsigned char*)peer_id, peer_id_size);
if (peer != NULL) {
//we found it. Return it
*results_size = libp2p_peer_protobuf_encode_size(peer);
*results = malloc(*results_size);
if (!libp2p_peer_protobuf_encode(peer, *results, *results_size, results_size)) {
free(results);
*results = NULL;
*results_size = 0;
return -1;
}
return 0;
*result = libp2p_peerstore_get_peer(peerstore, (unsigned char*)peer_id, peer_id_size);
if (*result != NULL) {
return 1;
}
//TODO: ask the swarm to find the peer
return -1;
return 0;
}
/**
* Notify the network that this host can provide this key
* @param routing information about this host
* @param val1 the key (hash) of the data
* @param key the key (hash) of the data
* @param key_size the length of the key
* @returns true(1) on success, otherwise false
*/
int ipfs_routing_online_provide(struct IpfsRouting* routing, char* val1, size_t val2) {
return 0;
}
int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pMessage* message) {
// send the same stuff back
size_t protobuf_size = libp2p_message_protobuf_encode_size(message);
unsigned char protobuf[protobuf_size];
int ipfs_routing_online_provide(struct IpfsRouting* routing, char* key, size_t key_size) {
struct Libp2pPeer local_peer;
local_peer.id_size = strlen(routing->local_node->identity->peer_id);
local_peer.id = routing->local_node->identity->peer_id;
local_peer.connection_type = CONNECTION_TYPE_CONNECTED;
local_peer.addr_head = NULL;
if (!libp2p_message_protobuf_encode(message, protobuf, protobuf_size, &protobuf_size)) {
return -1;
struct Libp2pMessage* msg = libp2p_message_new();
msg->key_size = key_size;
msg->key = malloc(msg->key_size);
memcpy(msg->key, key, msg->key_size);
msg->message_type = MESSAGE_TYPE_ADD_PROVIDER;
msg->provider_peer_head = libp2p_utils_linked_list_new();
msg->provider_peer_head->item = &local_peer;
struct Libp2pLinkedList *current = routing->local_node->peerstore->head_entry;
while (current != NULL) {
struct PeerEntry* current_peer_entry = (struct PeerEntry*)current->item;
struct Libp2pPeer* current_peer = current_peer_entry->peer;
if (current_peer->connection_type == CONNECTION_TYPE_CONNECTED) {
// ignoring results is okay this time
ipfs_routing_online_send_receive_message(current_peer->connection, msg);
}
current = current->next;
}
int retVal = routing->stream->write(routing->stream, protobuf, protobuf_size);
if (retVal == 0)
retVal = -1;
libp2p_message_free(msg);
return 1;
}
/**
* Ping a remote
* @param routing the context
* @param message the message that we want to send
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* peer) {
int retVal = 0;
struct Libp2pMessage* msg = NULL, *msg_returned = NULL;
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
if (!libp2p_peer_connect(peer))
return 0;
}
if (peer->connection_type == CONNECTION_TYPE_CONNECTED) {
// build the message
msg = libp2p_message_new();
msg->message_type = MESSAGE_TYPE_PING;
msg_returned = ipfs_routing_online_send_receive_message(peer->connection, msg);
if (msg_returned == NULL)
goto exit;
if (msg_returned->message_type != msg->message_type)
goto exit;
}
retVal = 1;
exit:
return retVal;
}
@ -94,6 +169,13 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) {
peer->addr_head->item = address;
libp2p_peerstore_add_peer(routing->local_node->peerstore, peer);
libp2p_peer_free(peer);
// now find it and attempt to connect
peer = libp2p_peerstore_get_peer(routing->local_node->peerstore, (const unsigned char*)peer_id, strlen(peer_id));
if (peer == NULL)
return -1; // this should never happen
if (peer->connection == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier)
libp2p_peer_connect(peer);
}
}
}

View File

@ -78,7 +78,7 @@ int test_ping() {
}
int test_ping_remote() {
char* argv[] = { "ipfs", "ping", "/ipfs/QmTjg669YQemhffXLrkA3as9jT8SzyRtWaLXHKwYN6wCBd" };
char* argv[] = { "ipfs", "ping", "QmTjg669YQemhffXLrkA3as9jT8SzyRtWaLXHKwYN6wCBd" };
int argc = 3;
return ipfs_ping(argc, argv);