Added get_value to dht
This commit is contained in:
parent
1fa1e92016
commit
e237b239a1
2 changed files with 52 additions and 23 deletions
|
@ -1,6 +1,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "libp2p/crypto/key.h"
|
||||
#include "libp2p/db/datastore.h"
|
||||
/***
|
||||
* Holds the details of communication between two hosts
|
||||
*/
|
||||
|
@ -16,6 +17,7 @@ struct SessionContext {
|
|||
struct Stream* insecure_stream;
|
||||
struct Stream* secure_stream;
|
||||
struct Stream* default_stream;
|
||||
struct Datastore* datastore;
|
||||
// filled in during negotiations
|
||||
char* chosen_curve;
|
||||
char* chosen_cipher;
|
||||
|
|
|
@ -11,6 +11,25 @@
|
|||
* This is where kademlia and dht talk to the outside world
|
||||
*/
|
||||
|
||||
|
||||
/***
|
||||
* Helper method to protobuf a message
|
||||
* @param message the message
|
||||
* @param buffer where to put the results
|
||||
* @param buffer_size the size of the results
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int libp2p_routing_dht_protobuf_message(struct Libp2pMessage* message, unsigned char** buffer, size_t *buffer_size) {
|
||||
*buffer_size = libp2p_message_protobuf_encode_size(message);
|
||||
*buffer = malloc(*buffer_size);
|
||||
if (!libp2p_message_protobuf_encode(message, *buffer, *buffer_size, buffer_size)) {
|
||||
free(*buffer);
|
||||
*buffer_size = 0;
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Take existing stream and upgrade to the Kademlia / DHT protocol/codec
|
||||
* @param context the context
|
||||
|
@ -57,9 +76,7 @@ int libp2p_routing_dht_handshake(struct SessionContext* context) {
|
|||
*/
|
||||
int libp2p_routing_dht_handle_ping(struct Libp2pMessage* message, unsigned char** buffer, size_t *buffer_size) {
|
||||
// just turn message back into a protobuf and send it back...
|
||||
*buffer_size = libp2p_message_protobuf_encode_size(message);
|
||||
*buffer = (unsigned char*)malloc(*buffer_size);
|
||||
return libp2p_message_protobuf_encode(message, *buffer, *buffer_size, buffer_size);
|
||||
return libp2p_routing_dht_protobuf_message(message, buffer, buffer_size);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,12 +112,7 @@ int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, stru
|
|||
*/
|
||||
if (message->provider_peer_head != NULL) {
|
||||
// protobuf it and send it back
|
||||
*results_size = libp2p_message_protobuf_encode_size(message);
|
||||
*results = (unsigned char*)malloc(*results_size);
|
||||
if (!libp2p_message_protobuf_encode(message, *results, *results_size, results_size)) {
|
||||
free(*results);
|
||||
*results = NULL;
|
||||
*results_size = 0;
|
||||
if (!libp2p_routing_dht_protobuf_message(message, results, results_size)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -184,13 +196,9 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc
|
|||
goto exit;
|
||||
}
|
||||
|
||||
*result_buffer_size = libp2p_message_protobuf_encode_size(message);
|
||||
*result_buffer = (unsigned char*)malloc(*result_buffer_size);
|
||||
if (*result_buffer == NULL)
|
||||
if (!libp2p_routing_dht_protobuf_message(message, result_buffer, result_buffer_size)) {
|
||||
goto exit;
|
||||
if (!libp2p_message_protobuf_encode(message, *result_buffer, *result_buffer_size, result_buffer_size))
|
||||
goto exit;
|
||||
libp2p_logger_debug("dht_protocol", "add_provider protobuf'd the message. Returning results.\n");
|
||||
}
|
||||
|
||||
retVal = 1;
|
||||
exit:
|
||||
|
@ -220,7 +228,30 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc
|
|||
int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct Libp2pMessage* message,
|
||||
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) {
|
||||
//TODO: implement this
|
||||
return 0;
|
||||
struct Datastore* datastore = session->datastore;
|
||||
size_t data_size = 65535;
|
||||
char* data = malloc(data_size);
|
||||
if (!datastore->datastore_get(message->key, message->key_size, data, data_size, &data_size, datastore)) {
|
||||
free(data);
|
||||
return 0;
|
||||
}
|
||||
struct Libp2pRecord *record = libp2p_record_new();
|
||||
record->key_size = message->key_size;
|
||||
record->key = malloc(record->key_size);
|
||||
memcpy(record->key, message->key, record->key_size);
|
||||
record->value_size = data_size;
|
||||
record->value = malloc(record->value_size);
|
||||
memcpy(record->value, data, record->value_size);
|
||||
message->record = record;
|
||||
free(data);
|
||||
|
||||
if (!libp2p_routing_dht_protobuf_message(message, result_buffer, result_buffer_size)) {
|
||||
libp2p_record_free(record);
|
||||
message->record = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -255,14 +286,10 @@ int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct L
|
|||
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, message->key, message->key_size);
|
||||
if (peer != NULL) {
|
||||
message->closer_peer_head = peer->addr_head;
|
||||
*result_buffer_size = libp2p_message_protobuf_encode_size(message);
|
||||
*result_buffer = (unsigned char*)malloc(*result_buffer_size);
|
||||
if (libp2p_message_protobuf_encode(message, *result_buffer, *result_buffer_size, result_buffer_size)) {
|
||||
return 1;
|
||||
if (!libp2p_routing_dht_protobuf_message(message, result_buffer, result_buffer_size)) {
|
||||
return 0;
|
||||
}
|
||||
free(*result_buffer);
|
||||
*result_buffer = NULL;
|
||||
*result_buffer_size = 0;
|
||||
return 1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue