Kademlia search returns multiaddress.

This commit is contained in:
Jose Marcial Vieira Bisneto 2017-03-16 05:45:44 -03:00
parent dc1b8b6b3d
commit 17dfadb6fd
3 changed files with 182 additions and 56 deletions

View file

@ -6,9 +6,7 @@ void stop_kademlia (void);
void *kademlia_thread (void *ptr); void *kademlia_thread (void *ptr);
void *announce_thread (void *ptr); void *announce_thread (void *ptr);
int announce_once_kademlia(unsigned char* id, uint16_t port, int timeout);
int announce_kademlia (char* peer_id, uint16_t port); int announce_kademlia (char* peer_id, uint16_t port);
int search_kademlia(char* peer_id, int timeout); struct MultiAddress** search_kademlia(char* peer_id, int timeout);
int ping_kademlia (char *ip, uint16_t port); int ping_kademlia (char *ip, uint16_t port);

View file

@ -1,6 +1,6 @@
DHT_DIR = dht DHT_DIR = dht
CC = gcc CC = gcc
CFLAGS = -O0 -I../include -I$(DHT_DIR) -g3 CFLAGS = -O0 -I../include -I../../c-multiaddr/include -I$(DHT_DIR) -g3
LFLAGS = LFLAGS =
DEPS = $(DHT_DIR)/dht.h DEPS = $(DHT_DIR)/dht.h
OBJS = kademlia.o dht.o OBJS = kademlia.o dht.o

View file

@ -17,6 +17,9 @@
#include <libp2p/crypto/sha256.h> #include <libp2p/crypto/sha256.h>
#include <libp2p/routing/kademlia.h> #include <libp2p/routing/kademlia.h>
#include <dht.h> #include <dht.h>
#include <multiaddr/multiaddr.h>
extern FILE *dht_debug;
#define MAX_BOOTSTRAP_NODES 20 #define MAX_BOOTSTRAP_NODES 20
static struct sockaddr_storage bootstrap_nodes[MAX_BOOTSTRAP_NODES]; static struct sockaddr_storage bootstrap_nodes[MAX_BOOTSTRAP_NODES];
@ -42,12 +45,34 @@ volatile int8_t closing = 0;
#define ANNOUNCE_WAIT_TIME (28 * 60) // Wait 28 minutes. #define ANNOUNCE_WAIT_TIME (28 * 60) // Wait 28 minutes.
#define ANNOUNCE_WAIT_TOLERANCE 60 #define ANNOUNCE_WAIT_TOLERANCE 60
struct announce_struct { struct announce_struct {
unsigned char hash[20]; unsigned char hash[sizeof hash];
uint16_t port; uint16_t port;
unsigned int time; unsigned int time;
struct announce_struct *next; struct announce_struct *next;
} *announce_list = NULL; } *announce_list = NULL;
#define DHT_MAX_IPV4 50
#define DHT_MAX_IPV6 10
struct ipv4_struct {
uint32_t ip;
uint16_t port;
};
struct ipv6_struct {
uint8_t ip[16];
uint16_t port;
};
struct search_struct {
unsigned char hash[sizeof hash];
uint8_t ipv4_count;
uint8_t ipv6_count;
struct ipv4_struct ipv4[DHT_MAX_IPV4];
struct ipv6_struct ipv6[DHT_MAX_IPV6];
struct search_struct *next;
} *search_result = NULL;
/* The call-back function is called by the DHT whenever something /* The call-back function is called by the DHT whenever something
interesting happens. Right now, it only happens when we get a new value or interesting happens. Right now, it only happens when we get a new value or
when a search completes, but this may be extended in future versions. */ when a search completes, but this may be extended in future versions. */
@ -57,14 +82,83 @@ callback(void *closure,
const unsigned char *info_hash, const unsigned char *info_hash,
const void *data, size_t data_len) const void *data, size_t data_len)
{ {
struct search_struct *sp, *rp = NULL; // struct pointer and result pointer
switch (event) { switch (event) {
case DHT_EVENT_VALUES: case DHT_EVENT_VALUES:
case DHT_EVENT_VALUES6: case DHT_EVENT_VALUES6:
printf("Received %d values.\n", (int)(data_len / 6)); if (dht_debug) {
break; fprintf(dht_debug, "Received %d values.\n", (int)(data_len / 6));
}
// Find the item in the list.
for (rp = search_result ; rp ; rp = rp->next) {
if (memcmp(rp->hash, info_hash, sizeof hash) == 0) { // Found.
int i;
if (event == DHT_EVENT_VALUES) { // IPv4
struct ipv4_struct ipv4;
if (rp->ipv4_count == DHT_MAX_IPV4) { // Full
return;
}
// Make sure the data is in struct format.
memcpy(&ipv4.ip, data, 4);
memcpy(&ipv4.port, data+4, 2);
ipv4.port = ntohs(ipv4.port);
for (i = 0 ; i < rp->ipv4_count ; i++) {
if (memcmp(&rp->ipv4[i], &ipv4, sizeof ipv4)) {
return; // Alread in the list.
}
}
// Not in the list, then add.
memcpy(&rp->ipv4[rp->ipv4_count], &ipv4, sizeof ipv4);
rp->ipv4_count++;
} else { // IPv6
struct ipv6_struct ipv6;
if (rp->ipv6_count == DHT_MAX_IPV6) { // Full
return;
}
// Make sure the data is in struct format.
memcpy(&ipv6.ip, data, 16);
memcpy(&ipv6.port, data+16, 2);
for (i = 0 ; i < rp->ipv6_count ; i++) {
if (memcmp(&rp->ipv6[i], &ipv6, sizeof ipv6)) {
return; // Alread in the list.
}
}
// Not in the list, then add.
memcpy(&rp->ipv6[rp->ipv6_count], &ipv6, sizeof ipv6);
rp->ipv6_count++;
}
return;
}
}
break; // Not found, how can EVENT_VALUE may occur before SEARCH_DONE?
case DHT_EVENT_SEARCH_DONE: case DHT_EVENT_SEARCH_DONE:
case DHT_EVENT_SEARCH_DONE6: case DHT_EVENT_SEARCH_DONE6:
printf("Search done.\n"); if (dht_debug) {
fprintf(dht_debug, "Search done.\n");
}
if (search_result) {
// Try to find the item in the list.
for (sp = search_result ; sp->next ; sp = sp->next) {
if (memcmp(sp->hash, info_hash, sizeof hash) == 0) { // Found.
rp = sp;
break;
}
}
if (!rp) {
rp = malloc(sizeof(struct search_struct));
if (!rp) return; // Abort, out of memory.
memset(rp, 0, sizeof(struct search_struct));
memcpy(rp->hash, info_hash, sizeof hash);
sp->next = rp; // Insert in the list.
}
} else {
rp = malloc(sizeof(struct search_struct));
if (!rp) return; // Abort, out of memory.
memset(rp, 0, sizeof(struct search_struct));
memcpy(rp->hash, info_hash, sizeof hash);
search_result = rp; // Insert first item in the list.
}
break; break;
default: default:
break; break;
@ -220,6 +314,32 @@ void *kademlia_thread (void *ptr)
} }
} }
int search_kademlia_internal (unsigned char* id, int port, int to)
{
int i;
while (searching != 0) {
i = random() % 100000;
if (i > to) {
return 0; // timeout waiting a chance
}
usleep(i);
to -= i;
}
searching = -1; // lock.
for (i = 0 ; i < sizeof hash ; i++) {
hash[i] = id[i];
}
announce_port = port;
searching = 1; // search.
return to;
}
void *announce_thread (void *ptr) void *announce_thread (void *ptr)
{ {
unsigned int wait; unsigned int wait;
@ -244,7 +364,7 @@ void *announce_thread (void *ptr)
sleep (wait); sleep (wait);
} else { } else {
if (p) { if (p) {
announce_once_kademlia (p->hash, p->port, ANNOUNCE_WAIT_TOLERANCE); search_kademlia_internal (p->hash, p->port, ANNOUNCE_WAIT_TOLERANCE * 1000000);
p->time = time(NULL); p->time = time(NULL);
} }
} }
@ -256,37 +376,6 @@ void *announce_thread (void *ptr)
} }
} }
// Announce kademlia id hash only once.
int announce_once_kademlia(unsigned char* id, uint16_t port, int timeout)
{
int i, to = timeout * 1000000;
if (ksock == -1) {
return 0; // start thread first.
}
while (searching != 0) {
i = random() % 100000;
if (i > to) {
return 0; // timeout waiting a chance
}
usleep(i);
to -= i;
}
searching = -1; // lock.
for (i = 0 ; i < sizeof hash ; i++) {
hash[i] = id[i];
}
announce_port = port;
searching = 1; // announce.
return 1;
}
int announce_kademlia (char* peer_id, uint16_t port) int announce_kademlia (char* peer_id, uint16_t port)
{ {
unsigned char id[sizeof hash]; unsigned char id[sizeof hash];
@ -308,7 +397,7 @@ int announce_kademlia (char* peer_id, uint16_t port)
return 0; // Fail to alloc. return 0; // Fail to alloc.
} }
announce_once_kademlia (id, port, ANNOUNCE_WAIT_TOLERANCE); search_kademlia_internal (id, port, ANNOUNCE_WAIT_TOLERANCE * 1000000);
memcpy(p->hash, id, sizeof id); memcpy(p->hash, id, sizeof id);
p->port = port; p->port = port;
@ -324,37 +413,76 @@ int announce_kademlia (char* peer_id, uint16_t port)
return 1; // Announced and added to the list. return 1; // Announced and added to the list.
} }
int search_kademlia(char* peer_id, int timeout) struct MultiAddress** search_kademlia(char* peer_id, int timeout)
{ {
unsigned char id[sizeof hash]; unsigned char id[sizeof hash];
int i, to = timeout * 1000000; int i, to = timeout * 1000000;
struct search_struct *rp; // result pointer
struct MultiAddress **ret;
if (ksock == -1) { if (ksock == -1) {
return 0; // start thread first. return NULL; // start thread first.
} }
dht_hash (id, sizeof(id), peer_id, strlen(peer_id), NULL, 0, NULL, 0); dht_hash (id, sizeof(id), peer_id, strlen(peer_id), NULL, 0, NULL, 0);
while (searching != 0) { to = search_kademlia_internal (id, 0, to);
if (to == 0) return NULL; // time out.
// Wait for search completion.
for(;;) {
i = random() % 100000; i = random() % 100000;
if (i > to) { if (i > to) {
return 0; // timeout waiting a chance return NULL; // timeout.
} }
usleep(i); usleep(i);
for (rp = search_result ; rp ; rp = rp->next) {
if (memcmp(rp->hash, id, sizeof hash) == 0) { // Found.
char ipstr[INET6_ADDRSTRLEN + 1];
char str[sizeof ipstr + 16];
int c = 0;
to = search_kademlia_internal (id, 0, to); // Repeat search to collect result.
if (to == 0) return NULL; // time out.
usleep(2000000); // Wait a few seconds for the result.
ret = calloc(search_result->ipv4_count + search_result->ipv6_count + 1, // IPv4 + IPv6 itens and a NULL terminator.
sizeof (struct MultiAddress*)); // array of pointer.
if (!ret) {
return NULL;
}
for (i = 0 ; i < search_result->ipv4_count ; i++) {
if (inet_ntop(AF_INET, &search_result->ipv4[i].ip, ipstr, sizeof ipstr)) {
snprintf (str, sizeof str, "/ip4/%s/tcp/%d", ipstr, search_result->ipv4[i].port);
if (dht_debug) {
fprintf(dht_debug, "SEARCH %s (%d) = %s\n", peer_id, c, str);
}
ret[c] = multiaddress_new_from_string (str);
if (ret[c] > 0) { // Sucess.
c++;
}
}
}
for (i = 0 ; i < search_result->ipv6_count ; i++) {
if (inet_ntop(AF_INET6, search_result->ipv6[i].ip, ipstr, sizeof ipstr)) {
snprintf (str, sizeof str, "/ip6/%s/tcp/%d", ipstr, search_result->ipv6[i].port);
if (dht_debug) {
fprintf(dht_debug, "SEARCH %s (%d) = %s\n", peer_id, c, str);
}
ret[c] = multiaddress_new_from_string (str);
if (ret[c] > 0) { // Sucess.
c++;
}
}
}
ret[c] = NULL; // NULL terminator.
return ret;
}
}
to -= i; to -= i;
} }
searching = -1; // lock. return NULL;
for (i = 0 ; i < sizeof hash ; i++) {
hash[i] = id[i];
}
announce_port = 0;
searching = 1; // search.
return 1;
} }
int ping_kademlia (char *ip, uint16_t port) int ping_kademlia (char *ip, uint16_t port)