2017-03-30 18:58:53 +00:00
# include <stdlib.h>
# include <string.h>
# include "libp2p/net/stream.h"
# include "libp2p/routing/dht_protocol.h"
# include "libp2p/record/message.h"
2017-07-31 17:49:41 +00:00
# include "libp2p/utils/linked_list.h"
2017-04-03 23:13:42 +00:00
# include "libp2p/utils/logger.h"
2017-04-20 22:55:18 +00:00
# include "libp2p/conn/session.h"
2017-03-30 18:58:53 +00:00
/***
* This is where kademlia and dht talk to the outside world
*/
2017-04-06 14:55:01 +00:00
/***
* Helper method to protobuf a message
* @ param message the message
* @ param buffer where to put the results
* @ param buffer_size the size of the results
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
int libp2p_routing_dht_protobuf_message ( struct Libp2pMessage * message , unsigned char * * buffer , size_t * buffer_size ) {
* buffer_size = libp2p_message_protobuf_encode_size ( message ) ;
* buffer = malloc ( * buffer_size ) ;
if ( ! libp2p_message_protobuf_encode ( message , * buffer , * buffer_size , buffer_size ) ) {
free ( * buffer ) ;
* buffer_size = 0 ;
return 0 ;
}
return 1 ;
}
2017-03-30 18:58:53 +00:00
/**
* Take existing stream and upgrade to the Kademlia / DHT protocol / codec
* @ param context the context
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
int libp2p_routing_dht_upgrade_stream ( struct SessionContext * context ) {
int retVal = 0 ;
char * protocol = " /ipfs/kad/1.0.0 \n " ;
unsigned char * results = NULL ;
size_t results_size = 0 ;
2017-08-03 22:47:02 +00:00
if ( ! context - > default_stream - > write ( context , ( unsigned char * ) protocol , strlen ( protocol ) ) ) {
libp2p_logger_error ( " dht_protocol " , " Unable to write to stream during upgrade attempt. \n " ) ;
2017-03-30 18:58:53 +00:00
goto exit ;
2017-08-03 22:47:02 +00:00
}
if ( ! context - > default_stream - > read ( context , & results , & results_size , 5 ) ) {
libp2p_logger_error ( " dht_protocol " , " Unable to read from stream during upgrade attempt. \n " ) ;
2017-03-30 18:58:53 +00:00
goto exit ;
2017-08-03 22:47:02 +00:00
}
if ( results_size ! = strlen ( protocol ) ) {
libp2p_logger_error ( " dht_protocol " , " Expected response size incorrect during upgrade attempt. \n " ) ;
2017-03-30 18:58:53 +00:00
goto exit ;
2017-08-03 22:47:02 +00:00
}
if ( strncmp ( ( char * ) results , protocol , results_size ) ! = 0 ) {
libp2p_logger_error ( " dht_protocol " , " Expected %s but received %s. \n " , protocol , results ) ;
2017-03-30 18:58:53 +00:00
goto exit ;
2017-08-03 22:47:02 +00:00
}
2017-03-30 18:58:53 +00:00
retVal = 1 ;
exit :
if ( results ! = NULL ) {
free ( results ) ;
results = NULL ;
}
return retVal ;
}
/**
* Handle a client requesting an upgrade to the DHT protocol
* @ param context the context
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
int libp2p_routing_dht_handshake ( struct SessionContext * context ) {
char * protocol = " /ipfs/kad/1.0.0 \n " ;
return context - > default_stream - > write ( context , ( unsigned char * ) protocol , strlen ( protocol ) ) ;
}
/**
* A remote client has requested a ping
* @ param message the message
* @ param buffer where to put the results
* @ param buffer_size the length of the results
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
int libp2p_routing_dht_handle_ping ( struct Libp2pMessage * message , unsigned char * * buffer , size_t * buffer_size ) {
// just turn message back into a protobuf and send it back...
2017-04-06 14:55:01 +00:00
return libp2p_routing_dht_protobuf_message ( message , buffer , buffer_size ) ;
2017-03-30 18:58:53 +00:00
}
/**
* See if we have information as to who can provide this item
* @ param session the context
* @ param message the message from the caller , contains a key
* @ param peerstore the list of peers
* @ param providerstore the list of peers that can provide things
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
int libp2p_routing_dht_handle_get_providers ( struct SessionContext * session , struct Libp2pMessage * message , struct Peerstore * peerstore ,
struct ProviderStore * providerstore , unsigned char * * results , size_t * results_size ) {
unsigned char * peer_id = NULL ;
int peer_id_size = 0 ;
// This shouldn't be needed, but just in case:
message - > provider_peer_head = NULL ;
2017-07-31 17:49:41 +00:00
// Can I provide it locally?
unsigned char buf [ 65535 ] ;
size_t buf_size = 0 ;
if ( session - > datastore - > datastore_get ( message - > key , message - > key_size , & buf [ 0 ] , buf_size , & buf_size , session - > datastore ) ) {
// we can provide this hash from our datastore
message - > provider_peer_head = libp2p_utils_linked_list_new ( ) ;
struct Libp2pPeer * local_peer = ( struct Libp2pPeer * ) peerstore - > head_entry - > item ;
message - > provider_peer_head - > item = local_peer ;
}
// Can I provide it because someone announced it earlier?
2017-07-17 21:14:20 +00:00
if ( libp2p_providerstore_get ( providerstore , ( unsigned char * ) message - > key , message - > key_size , & peer_id , & peer_id_size ) ) {
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " I can provide a provider for this key. \n " ) ;
2017-03-30 18:58:53 +00:00
// we have a peer id, convert it to a peer object
struct Libp2pPeer * peer = libp2p_peerstore_get_peer ( peerstore , peer_id , peer_id_size ) ;
2017-04-17 04:46:52 +00:00
if ( peer ! = NULL ) {
2017-07-31 17:49:41 +00:00
// add it to the message
if ( message - > provider_peer_head = = NULL ) {
message - > provider_peer_head = libp2p_utils_linked_list_new ( ) ;
message - > provider_peer_head - > item = peer ;
} else {
struct Libp2pLinkedList * current = message - > provider_peer_head ;
// find the last one in the list
while ( current - > next ! = NULL ) {
current = current - > next ;
}
// add to the list
current - > next = libp2p_utils_linked_list_new ( ) ;
current - > next - > item = peer ;
}
2017-04-17 04:46:52 +00:00
}
2017-04-20 22:55:18 +00:00
} else {
libp2p_logger_debug ( " dht_protocol " , " I cannot provide a provider for this key. \n " ) ;
2017-03-30 18:58:53 +00:00
}
2017-07-31 17:49:41 +00:00
if ( peer_id ! = NULL )
free ( peer_id ) ;
2017-03-30 18:58:53 +00:00
// TODO: find closer peers
/*
if ( message - > provider_peer_head = = NULL ) {
// Who else can provide it?
//while ()
}
*/
if ( message - > provider_peer_head ! = NULL ) {
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " GetProviders: We have a peer. Sending it back \n " ) ;
2017-03-30 18:58:53 +00:00
// protobuf it and send it back
2017-04-06 14:55:01 +00:00
if ( ! libp2p_routing_dht_protobuf_message ( message , results , results_size ) ) {
2017-04-20 22:55:18 +00:00
libp2p_logger_error ( " dht_protocol " , " GetProviders: Error protobufing results \n " ) ;
2017-03-30 18:58:53 +00:00
return 0 ;
}
}
return 1 ;
}
2017-04-04 01:54:41 +00:00
/***
* helper method to get ip multiaddress from peer ' s linked list
* @ param head linked list of multiaddresses
* @ returns the IP multiaddress in the list , or NULL if none found
*/
struct MultiAddress * libp2p_routing_dht_find_peer_ip_multiaddress ( struct Libp2pLinkedList * head ) {
struct MultiAddress * out = NULL ;
struct Libp2pLinkedList * current = head ;
while ( current ! = NULL ) {
out = ( struct MultiAddress * ) current - > item ;
2017-04-04 02:17:29 +00:00
if ( multiaddress_is_ip ( out ) ) {
libp2p_logger_debug ( " dht_protocol " , " Found MultiAddress %s \n " , out - > string ) ;
2017-04-04 01:54:41 +00:00
break ;
2017-04-04 02:17:29 +00:00
}
2017-04-04 01:54:41 +00:00
current = current - > next ;
}
if ( current = = NULL )
out = NULL ;
return out ;
}
2017-03-30 18:58:53 +00:00
/***
* Remote peer has announced that he can provide a key
* @ param session session context
* @ param message the message
* @ param peerstore the peerstore
* @ param providerstore the providerstore
* @ param result_buffer where to put the result
* @ param result_buffer_size the size of the result buffer
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
int libp2p_routing_dht_handle_add_provider ( struct SessionContext * session , struct Libp2pMessage * message ,
struct Peerstore * peerstore , struct ProviderStore * providerstore , unsigned char * * result_buffer , size_t * result_buffer_size ) {
2017-04-03 22:26:08 +00:00
int retVal = 0 ;
struct Libp2pPeer * peer = NULL ;
2017-04-03 23:03:49 +00:00
2017-03-30 18:58:53 +00:00
//TODO: verify peer signature
2017-04-03 23:13:42 +00:00
/*
2017-03-30 18:58:53 +00:00
if ( message - > record ! = NULL & & message - > record - > author ! = NULL & & message - > record - > author_size > 0
2017-04-03 23:13:42 +00:00
& & message - > key ! = NULL & & message - > key_size > 0 )
*/
2017-04-20 22:55:18 +00:00
2017-04-03 23:13:42 +00:00
struct Libp2pLinkedList * current = message - > provider_peer_head ;
2017-04-04 02:03:16 +00:00
if ( current = = NULL ) {
libp2p_logger_error ( " dht_protocol " , " Provider has no peer. \n " ) ;
goto exit ;
}
2017-04-04 01:54:41 +00:00
// there should only be 1 when adding a provider
if ( current ! = NULL ) {
2017-04-17 04:46:52 +00:00
peer = current - > item ;
if ( peer = = NULL ) {
libp2p_logger_error ( " dht_protocol " , " Message add_provider has no peer \n " ) ;
goto exit ;
}
2017-04-04 01:54:41 +00:00
struct MultiAddress * peer_ma = libp2p_routing_dht_find_peer_ip_multiaddress ( peer - > addr_head ) ;
2017-04-04 02:03:16 +00:00
if ( peer_ma = = NULL ) {
libp2p_logger_error ( " dht_protocol " , " Peer has no IP MultiAddress. \n " ) ;
goto exit ;
}
2017-04-04 01:54:41 +00:00
// add what we know to be the ip for this peer
char * ip ;
char new_string [ 255 ] ;
multiaddress_get_ip_address ( session - > default_stream - > address , & ip ) ;
int port = multiaddress_get_ip_port ( peer_ma ) ;
2017-04-17 16:57:37 +00:00
char * peer_id = multiaddress_get_peer_id ( peer_ma ) ;
sprintf ( new_string , " /ip4/%s/tcp/%d/ipfs/%s " , ip , port , peer_id ) ;
free ( ip ) ;
free ( peer_id ) ;
2017-04-04 01:54:41 +00:00
struct MultiAddress * new_ma = multiaddress_new_from_string ( new_string ) ;
2017-04-17 04:46:52 +00:00
if ( new_ma = = NULL )
goto exit ;
2017-04-04 02:03:16 +00:00
libp2p_logger_debug ( " dht_protocol " , " New MultiAddress made with %s. \n " , new_string ) ;
2017-04-06 14:32:54 +00:00
// TODO: See if the sender is who he says he is
2017-04-04 01:54:41 +00:00
// set it as the first in the list
struct Libp2pLinkedList * new_head = libp2p_utils_linked_list_new ( ) ;
new_head - > item = new_ma ;
new_head - > next = peer - > addr_head ;
peer - > addr_head = new_head ;
// now add the peer to the peerstore
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " About to add peer %s to peerstore \n " , peer_ma - > string ) ;
2017-04-03 22:26:08 +00:00
if ( ! libp2p_peerstore_add_peer ( peerstore , peer ) )
goto exit ;
2017-04-04 03:07:22 +00:00
libp2p_logger_debug ( " dht_protocol " , " About to add key to providerstore \n " ) ;
2017-07-17 21:14:20 +00:00
if ( ! libp2p_providerstore_add ( providerstore , ( unsigned char * ) message - > key , message - > key_size , ( unsigned char * ) peer - > id , peer - > id_size ) )
2017-04-03 22:26:08 +00:00
goto exit ;
2017-03-30 18:58:53 +00:00
}
2017-04-03 23:13:42 +00:00
2017-04-06 14:55:01 +00:00
if ( ! libp2p_routing_dht_protobuf_message ( message , result_buffer , result_buffer_size ) ) {
2017-04-03 23:13:42 +00:00
goto exit ;
2017-04-06 14:55:01 +00:00
}
2017-04-03 23:13:42 +00:00
2017-04-03 22:26:08 +00:00
retVal = 1 ;
exit :
2017-04-03 23:03:49 +00:00
if ( retVal ! = 1 ) {
if ( * result_buffer ! = NULL ) {
free ( * result_buffer ) ;
* result_buffer_size = 0 ;
* result_buffer = NULL ;
}
libp2p_logger_error ( " dht_protocol " , " add_provider returning false \n " ) ;
}
2017-04-17 19:03:27 +00:00
/*
2017-04-03 22:26:08 +00:00
if ( peer ! = NULL )
libp2p_peer_free ( peer ) ;
2017-04-17 19:03:27 +00:00
*/
2017-04-03 22:26:08 +00:00
return retVal ;
2017-03-30 18:58:53 +00:00
}
/**
* Retrieve something from the dht datastore
* @ param session the session context
* @ param message the message
* @ param peerstore the peerstore
* @ param providerstore the providerstore
* @ param result_buffer the results
* @ param result_buffer_size the size of the results
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
int libp2p_routing_dht_handle_get_value ( struct SessionContext * session , struct Libp2pMessage * message ,
struct Peerstore * peerstore , struct ProviderStore * providerstore , unsigned char * * result_buffer , size_t * result_buffer_size ) {
2017-04-20 22:55:18 +00:00
2017-04-06 14:55:01 +00:00
struct Datastore * datastore = session - > datastore ;
2017-04-20 22:55:18 +00:00
struct Filestore * filestore = session - > filestore ;
size_t data_size = 0 ;
unsigned char * data = NULL ;
// We need to get the data from the disk
2017-07-17 21:14:20 +00:00
if ( ! filestore - > node_get ( ( unsigned char * ) message - > key , message - > key_size , ( void * * ) & data , & data_size , filestore ) ) {
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " handle_get_value: Unable to get key from filestore \n " ) ;
2017-04-06 14:55:01 +00:00
}
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " handle_get_value: value retrieved from the datastore \n " ) ;
2017-04-06 14:55:01 +00:00
struct Libp2pRecord * record = libp2p_record_new ( ) ;
record - > key_size = message - > key_size ;
record - > key = malloc ( record - > key_size ) ;
memcpy ( record - > key , message - > key , record - > key_size ) ;
record - > value_size = data_size ;
record - > value = malloc ( record - > value_size ) ;
memcpy ( record - > value , data , record - > value_size ) ;
message - > record = record ;
free ( data ) ;
if ( ! libp2p_routing_dht_protobuf_message ( message , result_buffer , result_buffer_size ) ) {
libp2p_record_free ( record ) ;
message - > record = NULL ;
return 0 ;
}
return 1 ;
2017-03-30 18:58:53 +00:00
}
/**
* Put something in the dht datastore
* @ param session the session context
* @ param message the message
* @ param peerstore the peerstore
* @ param providerstore the providerstore
* @ param result_buffer the results
* @ param result_buffer_size the size of the results
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
int libp2p_routing_dht_handle_put_value ( struct SessionContext * session , struct Libp2pMessage * message ,
struct Peerstore * peerstore , struct ProviderStore * providerstore , unsigned char * * result_buffer , size_t * result_buffer_size ) {
//TODO: implement this
return 0 ;
}
/**
* Find a node
* @ param session the session context
* @ param message the message
* @ param peerstore the peerstore
* @ param providerstore the providerstore
* @ param result_buffer the results
* @ param result_buffer_size the size of the results
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
int libp2p_routing_dht_handle_find_node ( struct SessionContext * session , struct Libp2pMessage * message ,
struct Peerstore * peerstore , struct ProviderStore * providerstore , unsigned char * * result_buffer , size_t * result_buffer_size ) {
// look through peer store
2017-07-17 21:14:20 +00:00
struct Libp2pPeer * peer = libp2p_peerstore_get_peer ( peerstore , ( unsigned char * ) message - > key , message - > key_size ) ;
2017-03-30 18:58:53 +00:00
if ( peer ! = NULL ) {
2017-04-13 14:30:28 +00:00
message - > provider_peer_head = libp2p_utils_linked_list_new ( ) ;
2017-05-11 18:53:23 +00:00
message - > provider_peer_head - > item = libp2p_peer_copy ( peer ) ;
2017-04-06 14:55:01 +00:00
if ( ! libp2p_routing_dht_protobuf_message ( message , result_buffer , result_buffer_size ) ) {
return 0 ;
2017-03-30 18:58:53 +00:00
}
2017-04-06 14:55:01 +00:00
return 1 ;
2017-03-30 18:58:53 +00:00
}
return 0 ;
}
/***
* Handle the incoming message . Handshake should have already
* been done . We should expect that the next read contains
* a protobuf ' d kademlia message .
* @ param session the context
* @ param peerstore a list of peers
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
int libp2p_routing_dht_handle_message ( struct SessionContext * session , struct Peerstore * peerstore , struct ProviderStore * providerstore ) {
unsigned char * buffer = NULL , * result_buffer = NULL ;
size_t buffer_size = 0 , result_buffer_size = 0 ;
int retVal = 0 ;
struct Libp2pMessage * message = NULL ;
// read from stream
2017-04-17 19:03:27 +00:00
if ( ! session - > default_stream - > read ( session , & buffer , & buffer_size , 5 ) )
2017-03-30 18:58:53 +00:00
goto exit ;
// unprotobuf
if ( ! libp2p_message_protobuf_decode ( buffer , buffer_size , & message ) )
goto exit ;
2017-04-17 04:46:52 +00:00
2017-03-30 18:58:53 +00:00
// handle message
switch ( message - > message_type ) {
case ( MESSAGE_TYPE_PUT_VALUE ) : // store a value in local storage
libp2p_routing_dht_handle_put_value ( session , message , peerstore , providerstore , & result_buffer , & result_buffer_size ) ;
break ;
case ( MESSAGE_TYPE_GET_VALUE ) : // get a value from local storage
libp2p_routing_dht_handle_get_value ( session , message , peerstore , providerstore , & result_buffer , & result_buffer_size ) ;
break ;
case ( MESSAGE_TYPE_ADD_PROVIDER ) : // client wants us to know he can provide something
libp2p_routing_dht_handle_add_provider ( session , message , peerstore , providerstore , & result_buffer , & result_buffer_size ) ;
break ;
case ( MESSAGE_TYPE_GET_PROVIDERS ) : // see if we can help, and send closer peers
libp2p_routing_dht_handle_get_providers ( session , message , peerstore , providerstore , & result_buffer , & result_buffer_size ) ;
break ;
case ( MESSAGE_TYPE_FIND_NODE ) : // find peers
libp2p_routing_dht_handle_find_node ( session , message , peerstore , providerstore , & result_buffer , & result_buffer_size ) ;
break ;
case ( MESSAGE_TYPE_PING ) :
libp2p_routing_dht_handle_ping ( message , & result_buffer , & result_buffer_size ) ;
break ;
}
// if we have something to send, send it.
2017-04-03 22:36:51 +00:00
if ( result_buffer ! = NULL ) {
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " Sending message back to caller. Message type: %d \n " , message - > message_type ) ;
2017-04-03 22:36:51 +00:00
if ( ! session - > default_stream - > write ( session , result_buffer , result_buffer_size ) )
goto exit ;
2017-04-03 22:41:13 +00:00
} else {
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " DhtHandleMessage: Nothing to send back. Kademlia call has been handled. Message type: %d \n " , message - > message_type ) ;
2017-04-03 22:36:51 +00:00
}
2017-03-30 18:58:53 +00:00
retVal = 1 ;
exit :
if ( buffer ! = NULL )
free ( buffer ) ;
if ( result_buffer ! = NULL )
free ( result_buffer ) ;
2017-04-17 16:57:37 +00:00
if ( message ! = NULL )
libp2p_message_free ( message ) ;
2017-03-30 18:58:53 +00:00
return retVal ;
}