diff --git a/include/libp2p/routing/kademlia.h b/include/libp2p/routing/kademlia.h new file mode 100644 index 0000000..03a5421 --- /dev/null +++ b/include/libp2p/routing/kademlia.h @@ -0,0 +1,14 @@ +#pragma once + +int start_kademlia(int sock, int family, char* peer_id, int timeout); +void stop_kademlia (void); + +void *kademlia_thread (void *ptr); +void *announce_thread (void *ptr); + +int announce_once_kademlia(unsigned char* id, uint16_t port, int timeout); + +int announce_kademlia (char* peer_id, uint16_t port); +int search_kademlia(char* peer_id, int timeout); + +int ping_kademlia (char *ip, uint16_t port); diff --git a/routing/kademlia.c b/routing/kademlia.c index 26dca1d..1b11336 100644 --- a/routing/kademlia.c +++ b/routing/kademlia.c @@ -15,22 +15,14 @@ #include #include #include +#include #include #define MAX_BOOTSTRAP_NODES 20 static struct sockaddr_storage bootstrap_nodes[MAX_BOOTSTRAP_NODES]; static int num_bootstrap_nodes = 0; -pthread_t pth; -time_t tosleep = 0; -int ksock = -1; -int net_family = 0; -volatile int searching = 0; // search lock, -1 to busy, 0 to free, 1 to running. -volatile char hash[20]; // hash to be search or announce. -volatile int announce_port = 0; -volatile int closing = 0; - -struct bs_list { +struct bs_struct { char *ip; uint16_t port; } bootstrap_list[] = { @@ -38,6 +30,24 @@ struct bs_list { { "127.0.0.1", 4321 } }; +pthread_t pth_kademlia, pth_announce; +time_t tosleep = 0; +int ksock = -1; +int net_family = 0; +volatile int8_t searching = 0; // search lock, -1 to busy, 0 to free, 1 to running. +volatile char hash[20]; // hash to be search or announce. +volatile uint16_t announce_port = 0; +volatile int8_t closing = 0; + +#define ANNOUNCE_WAIT_TIME (28 * 60) // Wait 28 minutes. +#define ANNOUNCE_WAIT_TOLERANCE 60 +struct announce_struct { + unsigned char hash[20]; + uint16_t port; + unsigned int time; + struct announce_struct *next; +} *announce_list = NULL; + /* The call-back function is called by the DHT whenever something interesting happens. Right now, it only happens when we get a new value or when a search completes, but this may be extended in future versions. */ @@ -61,6 +71,90 @@ callback(void *closure, } } +int start_kademlia(int sock, int family, char* peer_id, int timeout) +{ + int rc, i, len; + unsigned char id[sizeof hash]; + struct sockaddr_in sa; + + len = sizeof(bootstrap_list) / sizeof(bootstrap_list[0]); // array length + + if (len > MAX_BOOTSTRAP_NODES) { + len = MAX_BOOTSTRAP_NODES; // limit array length + } + + memset(&sa, 0, sizeof sa); + for (i = 0 ; i < len ; i++) { + if (family == AF_INET6 && inet_pton(AF_INET6, bootstrap_list[i].ip, &(sa.sin_addr.s_addr)) == 1) { + sa.sin_family = AF_INET6; + } else if (inet_pton(AF_INET, bootstrap_list[i].ip, &(sa.sin_addr.s_addr)) == 1) { + sa.sin_family = AF_INET; + } else { + continue; // not an ipv6 or ipv4? + } + + sa.sin_port = htons (bootstrap_list[i].port); + + memcpy(&bootstrap_nodes[num_bootstrap_nodes++], &sa, sizeof(sa)); + } + + dht_hash (id, sizeof(id), peer_id, strlen(peer_id), NULL, 0, NULL, 0); + + if (family == AF_INET6) { + rc = dht_init(-1, sock, id, NULL); + } else { + rc = dht_init(sock, -1, id, NULL); + } + if (rc < 0) { + return rc; + } + + /* For bootstrapping, we need an initial list of nodes. This could be + hard-wired, but can also be obtained from the nodes key of a torrent + file, or from the PORT bittorrent message. + + Dht_ping_node is the brutal way of bootstrapping -- it actually + sends a message to the peer. If you're going to bootstrap from + a massive number of nodes (for example because you're restoring from + a dump) and you already know their ids, it's better to use + dht_insert_node. If the ids are incorrect, the DHT will recover. */ + for(i = 0; i < num_bootstrap_nodes; i++) { + dht_ping_node((struct sockaddr*)&bootstrap_nodes[i], + sizeof (bootstrap_nodes[i])); + usleep(random() % 100000); + } + + // TODO: Read cache nodes from file and load using dht_insert_node. + + ksock = sock; + net_family = family; + tosleep = timeout; + + rc = pthread_create(&pth_kademlia, NULL, kademlia_thread, NULL); + if (rc) { + return rc; // error + } + + return pthread_create(&pth_announce, NULL, announce_thread, NULL); +} + +void stop_kademlia (void) +{ + if (ksock != -1) { + closing = 1; + + pthread_cancel(pth_announce); + + // Wait kademlia_thread finish. + pthread_join(pth_kademlia, NULL); + + dht_uninit(); + + close (ksock); + ksock = -1; + } +} + void *kademlia_thread (void *ptr) { int rc; @@ -126,78 +220,159 @@ void *kademlia_thread (void *ptr) } } -int bootstrap_kademlia(int sock, int family, char* peer_id, int timeout) +void *announce_thread (void *ptr) { - int rc, i, len; - unsigned char id[20]; - struct sockaddr_in sa; + unsigned int wait; + struct announce_struct *n, *p; - len = sizeof(bootstrap_list) / sizeof(bootstrap_list[0]); // array length + for(;;) { + if (announce_list) { + unsigned int now, minus_time = ((unsigned int) -1); - if (len > MAX_BOOTSTRAP_NODES) { - len = MAX_BOOTSTRAP_NODES; // limit array length + p = NULL; + // find max wait value. + for (n = announce_list ; n ; n = n->next) { + if (n->time < minus_time) { + minus_time = n->time; + p = n; + } + } + if (p) { + now = time(NULL); + if ((minus_time + ANNOUNCE_WAIT_TIME) > (now + ANNOUNCE_WAIT_TOLERANCE)) { + wait = ANNOUNCE_WAIT_TIME - (now - minus_time); + sleep (wait); + } else { + if (p) { + announce_once_kademlia (p->hash, p->port, ANNOUNCE_WAIT_TOLERANCE); + p->time = time(NULL); + } + } + continue; + } + } + // Empty list, just wait. + sleep (ANNOUNCE_WAIT_TIME); + } +} + +// Announce kademlia id hash only once. +int announce_once_kademlia(unsigned char* id, uint16_t port, int timeout) +{ + int i, to = timeout * 1000000; + + if (ksock == -1) { + return 0; // start thread first. } - memset(&sa, 0, sizeof sa); - for (i = 0 ; i < len ; i++) { - if (family == AF_INET6 && inet_pton(AF_INET6, bootstrap_list[i].ip, &(sa.sin_addr.s_addr)) == 1) { - sa.sin_family = AF_INET6; - } else if (inet_pton(AF_INET, bootstrap_list[i].ip, &(sa.sin_addr.s_addr)) == 1) { - sa.sin_family = AF_INET; - } else { - continue; // not an ipv6 or ipv4? + while (searching != 0) { + i = random() % 100000; + if (i > to) { + return 0; // timeout waiting a chance } + usleep(i); + to -= i; + } - sa.sin_port = htons (bootstrap_list[i].port); + searching = -1; // lock. - memcpy(&bootstrap_nodes[num_bootstrap_nodes++], &sa, sizeof(sa)); + for (i = 0 ; i < sizeof hash ; i++) { + hash[i] = id[i]; + } + + announce_port = port; + + searching = 1; // announce. + + return 1; +} + +int announce_kademlia (char* peer_id, uint16_t port) +{ + unsigned char id[sizeof hash]; + struct announce_struct *n, *p; + + dht_hash (id, sizeof(id), peer_id, strlen(peer_id), NULL, 0, NULL, 0); + + for (n = announce_list ; n ; n = n->next) { + if (memcmp(n->hash, id, sizeof id) == 0) { + return 0; // Already on the list. + } + if (! (n->next)) { + break; // Keep n->next at the insertion point. + } + } + + + if ((p = malloc (sizeof(struct announce_struct))) == NULL) { + return 0; // Fail to alloc. + } + + announce_once_kademlia (id, port, ANNOUNCE_WAIT_TOLERANCE); + + memcpy(p->hash, id, sizeof id); + p->port = port; + p->next = NULL; + p->time = time(NULL); + + if (!announce_list) { + announce_list = p; + } else { + n->next = p; + } + + return 1; // Announced and added to the list. +} + +int search_kademlia(char* peer_id, int timeout) +{ + unsigned char id[sizeof hash]; + int i, to = timeout * 1000000; + + if (ksock == -1) { + return 0; // start thread first. } dht_hash (id, sizeof(id), peer_id, strlen(peer_id), NULL, 0, NULL, 0); - if (family == AF_INET6) { - rc = dht_init(-1, sock, id, NULL); - } else { - rc = dht_init(sock, -1, id, NULL); - } - if (rc < 0) { - return rc; + while (searching != 0) { + i = random() % 100000; + if (i > to) { + return 0; // timeout waiting a chance + } + usleep(i); + to -= i; } - /* For bootstrapping, we need an initial list of nodes. This could be - hard-wired, but can also be obtained from the nodes key of a torrent - file, or from the PORT bittorrent message. + searching = -1; // lock. - Dht_ping_node is the brutal way of bootstrapping -- it actually - sends a message to the peer. If you're going to bootstrap from - a massive number of nodes (for example because you're restoring from - a dump) and you already know their ids, it's better to use - dht_insert_node. If the ids are incorrect, the DHT will recover. */ - for(i = 0; i < num_bootstrap_nodes; i++) { - dht_ping_node((struct sockaddr*)&bootstrap_nodes[i], - sizeof (bootstrap_nodes[i])); - usleep(random() % 100000); + for (i = 0 ; i < sizeof hash ; i++) { + hash[i] = id[i]; } - // TODO: Read cache nodes from file and load using dht_insert_node. + announce_port = 0; - ksock = sock; - net_family = family; - tosleep = timeout; + searching = 1; // search. - return pthread_create(&pth, NULL, kademlia_thread, NULL); + return 1; } -void stop_kademlia (void) +int ping_kademlia (char *ip, uint16_t port) { - closing = 1; + struct sockaddr_in sa; - // Wait kademlia_thread finish. - (void) pthread_join(pth, NULL); + if (inet_pton(AF_INET6, ip, &(sa.sin_addr.s_addr)) == 1) { + sa.sin_family = AF_INET6; + } if (inet_pton(AF_INET, ip, &(sa.sin_addr.s_addr)) == 1) { + sa.sin_family = AF_INET; + } else { + return 0; + } - dht_uninit(); + dht_ping_node((struct sockaddr*)&sa, sizeof sa); + //usleep(random() % 100000); - close (ksock); + return 1; } /* Functions called by the DHT. */ @@ -240,7 +415,7 @@ void dht_hash (void *hash_return, int hash_size, int dht_random_bytes (void *buf, size_t size) { - int fd, rc, save; + int fd, rc = 0, save; size_t len = 0; fd = open("/dev/urandom", O_RDONLY);