Adding network calls for kademlia
This commit is contained in:
parent
db0f62cee4
commit
79a2a894dd
9 changed files with 358 additions and 15 deletions
|
@ -11,7 +11,7 @@ struct Stream {
|
|||
|
||||
/**
|
||||
* Reads from the stream
|
||||
* @param stream the stream
|
||||
* @param stream the stream context (usually a SessionContext pointer)
|
||||
* @param buffer where to put the results
|
||||
* @param bytes_read how many bytes were read
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
|
@ -20,7 +20,7 @@ struct Stream {
|
|||
|
||||
/**
|
||||
* Writes to a stream
|
||||
* @param stream the stream
|
||||
* @param stream the stream context
|
||||
* @param buffer what to write
|
||||
* @param how much to write
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
|
@ -29,7 +29,7 @@ struct Stream {
|
|||
|
||||
/**
|
||||
* Closes a stream
|
||||
* @param stream the stream
|
||||
* @param stream the stream context
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int (*close)(void* stream_context);
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
#pragma once
|
||||
|
||||
/**
|
||||
* Contains a hash and the peer id of
|
||||
* who can provide it
|
||||
*/
|
||||
struct ProviderEntry {
|
||||
unsigned char* hash;
|
||||
int hash_size;
|
||||
|
@ -7,14 +11,15 @@ struct ProviderEntry {
|
|||
int peer_id_size;
|
||||
};
|
||||
|
||||
/***
|
||||
* A structure to store providers. The implementation
|
||||
* is a vector of ProviderEntry structures, which contain
|
||||
* the hash and peer id.
|
||||
*/
|
||||
struct ProviderStore {
|
||||
struct Libp2pVector* provider_entries;
|
||||
};
|
||||
|
||||
/***
|
||||
* Stores hashes, and peers where you can possibly get them
|
||||
*/
|
||||
|
||||
/**
|
||||
* Create a new ProviderStore
|
||||
* @returns a ProviderStore struct
|
||||
|
|
|
@ -24,8 +24,11 @@ THE SOFTWARE.
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef void
|
||||
dht_callback(void *closure, int event,
|
||||
/**
|
||||
* The callback method that you can implement
|
||||
* to receive events
|
||||
*/
|
||||
typedef void dht_callback(void *closure, int event,
|
||||
const unsigned char *info_hash,
|
||||
const void *data, size_t data_len);
|
||||
|
||||
|
@ -40,9 +43,32 @@ extern FILE *dht_debug;
|
|||
int dht_init(int s, int s6, const unsigned char *id, const unsigned char *v);
|
||||
int dht_insert_node(const unsigned char *id, struct sockaddr *sa, int salen);
|
||||
int dht_ping_node(const struct sockaddr *sa, int salen);
|
||||
/***
|
||||
* Called when something is received from the network or
|
||||
* the network times out (things that should be done
|
||||
* periodically)
|
||||
* @param buf what came in from the network
|
||||
* @param buflen the size of buf
|
||||
* @param from where it came from
|
||||
* @param fromlen
|
||||
* @param tosleep
|
||||
* @param callback
|
||||
* @param closure
|
||||
* @returns ??
|
||||
*/
|
||||
int dht_periodic(const void *buf, size_t buflen,
|
||||
const struct sockaddr *from, int fromlen,
|
||||
time_t *tosleep, dht_callback *callback, void *closure);
|
||||
/**
|
||||
* Start a search. If port is non-zero, perform an announce when the
|
||||
* search is complete.
|
||||
* @param id the hash to search for
|
||||
* @param port where it is available
|
||||
* @param af IP family (AF_INET or AF_INET6)
|
||||
* @param callback the callback method
|
||||
* @param closure
|
||||
* @returns -1 on failure, 1 on success
|
||||
**/
|
||||
int dht_search(const unsigned char *id, int port, int af,
|
||||
dht_callback *callback, void *closure);
|
||||
int dht_nodes(int af,
|
||||
|
|
34
include/libp2p/routing/dht_protocol.h
Normal file
34
include/libp2p/routing/dht_protocol.h
Normal file
|
@ -0,0 +1,34 @@
|
|||
#pragma once
|
||||
|
||||
#include "libp2p/conn/session.h"
|
||||
#include "libp2p/peer/peerstore.h"
|
||||
#include "libp2p/peer/providerstore.h"
|
||||
|
||||
|
||||
/***
|
||||
* This is where kademlia and dht talk to the outside world
|
||||
*/
|
||||
|
||||
/**
|
||||
* Take existing stream and upgrade to the Kademlia / DHT protocol/codec
|
||||
* @param context the context
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_upgrade_stream(struct SessionContext* context);
|
||||
|
||||
/**
|
||||
* Handle a client requesting an upgrade to the DHT protocol
|
||||
* @param context the context
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_handshake(struct SessionContext* context);
|
||||
|
||||
/***
|
||||
* Handle the incoming message. Handshake should have already
|
||||
* been done. We should expect that the next read contains
|
||||
* a protobuf'd kademlia message.
|
||||
* @param session the context
|
||||
* @param peerstore a list of peers
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore);
|
|
@ -11,6 +11,13 @@ void *kademlia_thread (void *ptr);
|
|||
void *announce_thread (void *ptr);
|
||||
|
||||
int announce_kademlia (char* peer_id, uint16_t port);
|
||||
|
||||
/***
|
||||
* Search for a hash
|
||||
* @param peer_id the hash to search for
|
||||
* @param timeout timeout in seconds
|
||||
* @returns an array of MultiAddress
|
||||
*/
|
||||
struct MultiAddress** search_kademlia(char* peer_id, int timeout);
|
||||
|
||||
int ping_kademlia (char *ip, uint16_t port);
|
||||
|
|
|
@ -3,7 +3,7 @@ CC = gcc
|
|||
CFLAGS = -O0 -I../include -I../../c-multiaddr/include -I$(DHT_DIR) -g3
|
||||
LFLAGS =
|
||||
DEPS = # $(DHT_DIR)/dht.h
|
||||
OBJS = kademlia.o dht.o
|
||||
OBJS = kademlia.o dht.o dht_protocol.o
|
||||
|
||||
%.o: %.c $(DEPS)
|
||||
$(CC) -c -o $@ $< $(CFLAGS)
|
||||
|
|
|
@ -1213,10 +1213,17 @@ insert_search_bucket(struct bucket *b, struct search *sr)
|
|||
}
|
||||
}
|
||||
|
||||
/* Start a search. If port is non-zero, perform an announce when the
|
||||
search is complete. */
|
||||
int
|
||||
dht_search(const unsigned char *id, int port, int af,
|
||||
/**
|
||||
* Start a search. If port is non-zero, perform an announce when the
|
||||
* search is complete.
|
||||
* @param id the hash to search for
|
||||
* @param port where it is available
|
||||
* @param af IP family (AF_INET or AF_INET6)
|
||||
* @param callback the callback method
|
||||
* @param closure
|
||||
* @returns -1 on failure, 1 on success
|
||||
**/
|
||||
int dht_search(const unsigned char *id, int port, int af,
|
||||
dht_callback *callback, void *closure)
|
||||
{
|
||||
struct search *sr;
|
||||
|
@ -1927,7 +1934,7 @@ bucket_maintenance(int af)
|
|||
/***
|
||||
* Called when something is received from the network or
|
||||
* the network times out (things that should be done
|
||||
* periodically
|
||||
* periodically)
|
||||
* @param buf what came in from the network
|
||||
* @param buflen the size of buf
|
||||
* @param from where it came from
|
||||
|
@ -2379,6 +2386,15 @@ dht_ping_node(const struct sockaddr *sa, int salen)
|
|||
COPY(buf, offset, my_v, sizeof(my_v), size); \
|
||||
}
|
||||
|
||||
/***
|
||||
* Send data over the network
|
||||
* @param buf what to send
|
||||
* @param len the length of buf
|
||||
* @param flags
|
||||
* @param sa who to send to
|
||||
* @param salen the length of sa
|
||||
* @returns -1 on error, or number of bytes sent
|
||||
*/
|
||||
static int
|
||||
dht_send(const void *buf, size_t len, int flags,
|
||||
const struct sockaddr *sa, int salen)
|
||||
|
|
246
routing/dht_protocol.c
Normal file
246
routing/dht_protocol.c
Normal file
|
@ -0,0 +1,246 @@
|
|||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "libp2p/net/stream.h"
|
||||
#include "libp2p/routing/dht_protocol.h"
|
||||
#include "libp2p/record/message.h"
|
||||
|
||||
|
||||
/***
|
||||
* This is where kademlia and dht talk to the outside world
|
||||
*/
|
||||
|
||||
/**
|
||||
* Take existing stream and upgrade to the Kademlia / DHT protocol/codec
|
||||
* @param context the context
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) {
|
||||
int retVal = 0;
|
||||
char* protocol = "/ipfs/kad/1.0.0\n";
|
||||
unsigned char* results = NULL;
|
||||
size_t results_size = 0;
|
||||
if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol)))
|
||||
goto exit;
|
||||
if (!context->default_stream->read(context, &results, &results_size))
|
||||
goto exit;
|
||||
if (results_size != strlen(protocol))
|
||||
goto exit;
|
||||
if (strncmp((char*)results, protocol, results_size) != 0)
|
||||
goto exit;
|
||||
retVal = 1;
|
||||
exit:
|
||||
if (results != NULL) {
|
||||
free(results);
|
||||
results = NULL;
|
||||
}
|
||||
return retVal;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a client requesting an upgrade to the DHT protocol
|
||||
* @param context the context
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_handshake(struct SessionContext* context) {
|
||||
char* protocol = "/ipfs/kad/1.0.0\n";
|
||||
return context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol));
|
||||
}
|
||||
|
||||
/**
|
||||
* A remote client has requested a ping
|
||||
* @param message the message
|
||||
* @param buffer where to put the results
|
||||
* @param buffer_size the length of the results
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int libp2p_routing_dht_handle_ping(struct Libp2pMessage* message, unsigned char** buffer, size_t *buffer_size) {
|
||||
// just turn message back into a protobuf and send it back...
|
||||
*buffer_size = libp2p_message_protobuf_encode_size(message);
|
||||
*buffer = (unsigned char*)malloc(*buffer_size);
|
||||
return libp2p_message_protobuf_encode(message, *buffer, *buffer_size, buffer_size);
|
||||
}
|
||||
|
||||
/**
|
||||
* See if we have information as to who can provide this item
|
||||
* @param session the context
|
||||
* @param message the message from the caller, contains a key
|
||||
* @param peerstore the list of peers
|
||||
* @param providerstore the list of peers that can provide things
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, struct Libp2pMessage* message, struct Peerstore* peerstore,
|
||||
struct ProviderStore* providerstore, unsigned char** results, size_t* results_size) {
|
||||
unsigned char* peer_id = NULL;
|
||||
int peer_id_size = 0;
|
||||
|
||||
// This shouldn't be needed, but just in case:
|
||||
message->provider_peer_head = NULL;
|
||||
|
||||
// Can I provide it?
|
||||
if (libp2p_providerstore_get(providerstore, message->key, message->key_size, &peer_id, &peer_id_size)) {
|
||||
// we have a peer id, convert it to a peer object
|
||||
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, peer_id, peer_id_size);
|
||||
if (peer->addr_head != NULL)
|
||||
message->provider_peer_head = peer->addr_head;
|
||||
}
|
||||
free(peer_id);
|
||||
// TODO: find closer peers
|
||||
/*
|
||||
if (message->provider_peer_head == NULL) {
|
||||
// Who else can provide it?
|
||||
//while ()
|
||||
}
|
||||
*/
|
||||
if (message->provider_peer_head != NULL) {
|
||||
// protobuf it and send it back
|
||||
*results_size = libp2p_message_protobuf_encode_size(message);
|
||||
*results = (unsigned char*)malloc(*results_size);
|
||||
if (!libp2p_message_protobuf_encode(message, *results, *results_size, results_size)) {
|
||||
free(*results);
|
||||
*results = NULL;
|
||||
*results_size = 0;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/***
|
||||
* Remote peer has announced that he can provide a key
|
||||
* @param session session context
|
||||
* @param message the message
|
||||
* @param peerstore the peerstore
|
||||
* @param providerstore the providerstore
|
||||
* @param result_buffer where to put the result
|
||||
* @param result_buffer_size the size of the result buffer
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struct Libp2pMessage* message,
|
||||
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t* result_buffer_size) {
|
||||
//TODO: verify peer signature
|
||||
if (message->record != NULL && message->record->author != NULL && message->record->author_size > 0
|
||||
&& message->key != NULL && message->key_size > 0) {
|
||||
struct Libp2pPeer* peer = libp2p_peer_new();
|
||||
peer->id_size = message->record->author_size;
|
||||
peer->id = malloc(peer->id_size);
|
||||
memcpy(peer->id, message->record->author, message->record->author_size);
|
||||
int retVal = libp2p_peerstore_add_peer(peerstore, peer);
|
||||
libp2p_peer_free(peer);
|
||||
return retVal;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieve something from the dht datastore
|
||||
* @param session the session context
|
||||
* @param message the message
|
||||
* @param peerstore the peerstore
|
||||
* @param providerstore the providerstore
|
||||
* @param result_buffer the results
|
||||
* @param result_buffer_size the size of the results
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct Libp2pMessage* message,
|
||||
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) {
|
||||
//TODO: implement this
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Put something in the dht datastore
|
||||
* @param session the session context
|
||||
* @param message the message
|
||||
* @param peerstore the peerstore
|
||||
* @param providerstore the providerstore
|
||||
* @param result_buffer the results
|
||||
* @param result_buffer_size the size of the results
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct Libp2pMessage* message,
|
||||
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) {
|
||||
//TODO: implement this
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Find a node
|
||||
* @param session the session context
|
||||
* @param message the message
|
||||
* @param peerstore the peerstore
|
||||
* @param providerstore the providerstore
|
||||
* @param result_buffer the results
|
||||
* @param result_buffer_size the size of the results
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct Libp2pMessage* message,
|
||||
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) {
|
||||
// look through peer store
|
||||
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, message->key, message->key_size);
|
||||
if (peer != NULL) {
|
||||
message->closer_peer_head = peer->addr_head;
|
||||
*result_buffer_size = libp2p_message_protobuf_encode_size(message);
|
||||
*result_buffer = (unsigned char*)malloc(*result_buffer_size);
|
||||
if (libp2p_message_protobuf_encode(message, *result_buffer, *result_buffer_size, result_buffer_size)) {
|
||||
return 1;
|
||||
}
|
||||
free(*result_buffer);
|
||||
*result_buffer = NULL;
|
||||
*result_buffer_size = 0;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/***
|
||||
* Handle the incoming message. Handshake should have already
|
||||
* been done. We should expect that the next read contains
|
||||
* a protobuf'd kademlia message.
|
||||
* @param session the context
|
||||
* @param peerstore a list of peers
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore) {
|
||||
unsigned char* buffer = NULL, *result_buffer = NULL;
|
||||
size_t buffer_size = 0, result_buffer_size = 0;
|
||||
int retVal = 0;
|
||||
struct Libp2pMessage* message = NULL;
|
||||
|
||||
// read from stream
|
||||
if (!session->default_stream->read(session, &buffer, &buffer_size))
|
||||
goto exit;
|
||||
// unprotobuf
|
||||
if (!libp2p_message_protobuf_decode(buffer, buffer_size, &message))
|
||||
goto exit;
|
||||
// handle message
|
||||
switch(message->message_type) {
|
||||
case(MESSAGE_TYPE_PUT_VALUE): // store a value in local storage
|
||||
libp2p_routing_dht_handle_put_value(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size);
|
||||
break;
|
||||
case(MESSAGE_TYPE_GET_VALUE): // get a value from local storage
|
||||
libp2p_routing_dht_handle_get_value(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size);
|
||||
break;
|
||||
case(MESSAGE_TYPE_ADD_PROVIDER): // client wants us to know he can provide something
|
||||
libp2p_routing_dht_handle_add_provider(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size);
|
||||
break;
|
||||
case(MESSAGE_TYPE_GET_PROVIDERS): // see if we can help, and send closer peers
|
||||
libp2p_routing_dht_handle_get_providers(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size);
|
||||
break;
|
||||
case(MESSAGE_TYPE_FIND_NODE): // find peers
|
||||
libp2p_routing_dht_handle_find_node(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size);
|
||||
break;
|
||||
case(MESSAGE_TYPE_PING):
|
||||
libp2p_routing_dht_handle_ping(message, &result_buffer, &result_buffer_size);
|
||||
break;
|
||||
}
|
||||
// if we have something to send, send it.
|
||||
if (result_buffer != NULL && !session->default_stream->write(session, result_buffer, result_buffer_size))
|
||||
goto exit;
|
||||
retVal = 1;
|
||||
exit:
|
||||
if (buffer != NULL)
|
||||
free(buffer);
|
||||
if (result_buffer != NULL)
|
||||
free(result_buffer);
|
||||
return retVal;
|
||||
}
|
|
@ -365,6 +365,13 @@ void *kademlia_thread (void *ptr)
|
|||
return (void*)1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Search for a hash
|
||||
* @param id the hash to look for
|
||||
* @param port the port if it is available
|
||||
* @param to the time out
|
||||
* @returns the time left
|
||||
*/
|
||||
int search_kademlia_internal (unsigned char* id, int port, int to)
|
||||
{
|
||||
int i;
|
||||
|
@ -478,6 +485,8 @@ struct MultiAddress** search_kademlia(char* peer_id, int timeout)
|
|||
|
||||
dht_hash (id, sizeof(id), peer_id, strlen(peer_id), NULL, 0, NULL, 0);
|
||||
|
||||
//TODO: Is this the right place to ask the net?
|
||||
dht_search(peer_id, 0, AF_INET, NULL, NULL);
|
||||
to = search_kademlia_internal (id, 0, to);
|
||||
if (to == 0) return NULL; // time out.
|
||||
|
||||
|
|
Loading…
Reference in a new issue