From 79a2a894dd33aa84f88c45fc5ac331041a84d6c3 Mon Sep 17 00:00:00 2001 From: John Jones Date: Thu, 30 Mar 2017 13:58:53 -0500 Subject: [PATCH] Adding network calls for kademlia --- include/libp2p/net/stream.h | 6 +- include/libp2p/peer/providerstore.h | 13 +- include/libp2p/routing/dht.h | 30 +++- include/libp2p/routing/dht_protocol.h | 34 ++++ include/libp2p/routing/kademlia.h | 7 + routing/Makefile | 2 +- routing/dht.c | 26 ++- routing/dht_protocol.c | 246 ++++++++++++++++++++++++++ routing/kademlia.c | 9 + 9 files changed, 358 insertions(+), 15 deletions(-) create mode 100644 include/libp2p/routing/dht_protocol.h create mode 100644 routing/dht_protocol.c diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index 2f01195..fcf0e8c 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -11,7 +11,7 @@ struct Stream { /** * Reads from the stream - * @param stream the stream + * @param stream the stream context (usually a SessionContext pointer) * @param buffer where to put the results * @param bytes_read how many bytes were read * @returns true(1) on success, false(0) otherwise @@ -20,7 +20,7 @@ struct Stream { /** * Writes to a stream - * @param stream the stream + * @param stream the stream context * @param buffer what to write * @param how much to write * @returns true(1) on success, false(0) otherwise @@ -29,7 +29,7 @@ struct Stream { /** * Closes a stream - * @param stream the stream + * @param stream the stream context * @returns true(1) on success, otherwise false(0) */ int (*close)(void* stream_context); diff --git a/include/libp2p/peer/providerstore.h b/include/libp2p/peer/providerstore.h index 2648876..fe23b22 100644 --- a/include/libp2p/peer/providerstore.h +++ b/include/libp2p/peer/providerstore.h @@ -1,5 +1,9 @@ #pragma once +/** + * Contains a hash and the peer id of + * who can provide it + */ struct ProviderEntry { unsigned char* hash; int hash_size; @@ -7,14 +11,15 @@ struct ProviderEntry { int peer_id_size; }; +/*** + * A structure to store providers. The implementation + * is a vector of ProviderEntry structures, which contain + * the hash and peer id. + */ struct ProviderStore { struct Libp2pVector* provider_entries; }; -/*** - * Stores hashes, and peers where you can possibly get them - */ - /** * Create a new ProviderStore * @returns a ProviderStore struct diff --git a/include/libp2p/routing/dht.h b/include/libp2p/routing/dht.h index 8f889a5..8ba3340 100644 --- a/include/libp2p/routing/dht.h +++ b/include/libp2p/routing/dht.h @@ -24,8 +24,11 @@ THE SOFTWARE. extern "C" { #endif -typedef void -dht_callback(void *closure, int event, +/** + * The callback method that you can implement + * to receive events + */ +typedef void dht_callback(void *closure, int event, const unsigned char *info_hash, const void *data, size_t data_len); @@ -40,9 +43,32 @@ extern FILE *dht_debug; int dht_init(int s, int s6, const unsigned char *id, const unsigned char *v); int dht_insert_node(const unsigned char *id, struct sockaddr *sa, int salen); int dht_ping_node(const struct sockaddr *sa, int salen); +/*** + * Called when something is received from the network or + * the network times out (things that should be done + * periodically) + * @param buf what came in from the network + * @param buflen the size of buf + * @param from where it came from + * @param fromlen + * @param tosleep + * @param callback + * @param closure + * @returns ?? + */ int dht_periodic(const void *buf, size_t buflen, const struct sockaddr *from, int fromlen, time_t *tosleep, dht_callback *callback, void *closure); +/** + * Start a search. If port is non-zero, perform an announce when the + * search is complete. + * @param id the hash to search for + * @param port where it is available + * @param af IP family (AF_INET or AF_INET6) + * @param callback the callback method + * @param closure + * @returns -1 on failure, 1 on success + **/ int dht_search(const unsigned char *id, int port, int af, dht_callback *callback, void *closure); int dht_nodes(int af, diff --git a/include/libp2p/routing/dht_protocol.h b/include/libp2p/routing/dht_protocol.h new file mode 100644 index 0000000..28f98e3 --- /dev/null +++ b/include/libp2p/routing/dht_protocol.h @@ -0,0 +1,34 @@ +#pragma once + +#include "libp2p/conn/session.h" +#include "libp2p/peer/peerstore.h" +#include "libp2p/peer/providerstore.h" + + +/*** + * This is where kademlia and dht talk to the outside world + */ + +/** + * Take existing stream and upgrade to the Kademlia / DHT protocol/codec + * @param context the context + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_upgrade_stream(struct SessionContext* context); + +/** + * Handle a client requesting an upgrade to the DHT protocol + * @param context the context + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_handshake(struct SessionContext* context); + +/*** + * Handle the incoming message. Handshake should have already + * been done. We should expect that the next read contains + * a protobuf'd kademlia message. + * @param session the context + * @param peerstore a list of peers + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore); diff --git a/include/libp2p/routing/kademlia.h b/include/libp2p/routing/kademlia.h index f7642a1..173ee35 100644 --- a/include/libp2p/routing/kademlia.h +++ b/include/libp2p/routing/kademlia.h @@ -11,6 +11,13 @@ void *kademlia_thread (void *ptr); void *announce_thread (void *ptr); int announce_kademlia (char* peer_id, uint16_t port); + +/*** + * Search for a hash + * @param peer_id the hash to search for + * @param timeout timeout in seconds + * @returns an array of MultiAddress + */ struct MultiAddress** search_kademlia(char* peer_id, int timeout); int ping_kademlia (char *ip, uint16_t port); diff --git a/routing/Makefile b/routing/Makefile index ef8d94b..f71a960 100644 --- a/routing/Makefile +++ b/routing/Makefile @@ -3,7 +3,7 @@ CC = gcc CFLAGS = -O0 -I../include -I../../c-multiaddr/include -I$(DHT_DIR) -g3 LFLAGS = DEPS = # $(DHT_DIR)/dht.h -OBJS = kademlia.o dht.o +OBJS = kademlia.o dht.o dht_protocol.o %.o: %.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) diff --git a/routing/dht.c b/routing/dht.c index 471172c..53163ec 100644 --- a/routing/dht.c +++ b/routing/dht.c @@ -1213,10 +1213,17 @@ insert_search_bucket(struct bucket *b, struct search *sr) } } -/* Start a search. If port is non-zero, perform an announce when the - search is complete. */ -int -dht_search(const unsigned char *id, int port, int af, +/** + * Start a search. If port is non-zero, perform an announce when the + * search is complete. + * @param id the hash to search for + * @param port where it is available + * @param af IP family (AF_INET or AF_INET6) + * @param callback the callback method + * @param closure + * @returns -1 on failure, 1 on success + **/ +int dht_search(const unsigned char *id, int port, int af, dht_callback *callback, void *closure) { struct search *sr; @@ -1927,7 +1934,7 @@ bucket_maintenance(int af) /*** * Called when something is received from the network or * the network times out (things that should be done - * periodically + * periodically) * @param buf what came in from the network * @param buflen the size of buf * @param from where it came from @@ -2379,6 +2386,15 @@ dht_ping_node(const struct sockaddr *sa, int salen) COPY(buf, offset, my_v, sizeof(my_v), size); \ } +/*** + * Send data over the network + * @param buf what to send + * @param len the length of buf + * @param flags + * @param sa who to send to + * @param salen the length of sa + * @returns -1 on error, or number of bytes sent + */ static int dht_send(const void *buf, size_t len, int flags, const struct sockaddr *sa, int salen) diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c new file mode 100644 index 0000000..4d091a9 --- /dev/null +++ b/routing/dht_protocol.c @@ -0,0 +1,246 @@ +#include +#include + +#include "libp2p/net/stream.h" +#include "libp2p/routing/dht_protocol.h" +#include "libp2p/record/message.h" + + +/*** + * This is where kademlia and dht talk to the outside world + */ + +/** + * Take existing stream and upgrade to the Kademlia / DHT protocol/codec + * @param context the context + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) { + int retVal = 0; + char* protocol = "/ipfs/kad/1.0.0\n"; + unsigned char* results = NULL; + size_t results_size = 0; + if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) + goto exit; + if (!context->default_stream->read(context, &results, &results_size)) + goto exit; + if (results_size != strlen(protocol)) + goto exit; + if (strncmp((char*)results, protocol, results_size) != 0) + goto exit; + retVal = 1; + exit: + if (results != NULL) { + free(results); + results = NULL; + } + return retVal; +} + +/** + * Handle a client requesting an upgrade to the DHT protocol + * @param context the context + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_handshake(struct SessionContext* context) { + char* protocol = "/ipfs/kad/1.0.0\n"; + return context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol)); +} + +/** + * A remote client has requested a ping + * @param message the message + * @param buffer where to put the results + * @param buffer_size the length of the results + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_routing_dht_handle_ping(struct Libp2pMessage* message, unsigned char** buffer, size_t *buffer_size) { + // just turn message back into a protobuf and send it back... + *buffer_size = libp2p_message_protobuf_encode_size(message); + *buffer = (unsigned char*)malloc(*buffer_size); + return libp2p_message_protobuf_encode(message, *buffer, *buffer_size, buffer_size); +} + +/** + * See if we have information as to who can provide this item + * @param session the context + * @param message the message from the caller, contains a key + * @param peerstore the list of peers + * @param providerstore the list of peers that can provide things + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, struct Libp2pMessage* message, struct Peerstore* peerstore, + struct ProviderStore* providerstore, unsigned char** results, size_t* results_size) { + unsigned char* peer_id = NULL; + int peer_id_size = 0; + + // This shouldn't be needed, but just in case: + message->provider_peer_head = NULL; + + // Can I provide it? + if (libp2p_providerstore_get(providerstore, message->key, message->key_size, &peer_id, &peer_id_size)) { + // we have a peer id, convert it to a peer object + struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, peer_id, peer_id_size); + if (peer->addr_head != NULL) + message->provider_peer_head = peer->addr_head; + } + free(peer_id); + // TODO: find closer peers + /* + if (message->provider_peer_head == NULL) { + // Who else can provide it? + //while () + } + */ + if (message->provider_peer_head != NULL) { + // protobuf it and send it back + *results_size = libp2p_message_protobuf_encode_size(message); + *results = (unsigned char*)malloc(*results_size); + if (!libp2p_message_protobuf_encode(message, *results, *results_size, results_size)) { + free(*results); + *results = NULL; + *results_size = 0; + return 0; + } + } + return 1; +} + +/*** + * Remote peer has announced that he can provide a key + * @param session session context + * @param message the message + * @param peerstore the peerstore + * @param providerstore the providerstore + * @param result_buffer where to put the result + * @param result_buffer_size the size of the result buffer + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struct Libp2pMessage* message, + struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t* result_buffer_size) { + //TODO: verify peer signature + if (message->record != NULL && message->record->author != NULL && message->record->author_size > 0 + && message->key != NULL && message->key_size > 0) { + struct Libp2pPeer* peer = libp2p_peer_new(); + peer->id_size = message->record->author_size; + peer->id = malloc(peer->id_size); + memcpy(peer->id, message->record->author, message->record->author_size); + int retVal = libp2p_peerstore_add_peer(peerstore, peer); + libp2p_peer_free(peer); + return retVal; + } + return 0; +} + +/** + * Retrieve something from the dht datastore + * @param session the session context + * @param message the message + * @param peerstore the peerstore + * @param providerstore the providerstore + * @param result_buffer the results + * @param result_buffer_size the size of the results + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct Libp2pMessage* message, + struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { + //TODO: implement this + return 0; +} + +/** + * Put something in the dht datastore + * @param session the session context + * @param message the message + * @param peerstore the peerstore + * @param providerstore the providerstore + * @param result_buffer the results + * @param result_buffer_size the size of the results + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct Libp2pMessage* message, + struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { + //TODO: implement this + return 0; +} + +/** + * Find a node + * @param session the session context + * @param message the message + * @param peerstore the peerstore + * @param providerstore the providerstore + * @param result_buffer the results + * @param result_buffer_size the size of the results + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct Libp2pMessage* message, + struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { + // look through peer store + struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, message->key, message->key_size); + if (peer != NULL) { + message->closer_peer_head = peer->addr_head; + *result_buffer_size = libp2p_message_protobuf_encode_size(message); + *result_buffer = (unsigned char*)malloc(*result_buffer_size); + if (libp2p_message_protobuf_encode(message, *result_buffer, *result_buffer_size, result_buffer_size)) { + return 1; + } + free(*result_buffer); + *result_buffer = NULL; + *result_buffer_size = 0; + } + return 0; +} + +/*** + * Handle the incoming message. Handshake should have already + * been done. We should expect that the next read contains + * a protobuf'd kademlia message. + * @param session the context + * @param peerstore a list of peers + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore) { + unsigned char* buffer = NULL, *result_buffer = NULL; + size_t buffer_size = 0, result_buffer_size = 0; + int retVal = 0; + struct Libp2pMessage* message = NULL; + + // read from stream + if (!session->default_stream->read(session, &buffer, &buffer_size)) + goto exit; + // unprotobuf + if (!libp2p_message_protobuf_decode(buffer, buffer_size, &message)) + goto exit; + // handle message + switch(message->message_type) { + case(MESSAGE_TYPE_PUT_VALUE): // store a value in local storage + libp2p_routing_dht_handle_put_value(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + break; + case(MESSAGE_TYPE_GET_VALUE): // get a value from local storage + libp2p_routing_dht_handle_get_value(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + break; + case(MESSAGE_TYPE_ADD_PROVIDER): // client wants us to know he can provide something + libp2p_routing_dht_handle_add_provider(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + break; + case(MESSAGE_TYPE_GET_PROVIDERS): // see if we can help, and send closer peers + libp2p_routing_dht_handle_get_providers(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + break; + case(MESSAGE_TYPE_FIND_NODE): // find peers + libp2p_routing_dht_handle_find_node(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size); + break; + case(MESSAGE_TYPE_PING): + libp2p_routing_dht_handle_ping(message, &result_buffer, &result_buffer_size); + break; + } + // if we have something to send, send it. + if (result_buffer != NULL && !session->default_stream->write(session, result_buffer, result_buffer_size)) + goto exit; + retVal = 1; + exit: + if (buffer != NULL) + free(buffer); + if (result_buffer != NULL) + free(result_buffer); + return retVal; +} diff --git a/routing/kademlia.c b/routing/kademlia.c index 398e5e4..5aaa3fb 100644 --- a/routing/kademlia.c +++ b/routing/kademlia.c @@ -365,6 +365,13 @@ void *kademlia_thread (void *ptr) return (void*)1; } +/** + * Search for a hash + * @param id the hash to look for + * @param port the port if it is available + * @param to the time out + * @returns the time left + */ int search_kademlia_internal (unsigned char* id, int port, int to) { int i; @@ -478,6 +485,8 @@ struct MultiAddress** search_kademlia(char* peer_id, int timeout) dht_hash (id, sizeof(id), peer_id, strlen(peer_id), NULL, 0, NULL, 0); + //TODO: Is this the right place to ask the net? + dht_search(peer_id, 0, AF_INET, NULL, NULL); to = search_kademlia_internal (id, 0, to); if (to == 0) return NULL; // time out.