diff --git a/include/libp2p/conn/session.h b/include/libp2p/conn/session.h index ca0068f..6bda610 100644 --- a/include/libp2p/conn/session.h +++ b/include/libp2p/conn/session.h @@ -1,6 +1,7 @@ #pragma once #include "libp2p/crypto/key.h" +#include "libp2p/db/datastore.h" /*** * Holds the details of communication between two hosts */ @@ -16,6 +17,7 @@ struct SessionContext { struct Stream* insecure_stream; struct Stream* secure_stream; struct Stream* default_stream; + struct Datastore* datastore; // filled in during negotiations char* chosen_curve; char* chosen_cipher; diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c index 5225b89..3c38a9e 100644 --- a/routing/dht_protocol.c +++ b/routing/dht_protocol.c @@ -11,6 +11,25 @@ * This is where kademlia and dht talk to the outside world */ + +/*** + * Helper method to protobuf a message + * @param message the message + * @param buffer where to put the results + * @param buffer_size the size of the results + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_routing_dht_protobuf_message(struct Libp2pMessage* message, unsigned char** buffer, size_t *buffer_size) { + *buffer_size = libp2p_message_protobuf_encode_size(message); + *buffer = malloc(*buffer_size); + if (!libp2p_message_protobuf_encode(message, *buffer, *buffer_size, buffer_size)) { + free(*buffer); + *buffer_size = 0; + return 0; + } + return 1; +} + /** * Take existing stream and upgrade to the Kademlia / DHT protocol/codec * @param context the context @@ -57,9 +76,7 @@ int libp2p_routing_dht_handshake(struct SessionContext* context) { */ int libp2p_routing_dht_handle_ping(struct Libp2pMessage* message, unsigned char** buffer, size_t *buffer_size) { // just turn message back into a protobuf and send it back... - *buffer_size = libp2p_message_protobuf_encode_size(message); - *buffer = (unsigned char*)malloc(*buffer_size); - return libp2p_message_protobuf_encode(message, *buffer, *buffer_size, buffer_size); + return libp2p_routing_dht_protobuf_message(message, buffer, buffer_size); } /** @@ -95,12 +112,7 @@ int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, stru */ if (message->provider_peer_head != NULL) { // protobuf it and send it back - *results_size = libp2p_message_protobuf_encode_size(message); - *results = (unsigned char*)malloc(*results_size); - if (!libp2p_message_protobuf_encode(message, *results, *results_size, results_size)) { - free(*results); - *results = NULL; - *results_size = 0; + if (!libp2p_routing_dht_protobuf_message(message, results, results_size)) { return 0; } } @@ -184,13 +196,9 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc goto exit; } - *result_buffer_size = libp2p_message_protobuf_encode_size(message); - *result_buffer = (unsigned char*)malloc(*result_buffer_size); - if (*result_buffer == NULL) + if (!libp2p_routing_dht_protobuf_message(message, result_buffer, result_buffer_size)) { goto exit; - if (!libp2p_message_protobuf_encode(message, *result_buffer, *result_buffer_size, result_buffer_size)) - goto exit; - libp2p_logger_debug("dht_protocol", "add_provider protobuf'd the message. Returning results.\n"); + } retVal = 1; exit: @@ -220,7 +228,30 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct Libp2pMessage* message, struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { //TODO: implement this - return 0; + struct Datastore* datastore = session->datastore; + size_t data_size = 65535; + char* data = malloc(data_size); + if (!datastore->datastore_get(message->key, message->key_size, data, data_size, &data_size, datastore)) { + free(data); + return 0; + } + struct Libp2pRecord *record = libp2p_record_new(); + record->key_size = message->key_size; + record->key = malloc(record->key_size); + memcpy(record->key, message->key, record->key_size); + record->value_size = data_size; + record->value = malloc(record->value_size); + memcpy(record->value, data, record->value_size); + message->record = record; + free(data); + + if (!libp2p_routing_dht_protobuf_message(message, result_buffer, result_buffer_size)) { + libp2p_record_free(record); + message->record = NULL; + return 0; + } + + return 1; } /** @@ -255,14 +286,10 @@ int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct L struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, message->key, message->key_size); if (peer != NULL) { message->closer_peer_head = peer->addr_head; - *result_buffer_size = libp2p_message_protobuf_encode_size(message); - *result_buffer = (unsigned char*)malloc(*result_buffer_size); - if (libp2p_message_protobuf_encode(message, *result_buffer, *result_buffer_size, result_buffer_size)) { - return 1; + if (!libp2p_routing_dht_protobuf_message(message, result_buffer, result_buffer_size)) { + return 0; } - free(*result_buffer); - *result_buffer = NULL; - *result_buffer_size = 0; + return 1; } return 0; }