2017-03-30 18:58:53 +00:00
# include <stdlib.h>
# include <string.h>
2017-08-31 11:41:06 +00:00
# include "libp2p/crypto/encoding/base58.h"
2017-03-30 18:58:53 +00:00
# include "libp2p/net/stream.h"
2017-09-07 23:46:03 +00:00
# include "libp2p/os/utils.h"
2017-03-30 18:58:53 +00:00
# include "libp2p/routing/dht_protocol.h"
# include "libp2p/record/message.h"
2017-07-31 17:49:41 +00:00
# include "libp2p/utils/linked_list.h"
2017-04-03 23:13:42 +00:00
# include "libp2p/utils/logger.h"
2017-04-20 22:55:18 +00:00
# include "libp2p/conn/session.h"
2017-03-30 18:58:53 +00:00
/***
* This is where kademlia and dht talk to the outside world
*/
2017-10-23 20:21:50 +00:00
int libp2p_routing_dht_can_handle ( const struct StreamMessage * msg ) {
2017-11-27 14:06:33 +00:00
if ( msg = = NULL | | msg - > data_size = = 0 | | msg - > data = = NULL )
return 0 ;
2017-10-23 20:21:50 +00:00
if ( msg - > data_size < 8 )
2017-08-09 13:03:40 +00:00
return 0 ;
2017-10-23 20:21:50 +00:00
char * result = strnstr ( ( char * ) msg - > data , " /ipfs/kad " , msg - > data_size ) ;
if ( result ! = NULL & & result = = ( char * ) msg - > data )
2017-08-09 17:08:57 +00:00
return 1 ;
return 0 ;
2017-08-09 13:03:40 +00:00
}
int libp2p_routing_dht_shutdown ( void * context ) {
free ( context ) ;
return 1 ;
}
2017-11-08 15:51:43 +00:00
int libp2p_routing_dht_handle_msg ( const struct StreamMessage * msg , struct Stream * stream , void * protocol_context ) {
struct DhtContext * ctx = ( struct DhtContext * ) protocol_context ;
if ( ! libp2p_routing_dht_handshake ( stream ) )
2017-08-09 17:08:57 +00:00
return - 1 ;
2017-11-08 15:51:43 +00:00
return ( libp2p_routing_dht_handle_message ( stream , ctx ) = = 0 ) ? - 1 : 1 ;
2017-08-09 13:03:40 +00:00
}
2017-11-08 15:51:43 +00:00
struct Libp2pProtocolHandler * libp2p_routing_dht_build_protocol_handler ( struct Peerstore * peer_store , struct ProviderStore * provider_store ,
struct Datastore * datastore , struct Filestore * filestore ) {
2017-08-09 13:03:40 +00:00
struct Libp2pProtocolHandler * handler = ( struct Libp2pProtocolHandler * ) malloc ( sizeof ( struct Libp2pProtocolHandler ) ) ;
if ( handler ! = NULL ) {
struct DhtContext * ctx = ( struct DhtContext * ) malloc ( sizeof ( struct DhtContext ) ) ;
ctx - > peer_store = peer_store ;
ctx - > provider_store = provider_store ;
2017-11-08 15:51:43 +00:00
ctx - > datastore = datastore ;
ctx - > filestore = filestore ;
2017-08-09 13:03:40 +00:00
handler - > context = ctx ;
handler - > CanHandle = libp2p_routing_dht_can_handle ;
handler - > HandleMessage = libp2p_routing_dht_handle_msg ;
handler - > Shutdown = libp2p_routing_dht_shutdown ;
}
return handler ;
}
2017-04-06 14:55:01 +00:00
/***
* Helper method to protobuf a message
* @ param message the message
* @ param buffer where to put the results
* @ param buffer_size the size of the results
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
2017-08-30 16:09:28 +00:00
int libp2p_routing_dht_protobuf_message ( struct KademliaMessage * message , unsigned char * * buffer , size_t * buffer_size ) {
2017-04-06 14:55:01 +00:00
* buffer_size = libp2p_message_protobuf_encode_size ( message ) ;
* buffer = malloc ( * buffer_size ) ;
if ( ! libp2p_message_protobuf_encode ( message , * buffer , * buffer_size , buffer_size ) ) {
free ( * buffer ) ;
* buffer_size = 0 ;
return 0 ;
}
return 1 ;
}
2017-03-30 18:58:53 +00:00
/**
* Take existing stream and upgrade to the Kademlia / DHT protocol / codec
* @ param context the context
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
int libp2p_routing_dht_upgrade_stream ( struct SessionContext * context ) {
int retVal = 0 ;
char * protocol = " /ipfs/kad/1.0.0 \n " ;
2017-10-23 14:47:54 +00:00
struct StreamMessage outgoing ;
outgoing . data = ( uint8_t * ) protocol ;
outgoing . data_size = strlen ( protocol ) ;
2017-10-23 14:01:03 +00:00
struct StreamMessage * results = NULL ;
2017-10-23 14:47:54 +00:00
if ( ! context - > default_stream - > write ( context , & outgoing ) ) {
2017-08-03 22:47:02 +00:00
libp2p_logger_error ( " dht_protocol " , " Unable to write to stream during upgrade attempt. \n " ) ;
2017-03-30 18:58:53 +00:00
goto exit ;
2017-08-03 22:47:02 +00:00
}
2017-10-23 14:01:03 +00:00
if ( ! context - > default_stream - > read ( context , & results , 5 ) ) {
2017-08-03 22:47:02 +00:00
libp2p_logger_error ( " dht_protocol " , " Unable to read from stream during upgrade attempt. \n " ) ;
2017-03-30 18:58:53 +00:00
goto exit ;
2017-08-03 22:47:02 +00:00
}
2017-10-23 14:01:03 +00:00
if ( results = = NULL | | results - > data_size ! = strlen ( protocol ) ) {
2017-08-03 22:47:02 +00:00
libp2p_logger_error ( " dht_protocol " , " Expected response size incorrect during upgrade attempt. \n " ) ;
2017-03-30 18:58:53 +00:00
goto exit ;
2017-08-03 22:47:02 +00:00
}
2017-10-23 14:01:03 +00:00
if ( strncmp ( ( char * ) results - > data , protocol , results - > data_size ) ! = 0 ) {
2017-08-03 22:47:02 +00:00
libp2p_logger_error ( " dht_protocol " , " Expected %s but received %s. \n " , protocol , results ) ;
2017-03-30 18:58:53 +00:00
goto exit ;
2017-08-03 22:47:02 +00:00
}
2017-03-30 18:58:53 +00:00
retVal = 1 ;
exit :
2017-10-23 14:01:03 +00:00
libp2p_stream_message_free ( results ) ;
results = NULL ;
2017-03-30 18:58:53 +00:00
return retVal ;
}
/**
* Handle a client requesting an upgrade to the DHT protocol
2017-11-08 15:51:43 +00:00
* @ param stream the stream to the remote
2017-03-30 18:58:53 +00:00
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
2017-11-08 15:51:43 +00:00
int libp2p_routing_dht_handshake ( struct Stream * stream ) {
2017-03-30 18:58:53 +00:00
char * protocol = " /ipfs/kad/1.0.0 \n " ;
2017-10-23 14:47:54 +00:00
struct StreamMessage outgoing ;
outgoing . data = ( uint8_t * ) protocol ;
outgoing . data_size = strlen ( protocol ) ;
2017-11-08 15:51:43 +00:00
return stream - > write ( stream - > stream_context , & outgoing ) ;
2017-03-30 18:58:53 +00:00
}
/**
* A remote client has requested a ping
* @ param message the message
* @ param buffer where to put the results
* @ param buffer_size the length of the results
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
2017-08-30 16:09:28 +00:00
int libp2p_routing_dht_handle_ping ( struct KademliaMessage * message , unsigned char * * buffer , size_t * buffer_size ) {
2017-03-30 18:58:53 +00:00
// just turn message back into a protobuf and send it back...
2017-04-06 14:55:01 +00:00
return libp2p_routing_dht_protobuf_message ( message , buffer , buffer_size ) ;
2017-03-30 18:58:53 +00:00
}
/**
* See if we have information as to who can provide this item
2017-11-08 15:51:43 +00:00
* @ param stream the incoming stream
2017-03-30 18:58:53 +00:00
* @ param message the message from the caller , contains a key
2017-11-08 15:51:43 +00:00
* @ param protocol_context the context
* @ param results where to put the results
* @ param results_size the size of the results
2017-03-30 18:58:53 +00:00
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
2017-11-08 15:51:43 +00:00
int libp2p_routing_dht_handle_get_providers ( struct Stream * stream , struct KademliaMessage * message , struct DhtContext * protocol_context ,
unsigned char * * results , size_t * results_size ) {
2017-03-30 18:58:53 +00:00
unsigned char * peer_id = NULL ;
int peer_id_size = 0 ;
// This shouldn't be needed, but just in case:
message - > provider_peer_head = NULL ;
2017-07-31 17:49:41 +00:00
// Can I provide it locally?
2017-08-28 15:54:56 +00:00
struct DatastoreRecord * datastore_record = NULL ;
2017-11-08 15:51:43 +00:00
if ( protocol_context - > datastore - > datastore_get ( ( unsigned char * ) message - > key , message - > key_size , & datastore_record , protocol_context - > datastore ) ) {
2017-07-31 17:49:41 +00:00
// we can provide this hash from our datastore
2017-08-28 15:54:56 +00:00
libp2p_datastore_record_free ( datastore_record ) ;
2017-08-16 11:41:01 +00:00
libp2p_logger_debug ( " dht_protocol " , " I can provide myself as a provider for this key. \n " ) ;
2017-07-31 17:49:41 +00:00
message - > provider_peer_head = libp2p_utils_linked_list_new ( ) ;
2017-11-08 15:51:43 +00:00
message - > provider_peer_head - > item = libp2p_peer_copy ( libp2p_peerstore_get_local_peer ( protocol_context - > peer_store ) ) ;
} else if ( libp2p_providerstore_get ( protocol_context - > provider_store , ( unsigned char * ) message - > key , message - > key_size , & peer_id , & peer_id_size ) ) {
2017-08-16 11:41:01 +00:00
// Can I provide it because someone announced it earlier?
2017-03-30 18:58:53 +00:00
// we have a peer id, convert it to a peer object
2017-11-08 15:51:43 +00:00
struct Libp2pPeer * peer = libp2p_peerstore_get_peer ( protocol_context - > peer_store , peer_id , peer_id_size ) ;
2017-04-17 04:46:52 +00:00
if ( peer ! = NULL ) {
2017-09-28 18:21:07 +00:00
libp2p_logger_debug ( " dht_protocol " , " I can provide a provider for this key, because %s says he has it. \n " , libp2p_peer_id_to_string ( peer ) ) ;
2017-07-31 17:49:41 +00:00
// add it to the message
if ( message - > provider_peer_head = = NULL ) {
message - > provider_peer_head = libp2p_utils_linked_list_new ( ) ;
2017-08-16 11:41:01 +00:00
message - > provider_peer_head - > item = libp2p_peer_copy ( peer ) ;
2017-07-31 17:49:41 +00:00
} else {
2017-09-28 18:21:07 +00:00
2017-07-31 17:49:41 +00:00
struct Libp2pLinkedList * current = message - > provider_peer_head ;
// find the last one in the list
while ( current - > next ! = NULL ) {
current = current - > next ;
}
// add to the list
current - > next = libp2p_utils_linked_list_new ( ) ;
current - > next - > item = peer ;
}
2017-04-17 04:46:52 +00:00
}
2017-04-20 22:55:18 +00:00
} else {
2017-08-31 11:41:06 +00:00
size_t b58_size = 100 ;
uint8_t * b58key = ( uint8_t * ) malloc ( b58_size ) ;
if ( ! libp2p_crypto_encoding_base58_encode ( ( unsigned char * ) message - > key , message - > key_size , ( unsigned char * * ) & b58key , & b58_size ) ) {
libp2p_logger_debug ( " dht_protocol " , " I cannot provide a provider for this key. \n " ) ;
} else {
libp2p_logger_debug ( " dht_protocol " , " I cannot provide a provider for the key %s. \n " , b58key ) ;
}
free ( b58key ) ;
2017-03-30 18:58:53 +00:00
}
2017-07-31 17:49:41 +00:00
if ( peer_id ! = NULL )
free ( peer_id ) ;
2017-03-30 18:58:53 +00:00
// TODO: find closer peers
/*
if ( message - > provider_peer_head = = NULL ) {
// Who else can provide it?
//while ()
}
*/
if ( message - > provider_peer_head ! = NULL ) {
2017-09-28 18:21:07 +00:00
libp2p_logger_debug ( " dht_protocol " , " GetProviders: We have a peer. Sending it back. " ) ;
2017-03-30 18:58:53 +00:00
// protobuf it and send it back
2017-04-06 14:55:01 +00:00
if ( ! libp2p_routing_dht_protobuf_message ( message , results , results_size ) ) {
2017-04-20 22:55:18 +00:00
libp2p_logger_error ( " dht_protocol " , " GetProviders: Error protobufing results \n " ) ;
2017-03-30 18:58:53 +00:00
return 0 ;
}
}
return 1 ;
}
2017-04-04 01:54:41 +00:00
/***
* helper method to get ip multiaddress from peer ' s linked list
* @ param head linked list of multiaddresses
* @ returns the IP multiaddress in the list , or NULL if none found
*/
struct MultiAddress * libp2p_routing_dht_find_peer_ip_multiaddress ( struct Libp2pLinkedList * head ) {
struct MultiAddress * out = NULL ;
struct Libp2pLinkedList * current = head ;
while ( current ! = NULL ) {
out = ( struct MultiAddress * ) current - > item ;
2017-04-04 02:17:29 +00:00
if ( multiaddress_is_ip ( out ) ) {
2017-09-28 12:58:24 +00:00
//libp2p_logger_debug("dht_protocol", "Found MultiAddress %s\n", out->string);
2017-04-04 01:54:41 +00:00
break ;
2017-04-04 02:17:29 +00:00
}
2017-04-04 01:54:41 +00:00
current = current - > next ;
}
if ( current = = NULL )
out = NULL ;
return out ;
}
2017-03-30 18:58:53 +00:00
/***
* Remote peer has announced that he can provide a key
2017-11-08 15:51:43 +00:00
* @ param stream the incoming stream
2017-03-30 18:58:53 +00:00
* @ param message the message
2017-11-08 15:51:43 +00:00
* @ param protocol_context the context
2017-03-30 18:58:53 +00:00
* @ param result_buffer where to put the result
* @ param result_buffer_size the size of the result buffer
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
2017-11-08 15:51:43 +00:00
int libp2p_routing_dht_handle_add_provider ( struct Stream * stream , struct KademliaMessage * message ,
struct DhtContext * protocol_context , unsigned char * * result_buffer , size_t * result_buffer_size ) {
2017-04-03 22:26:08 +00:00
int retVal = 0 ;
struct Libp2pPeer * peer = NULL ;
2017-04-03 23:03:49 +00:00
2017-03-30 18:58:53 +00:00
//TODO: verify peer signature
2017-09-28 12:58:24 +00:00
if ( libp2p_logger_watching_class ( " dht_protocol " ) ) {
struct Libp2pPeer * first_peer = message - > provider_peer_head - > item ;
libp2p_logger_debug ( " dht_protocol " , " Peer %s says he can provide a key. \n " , libp2p_peer_id_to_string ( first_peer ) ) ;
}
2017-04-20 22:55:18 +00:00
2017-04-03 23:13:42 +00:00
struct Libp2pLinkedList * current = message - > provider_peer_head ;
2017-04-04 02:03:16 +00:00
if ( current = = NULL ) {
libp2p_logger_error ( " dht_protocol " , " Provider has no peer. \n " ) ;
goto exit ;
}
2017-09-28 12:58:24 +00:00
2017-04-04 01:54:41 +00:00
// there should only be 1 when adding a provider
if ( current ! = NULL ) {
2017-04-17 04:46:52 +00:00
peer = current - > item ;
if ( peer = = NULL ) {
libp2p_logger_error ( " dht_protocol " , " Message add_provider has no peer \n " ) ;
goto exit ;
}
2017-04-04 01:54:41 +00:00
struct MultiAddress * peer_ma = libp2p_routing_dht_find_peer_ip_multiaddress ( peer - > addr_head ) ;
2017-04-04 02:03:16 +00:00
if ( peer_ma = = NULL ) {
libp2p_logger_error ( " dht_protocol " , " Peer has no IP MultiAddress. \n " ) ;
goto exit ;
}
2017-09-28 12:58:24 +00:00
2017-04-04 01:54:41 +00:00
// add what we know to be the ip for this peer
2017-09-28 18:21:07 +00:00
/*
2017-04-04 01:54:41 +00:00
char * ip ;
char new_string [ 255 ] ;
multiaddress_get_ip_address ( session - > default_stream - > address , & ip ) ;
int port = multiaddress_get_ip_port ( peer_ma ) ;
2017-04-17 16:57:37 +00:00
char * peer_id = multiaddress_get_peer_id ( peer_ma ) ;
sprintf ( new_string , " /ip4/%s/tcp/%d/ipfs/%s " , ip , port , peer_id ) ;
free ( ip ) ;
free ( peer_id ) ;
2017-04-04 01:54:41 +00:00
struct MultiAddress * new_ma = multiaddress_new_from_string ( new_string ) ;
2017-04-17 04:46:52 +00:00
if ( new_ma = = NULL )
goto exit ;
2017-09-28 12:58:24 +00:00
2017-04-06 14:32:54 +00:00
// TODO: See if the sender is who he says he is
2017-04-04 01:54:41 +00:00
// set it as the first in the list
struct Libp2pLinkedList * new_head = libp2p_utils_linked_list_new ( ) ;
new_head - > item = new_ma ;
new_head - > next = peer - > addr_head ;
peer - > addr_head = new_head ;
2017-09-28 18:21:07 +00:00
*/
2017-04-04 01:54:41 +00:00
// now add the peer to the peerstore
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " About to add peer %s to peerstore \n " , peer_ma - > string ) ;
2017-11-08 15:51:43 +00:00
if ( ! libp2p_peerstore_add_peer ( protocol_context - > peer_store , peer ) )
2017-04-03 22:26:08 +00:00
goto exit ;
2017-04-04 03:07:22 +00:00
libp2p_logger_debug ( " dht_protocol " , " About to add key to providerstore \n " ) ;
2017-11-08 15:51:43 +00:00
if ( ! libp2p_providerstore_add ( protocol_context - > provider_store , ( unsigned char * ) message - > key , message - > key_size , ( unsigned char * ) peer - > id , peer - > id_size ) )
2017-04-03 22:26:08 +00:00
goto exit ;
2017-03-30 18:58:53 +00:00
}
2017-04-03 23:13:42 +00:00
2017-04-06 14:55:01 +00:00
if ( ! libp2p_routing_dht_protobuf_message ( message , result_buffer , result_buffer_size ) ) {
2017-04-03 23:13:42 +00:00
goto exit ;
2017-04-06 14:55:01 +00:00
}
2017-04-03 23:13:42 +00:00
2017-04-03 22:26:08 +00:00
retVal = 1 ;
exit :
2017-04-03 23:03:49 +00:00
if ( retVal ! = 1 ) {
if ( * result_buffer ! = NULL ) {
free ( * result_buffer ) ;
* result_buffer_size = 0 ;
* result_buffer = NULL ;
}
libp2p_logger_error ( " dht_protocol " , " add_provider returning false \n " ) ;
}
2017-04-17 19:03:27 +00:00
/*
2017-04-03 22:26:08 +00:00
if ( peer ! = NULL )
libp2p_peer_free ( peer ) ;
2017-04-17 19:03:27 +00:00
*/
2017-04-03 22:26:08 +00:00
return retVal ;
2017-03-30 18:58:53 +00:00
}
/**
* Retrieve something from the dht datastore
* @ param session the session context
* @ param message the message
* @ param peerstore the peerstore
* @ param providerstore the providerstore
2017-09-14 16:35:16 +00:00
* @ param result_buffer the results , as a protobuf ' d KademliaMessage
2017-03-30 18:58:53 +00:00
* @ param result_buffer_size the size of the results
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
2017-11-08 15:51:43 +00:00
int libp2p_routing_dht_handle_get_value ( struct Stream * stream , struct KademliaMessage * message , struct DhtContext * dht_context ,
unsigned char * * result_buffer , size_t * result_buffer_size ) {
2017-04-20 22:55:18 +00:00
2017-11-08 15:51:43 +00:00
struct Datastore * datastore = dht_context - > datastore ;
struct Filestore * filestore = dht_context - > filestore ;
2017-04-20 22:55:18 +00:00
size_t data_size = 0 ;
unsigned char * data = NULL ;
// We need to get the data from the disk
2017-07-17 21:14:20 +00:00
if ( ! filestore - > node_get ( ( unsigned char * ) message - > key , message - > key_size , ( void * * ) & data , & data_size , filestore ) ) {
2017-09-28 12:58:24 +00:00
size_t sz = 100 ;
unsigned char key [ sz ] ;
memset ( key , 0 , sz ) ;
unsigned char * ptr = & key [ 0 ] ;
libp2p_crypto_encoding_base58_encode ( ( unsigned char * ) message - > key , message - > key_size , & ptr , & sz ) ;
libp2p_logger_debug ( " dht_protocol " , " handle_get_value: Unable to get key %s from filestore. \n " , key ) ;
2017-09-25 22:55:45 +00:00
return 0 ;
2017-04-06 14:55:01 +00:00
}
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " handle_get_value: value retrieved from the datastore \n " ) ;
2017-04-06 14:55:01 +00:00
struct Libp2pRecord * record = libp2p_record_new ( ) ;
record - > key_size = message - > key_size ;
record - > key = malloc ( record - > key_size ) ;
memcpy ( record - > key , message - > key , record - > key_size ) ;
record - > value_size = data_size ;
record - > value = malloc ( record - > value_size ) ;
memcpy ( record - > value , data , record - > value_size ) ;
message - > record = record ;
free ( data ) ;
if ( ! libp2p_routing_dht_protobuf_message ( message , result_buffer , result_buffer_size ) ) {
libp2p_record_free ( record ) ;
message - > record = NULL ;
return 0 ;
}
return 1 ;
2017-03-30 18:58:53 +00:00
}
/**
* Put something in the dht datastore
2017-11-08 15:51:43 +00:00
* @ param stream the incoming stream
2017-03-30 18:58:53 +00:00
* @ param message the message
* @ param peerstore the peerstore
* @ param providerstore the providerstore
2017-11-08 15:51:43 +00:00
* @ param datastore the datastore
2017-03-30 18:58:53 +00:00
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
2017-11-08 15:51:43 +00:00
int libp2p_routing_dht_handle_put_value ( struct Stream * stream , struct K ademliaMessage * message ,
struct DhtContext * protocol_context ) {
2017-09-14 16:35:16 +00:00
if ( message - > record = = NULL )
return 0 ;
struct DatastoreRecord * record = libp2p_datastore_record_new ( ) ;
if ( record = = NULL )
return 0 ;
// set the key from the message->record->key
record - > key_size = message - > record - > key_size ;
record - > key = ( uint8_t * ) malloc ( record - > key_size ) ;
if ( record - > key = = NULL ) {
libp2p_datastore_record_free ( record ) ;
return 0 ;
}
memcpy ( record - > key , message - > record - > key , record - > key_size ) ;
// set the value from the message->record->value
record - > value_size = message - > record - > value_size ;
record - > value = ( uint8_t * ) malloc ( record - > value_size ) ;
if ( record - > value = = NULL ) {
libp2p_datastore_record_free ( record ) ;
return 0 ;
}
memcpy ( record - > value , message - > record - > value , record - > value_size ) ;
2017-11-08 15:51:43 +00:00
int retVal = protocol_context - > datastore - > datastore_put ( record , protocol_context - > datastore ) ;
2017-09-14 16:35:16 +00:00
libp2p_datastore_record_free ( record ) ;
return retVal ;
2017-03-30 18:58:53 +00:00
}
/**
* Find a node
* @ param session the session context
* @ param message the message
* @ param peerstore the peerstore
* @ param providerstore the providerstore
* @ param result_buffer the results
* @ param result_buffer_size the size of the results
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
2017-11-08 15:51:43 +00:00
int libp2p_routing_dht_handle_find_node ( struct Stream * stream , struct KademliaMessage * message ,
struct DhtContext * protocol_context , unsigned char * * result_buffer , size_t * result_buffer_size ) {
2017-03-30 18:58:53 +00:00
// look through peer store
2017-11-08 15:51:43 +00:00
struct Libp2pPeer * peer = libp2p_peerstore_get_peer ( protocol_context - > peer_store , ( unsigned char * ) message - > key , message - > key_size ) ;
2017-03-30 18:58:53 +00:00
if ( peer ! = NULL ) {
2017-04-13 14:30:28 +00:00
message - > provider_peer_head = libp2p_utils_linked_list_new ( ) ;
2017-05-11 18:53:23 +00:00
message - > provider_peer_head - > item = libp2p_peer_copy ( peer ) ;
2017-04-06 14:55:01 +00:00
if ( ! libp2p_routing_dht_protobuf_message ( message , result_buffer , result_buffer_size ) ) {
return 0 ;
2017-03-30 18:58:53 +00:00
}
2017-04-06 14:55:01 +00:00
return 1 ;
2017-03-30 18:58:53 +00:00
}
return 0 ;
}
/***
* Handle the incoming message . Handshake should have already
* been done . We should expect that the next read contains
* a protobuf ' d kademlia message .
* @ param session the context
* @ param peerstore a list of peers
* @ returns true ( 1 ) on success , otherwise false ( 0 )
*/
2017-11-08 15:51:43 +00:00
int libp2p_routing_dht_handle_message ( struct Stream * stream , struct DhtContext * protocol_context ) {
2017-10-23 14:01:03 +00:00
unsigned char * result_buffer = NULL ;
struct StreamMessage * buffer = NULL ;
size_t result_buffer_size = 0 ;
2017-03-30 18:58:53 +00:00
int retVal = 0 ;
2017-08-30 16:09:28 +00:00
struct KademliaMessage * message = NULL ;
2017-03-30 18:58:53 +00:00
// read from stream
2017-11-08 15:51:43 +00:00
if ( ! stream - > read ( stream - > stream_context , & buffer , 5 ) )
2017-03-30 18:58:53 +00:00
goto exit ;
// unprotobuf
2017-10-23 14:01:03 +00:00
if ( ! libp2p_message_protobuf_decode ( buffer - > data , buffer - > data_size , & message ) )
2017-03-30 18:58:53 +00:00
goto exit ;
2017-04-17 04:46:52 +00:00
2017-03-30 18:58:53 +00:00
// handle message
switch ( message - > message_type ) {
case ( MESSAGE_TYPE_PUT_VALUE ) : // store a value in local storage
2017-11-08 15:51:43 +00:00
libp2p_routing_dht_handle_put_value ( stream , message , protocol_context ) ;
2017-03-30 18:58:53 +00:00
break ;
case ( MESSAGE_TYPE_GET_VALUE ) : // get a value from local storage
2017-11-08 15:51:43 +00:00
libp2p_routing_dht_handle_get_value ( stream , message , protocol_context , & result_buffer , & result_buffer_size ) ;
2017-03-30 18:58:53 +00:00
break ;
case ( MESSAGE_TYPE_ADD_PROVIDER ) : // client wants us to know he can provide something
2017-11-08 15:51:43 +00:00
libp2p_routing_dht_handle_add_provider ( stream , message , protocol_context , & result_buffer , & result_buffer_size ) ;
2017-03-30 18:58:53 +00:00
break ;
case ( MESSAGE_TYPE_GET_PROVIDERS ) : // see if we can help, and send closer peers
2017-11-08 15:51:43 +00:00
libp2p_routing_dht_handle_get_providers ( stream , message , protocol_context , & result_buffer , & result_buffer_size ) ;
2017-03-30 18:58:53 +00:00
break ;
case ( MESSAGE_TYPE_FIND_NODE ) : // find peers
2017-11-08 15:51:43 +00:00
libp2p_routing_dht_handle_find_node ( stream , message , protocol_context , & result_buffer , & result_buffer_size ) ;
2017-03-30 18:58:53 +00:00
break ;
case ( MESSAGE_TYPE_PING ) :
libp2p_routing_dht_handle_ping ( message , & result_buffer , & result_buffer_size ) ;
break ;
}
// if we have something to send, send it.
2017-04-03 22:36:51 +00:00
if ( result_buffer ! = NULL ) {
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " Sending message back to caller. Message type: %d \n " , message - > message_type ) ;
2017-10-23 14:47:54 +00:00
struct StreamMessage outgoing ;
outgoing . data = result_buffer ;
outgoing . data_size = result_buffer_size ;
2017-11-08 15:51:43 +00:00
if ( ! stream - > write ( stream - > stream_context , & outgoing ) )
2017-04-03 22:36:51 +00:00
goto exit ;
2017-04-03 22:41:13 +00:00
} else {
2017-04-20 22:55:18 +00:00
libp2p_logger_debug ( " dht_protocol " , " DhtHandleMessage: Nothing to send back. Kademlia call has been handled. Message type: %d \n " , message - > message_type ) ;
2017-04-03 22:36:51 +00:00
}
2017-03-30 18:58:53 +00:00
retVal = 1 ;
exit :
2017-10-23 14:01:03 +00:00
libp2p_stream_message_free ( buffer ) ;
2017-03-30 18:58:53 +00:00
if ( result_buffer ! = NULL )
free ( result_buffer ) ;
2017-04-17 16:57:37 +00:00
if ( message ! = NULL )
libp2p_message_free ( message ) ;
2017-03-30 18:58:53 +00:00
return retVal ;
}
2017-09-14 21:49:50 +00:00
2017-09-18 11:32:37 +00:00
/**
* Attempt to receive a kademlia message
* NOTE : This call assumes that a send_message was sent
* @ param sessionContext the context
* @ param result where to put the results
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
int libp2p_routing_dht_receive_message ( struct SessionContext * sessionContext , struct KademliaMessage * * result ) {
2017-10-23 14:01:03 +00:00
struct StreamMessage * results = NULL ;
2017-09-18 11:32:37 +00:00
2017-10-23 14:01:03 +00:00
if ( ! sessionContext - > default_stream - > read ( sessionContext , & results , 5 ) ) {
2017-09-18 11:32:37 +00:00
libp2p_logger_error ( " online " , " Attempted to read from Kademlia stream, but could not. \n " ) ;
goto exit ;
}
// see if we can unprotobuf
2017-10-23 14:01:03 +00:00
if ( ! libp2p_message_protobuf_decode ( results - > data , results - > data_size , result ) ) {
2017-09-18 11:32:37 +00:00
libp2p_logger_error ( " online " , " Received kademlia response, but cannot decode it. \n " ) ;
goto exit ;
}
exit :
2017-10-23 14:01:03 +00:00
libp2p_stream_message_free ( results ) ;
2017-09-18 11:32:37 +00:00
return result ! = NULL ;
}
/***
* Send a kademlia message
* NOTE : this call upgrades the stream to / ipfs / kad / 1.0 .0
* @ param context the context
* @ param message the message
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
int libp2p_routing_dht_send_message ( struct SessionContext * sessionContext , struct KademliaMessage * message ) {
size_t protobuf_size = 0 , retVal = 0 ;
unsigned char * protobuf = NULL ;
2017-10-23 14:47:54 +00:00
struct StreamMessage outgoing ;
2017-09-18 11:32:37 +00:00
protobuf_size = libp2p_message_protobuf_encode_size ( message ) ;
protobuf = ( unsigned char * ) malloc ( protobuf_size ) ;
libp2p_message_protobuf_encode ( message , & protobuf [ 0 ] , protobuf_size , & protobuf_size ) ;
// upgrade to kademlia protocol
if ( ! libp2p_routing_dht_upgrade_stream ( sessionContext ) ) {
libp2p_logger_error ( " dht_protocol " , " send_message: Unable to upgrade to kademlia stream. \n " ) ;
goto exit ;
}
// send the message
2017-10-23 14:47:54 +00:00
outgoing . data = protobuf ;
outgoing . data_size = protobuf_size ;
if ( ! sessionContext - > default_stream - > write ( sessionContext , & outgoing ) ) {
2017-09-18 11:32:37 +00:00
libp2p_logger_error ( " dht_protocol " , " send_message: Attempted to write to Kademlia stream, but could not. \n " ) ;
goto exit ;
}
retVal = 1 ;
exit :
if ( protobuf ! = NULL )
free ( protobuf ) ;
return retVal ;
}
2017-09-14 21:49:50 +00:00
/**
* Used to send a message to the nearest x peers
*
2017-09-18 11:32:37 +00:00
* @ param private_key the private key of the local peer
* @ param peerstore the collection of peers
* @ param datastore a connection to the datastore
2017-09-14 21:49:50 +00:00
* @ param msg the message to send
2017-09-18 11:32:37 +00:00
* @ returns true ( 1 ) if we sent to at least 1 , false ( 0 ) otherwise
2017-09-14 21:49:50 +00:00
*/
2017-10-23 20:21:50 +00:00
int libp2p_routing_dht_send_message_nearest_x ( const struct Dialer * dialer , struct Peerstore * peerstore ,
2017-09-18 11:32:37 +00:00
struct Datastore * datastore , struct KademliaMessage * msg , int numToSend ) {
// TODO: Calculate "Nearest"
// but for now, grab x peers, and send to them
int numSent = 0 ;
struct Libp2pLinkedList * llpeer_entry = peerstore - > head_entry ;
while ( llpeer_entry ! = NULL ) {
struct PeerEntry * entry = llpeer_entry - > item ;
if ( entry = = NULL )
break ;
struct Libp2pPeer * remote_peer = entry - > peer ;
2017-09-28 18:48:35 +00:00
if ( ! remote_peer - > is_local ) {
// connect (if not connected)
2017-10-23 20:21:50 +00:00
if ( libp2p_peer_connect ( dialer , remote_peer , peerstore , datastore , 5 ) ) {
2017-09-28 18:48:35 +00:00
// send message
if ( libp2p_routing_dht_send_message ( remote_peer - > sessionContext , msg ) )
numSent + + ;
}
if ( numSent > = numToSend )
break ;
2017-09-18 11:32:37 +00:00
}
// grab next entry
llpeer_entry = llpeer_entry - > next ;
}
return numSent > 0 ;
2017-09-14 21:49:50 +00:00
}