diff --git a/include/libp2p/routing/kademlia.h b/include/libp2p/routing/kademlia.h index 03a5421..d683439 100644 --- a/include/libp2p/routing/kademlia.h +++ b/include/libp2p/routing/kademlia.h @@ -6,9 +6,7 @@ void stop_kademlia (void); void *kademlia_thread (void *ptr); void *announce_thread (void *ptr); -int announce_once_kademlia(unsigned char* id, uint16_t port, int timeout); - int announce_kademlia (char* peer_id, uint16_t port); -int search_kademlia(char* peer_id, int timeout); +struct MultiAddress** search_kademlia(char* peer_id, int timeout); int ping_kademlia (char *ip, uint16_t port); diff --git a/routing/Makefile b/routing/Makefile index 7804bc4..6446cae 100644 --- a/routing/Makefile +++ b/routing/Makefile @@ -1,6 +1,6 @@ DHT_DIR = dht CC = gcc -CFLAGS = -O0 -I../include -I$(DHT_DIR) -g3 +CFLAGS = -O0 -I../include -I../../c-multiaddr/include -I$(DHT_DIR) -g3 LFLAGS = DEPS = $(DHT_DIR)/dht.h OBJS = kademlia.o dht.o diff --git a/routing/kademlia.c b/routing/kademlia.c index 99c14e7..71317c6 100644 --- a/routing/kademlia.c +++ b/routing/kademlia.c @@ -17,6 +17,9 @@ #include #include #include +#include + +extern FILE *dht_debug; #define MAX_BOOTSTRAP_NODES 20 static struct sockaddr_storage bootstrap_nodes[MAX_BOOTSTRAP_NODES]; @@ -42,12 +45,34 @@ volatile int8_t closing = 0; #define ANNOUNCE_WAIT_TIME (28 * 60) // Wait 28 minutes. #define ANNOUNCE_WAIT_TOLERANCE 60 struct announce_struct { - unsigned char hash[20]; + unsigned char hash[sizeof hash]; uint16_t port; unsigned int time; struct announce_struct *next; } *announce_list = NULL; +#define DHT_MAX_IPV4 50 +#define DHT_MAX_IPV6 10 + +struct ipv4_struct { + uint32_t ip; + uint16_t port; +}; + +struct ipv6_struct { + uint8_t ip[16]; + uint16_t port; +}; + +struct search_struct { + unsigned char hash[sizeof hash]; + uint8_t ipv4_count; + uint8_t ipv6_count; + struct ipv4_struct ipv4[DHT_MAX_IPV4]; + struct ipv6_struct ipv6[DHT_MAX_IPV6]; + struct search_struct *next; +} *search_result = NULL; + /* The call-back function is called by the DHT whenever something interesting happens. Right now, it only happens when we get a new value or when a search completes, but this may be extended in future versions. */ @@ -57,14 +82,83 @@ callback(void *closure, const unsigned char *info_hash, const void *data, size_t data_len) { + struct search_struct *sp, *rp = NULL; // struct pointer and result pointer + switch (event) { case DHT_EVENT_VALUES: case DHT_EVENT_VALUES6: - printf("Received %d values.\n", (int)(data_len / 6)); - break; + if (dht_debug) { + fprintf(dht_debug, "Received %d values.\n", (int)(data_len / 6)); + } + // Find the item in the list. + for (rp = search_result ; rp ; rp = rp->next) { + if (memcmp(rp->hash, info_hash, sizeof hash) == 0) { // Found. + int i; + if (event == DHT_EVENT_VALUES) { // IPv4 + struct ipv4_struct ipv4; + if (rp->ipv4_count == DHT_MAX_IPV4) { // Full + return; + } + // Make sure the data is in struct format. + memcpy(&ipv4.ip, data, 4); + memcpy(&ipv4.port, data+4, 2); + ipv4.port = ntohs(ipv4.port); + for (i = 0 ; i < rp->ipv4_count ; i++) { + if (memcmp(&rp->ipv4[i], &ipv4, sizeof ipv4)) { + return; // Alread in the list. + } + } + // Not in the list, then add. + memcpy(&rp->ipv4[rp->ipv4_count], &ipv4, sizeof ipv4); + rp->ipv4_count++; + } else { // IPv6 + struct ipv6_struct ipv6; + if (rp->ipv6_count == DHT_MAX_IPV6) { // Full + return; + } + // Make sure the data is in struct format. + memcpy(&ipv6.ip, data, 16); + memcpy(&ipv6.port, data+16, 2); + for (i = 0 ; i < rp->ipv6_count ; i++) { + if (memcmp(&rp->ipv6[i], &ipv6, sizeof ipv6)) { + return; // Alread in the list. + } + } + // Not in the list, then add. + memcpy(&rp->ipv6[rp->ipv6_count], &ipv6, sizeof ipv6); + rp->ipv6_count++; + } + return; + } + } + break; // Not found, how can EVENT_VALUE may occur before SEARCH_DONE? case DHT_EVENT_SEARCH_DONE: case DHT_EVENT_SEARCH_DONE6: - printf("Search done.\n"); + if (dht_debug) { + fprintf(dht_debug, "Search done.\n"); + } + if (search_result) { + // Try to find the item in the list. + for (sp = search_result ; sp->next ; sp = sp->next) { + if (memcmp(sp->hash, info_hash, sizeof hash) == 0) { // Found. + rp = sp; + break; + } + } + if (!rp) { + rp = malloc(sizeof(struct search_struct)); + if (!rp) return; // Abort, out of memory. + memset(rp, 0, sizeof(struct search_struct)); + memcpy(rp->hash, info_hash, sizeof hash); + sp->next = rp; // Insert in the list. + } + } else { + rp = malloc(sizeof(struct search_struct)); + if (!rp) return; // Abort, out of memory. + memset(rp, 0, sizeof(struct search_struct)); + memcpy(rp->hash, info_hash, sizeof hash); + search_result = rp; // Insert first item in the list. + } break; default: break; @@ -220,6 +314,32 @@ void *kademlia_thread (void *ptr) } } +int search_kademlia_internal (unsigned char* id, int port, int to) +{ + int i; + + while (searching != 0) { + i = random() % 100000; + if (i > to) { + return 0; // timeout waiting a chance + } + usleep(i); + to -= i; + } + + searching = -1; // lock. + + for (i = 0 ; i < sizeof hash ; i++) { + hash[i] = id[i]; + } + + announce_port = port; + + searching = 1; // search. + + return to; +} + void *announce_thread (void *ptr) { unsigned int wait; @@ -244,7 +364,7 @@ void *announce_thread (void *ptr) sleep (wait); } else { if (p) { - announce_once_kademlia (p->hash, p->port, ANNOUNCE_WAIT_TOLERANCE); + search_kademlia_internal (p->hash, p->port, ANNOUNCE_WAIT_TOLERANCE * 1000000); p->time = time(NULL); } } @@ -256,37 +376,6 @@ void *announce_thread (void *ptr) } } -// Announce kademlia id hash only once. -int announce_once_kademlia(unsigned char* id, uint16_t port, int timeout) -{ - int i, to = timeout * 1000000; - - if (ksock == -1) { - return 0; // start thread first. - } - - while (searching != 0) { - i = random() % 100000; - if (i > to) { - return 0; // timeout waiting a chance - } - usleep(i); - to -= i; - } - - searching = -1; // lock. - - for (i = 0 ; i < sizeof hash ; i++) { - hash[i] = id[i]; - } - - announce_port = port; - - searching = 1; // announce. - - return 1; -} - int announce_kademlia (char* peer_id, uint16_t port) { unsigned char id[sizeof hash]; @@ -308,7 +397,7 @@ int announce_kademlia (char* peer_id, uint16_t port) return 0; // Fail to alloc. } - announce_once_kademlia (id, port, ANNOUNCE_WAIT_TOLERANCE); + search_kademlia_internal (id, port, ANNOUNCE_WAIT_TOLERANCE * 1000000); memcpy(p->hash, id, sizeof id); p->port = port; @@ -324,37 +413,76 @@ int announce_kademlia (char* peer_id, uint16_t port) return 1; // Announced and added to the list. } -int search_kademlia(char* peer_id, int timeout) +struct MultiAddress** search_kademlia(char* peer_id, int timeout) { unsigned char id[sizeof hash]; int i, to = timeout * 1000000; + struct search_struct *rp; // result pointer + struct MultiAddress **ret; if (ksock == -1) { - return 0; // start thread first. + return NULL; // start thread first. } dht_hash (id, sizeof(id), peer_id, strlen(peer_id), NULL, 0, NULL, 0); - while (searching != 0) { + to = search_kademlia_internal (id, 0, to); + if (to == 0) return NULL; // time out. + + // Wait for search completion. + for(;;) { i = random() % 100000; if (i > to) { - return 0; // timeout waiting a chance + return NULL; // timeout. } usleep(i); + for (rp = search_result ; rp ; rp = rp->next) { + if (memcmp(rp->hash, id, sizeof hash) == 0) { // Found. + char ipstr[INET6_ADDRSTRLEN + 1]; + char str[sizeof ipstr + 16]; + int c = 0; + + to = search_kademlia_internal (id, 0, to); // Repeat search to collect result. + if (to == 0) return NULL; // time out. + usleep(2000000); // Wait a few seconds for the result. + ret = calloc(search_result->ipv4_count + search_result->ipv6_count + 1, // IPv4 + IPv6 itens and a NULL terminator. + sizeof (struct MultiAddress*)); // array of pointer. + if (!ret) { + return NULL; + } + + for (i = 0 ; i < search_result->ipv4_count ; i++) { + if (inet_ntop(AF_INET, &search_result->ipv4[i].ip, ipstr, sizeof ipstr)) { + snprintf (str, sizeof str, "/ip4/%s/tcp/%d", ipstr, search_result->ipv4[i].port); + if (dht_debug) { + fprintf(dht_debug, "SEARCH %s (%d) = %s\n", peer_id, c, str); + } + ret[c] = multiaddress_new_from_string (str); + if (ret[c] > 0) { // Sucess. + c++; + } + } + } + for (i = 0 ; i < search_result->ipv6_count ; i++) { + if (inet_ntop(AF_INET6, search_result->ipv6[i].ip, ipstr, sizeof ipstr)) { + snprintf (str, sizeof str, "/ip6/%s/tcp/%d", ipstr, search_result->ipv6[i].port); + if (dht_debug) { + fprintf(dht_debug, "SEARCH %s (%d) = %s\n", peer_id, c, str); + } + ret[c] = multiaddress_new_from_string (str); + if (ret[c] > 0) { // Sucess. + c++; + } + } + } + ret[c] = NULL; // NULL terminator. + return ret; + } + } to -= i; } - searching = -1; // lock. - - for (i = 0 ; i < sizeof hash ; i++) { - hash[i] = id[i]; - } - - announce_port = 0; - - searching = 1; // search. - - return 1; + return NULL; } int ping_kademlia (char *ip, uint16_t port)