From 059a3286c901f8da38136a49f42fc9fc14bfdcc9 Mon Sep 17 00:00:00 2001 From: John Jones Date: Mon, 31 Jul 2017 06:43:15 -0500 Subject: [PATCH] More bitswap implementation --- cid/cid.c | 2 +- core/null.c | 4 +- exchange/bitswap/message.c | 54 +++++++++++++++++ exchange/bitswap/network.c | 11 ++++ exchange/bitswap/peer_request_queue.c | 59 ++++++++++++++++++- exchange/bitswap/wantlist_queue.c | 27 +++++++-- include/ipfs/cid/cid.h | 2 +- include/ipfs/exchange/bitswap/message.h | 15 +++++ include/ipfs/exchange/bitswap/network.h | 10 ++++ .../exchange/bitswap/peer_request_queue.h | 14 ++++- include/ipfs/routing/routing.h | 2 +- routing/online.c | 10 ++-- test/exchange/test_bitswap.h | 6 +- test/routing/test_routing.h | 2 +- 14 files changed, 194 insertions(+), 24 deletions(-) diff --git a/cid/cid.c b/cid/cid.c index abc0103..d559ee4 100644 --- a/cid/cid.c +++ b/cid/cid.c @@ -22,7 +22,7 @@ size_t ipfs_cid_protobuf_encode_size(struct Cid* cid) { return 0; } -int ipfs_cid_protobuf_encode(struct Cid* cid, unsigned char* buffer, size_t buffer_length, size_t* bytes_written) { +int ipfs_cid_protobuf_encode(const struct Cid* cid, unsigned char* buffer, size_t buffer_length, size_t* bytes_written) { size_t bytes_used; *bytes_written = 0; int retVal = 0; diff --git a/core/null.c b/core/null.c index 34b91fb..64aa309 100644 --- a/core/null.c +++ b/core/null.c @@ -149,8 +149,8 @@ void ipfs_null_connection (void *ptr) break; } if (!session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT) ) { - // the read was unsuccessful. We should close the connection. - libp2p_logger_debug("null", "%s stream transaction read returned false.\n", connection_param->local_node->identity->peer_id); + // the read was unsuccessful wait a sec + sleep(1); continue; } if (null_shutting_down) { diff --git a/exchange/bitswap/message.c b/exchange/bitswap/message.c index 768e7e2..b0342a0 100644 --- a/exchange/bitswap/message.c +++ b/exchange/bitswap/message.c @@ -638,5 +638,59 @@ int ipfs_bitswap_message_protobuf_decode(unsigned char* buffer, size_t buffer_le return 1; } +/**** + * Add a vector of Cids to the bitswap message + * @param message the message + * @param cids a Libp2pVector of cids + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_message_add_wantlist_items(struct BitswapMessage* message, struct Libp2pVector* cids) { + if (message->wantlist == NULL) { + message->wantlist = ipfs_bitswap_wantlist_new(); + if (message->wantlist == NULL) + return 0; + } + if (message->wantlist->entries == NULL) { + message->wantlist->entries = libp2p_utils_vector_new(1); + if (message->wantlist->entries == NULL) + return 0; + } + for(int i = 0; i < cids->total; i++) { + const struct Cid* cid = (const struct Cid*)libp2p_utils_vector_get(cids, i); + struct WantlistEntry* entry = ipfs_bitswap_wantlist_entry_new(); + if (!ipfs_cid_protobuf_encode(cid, entry->block, entry->block_size, &entry->block_size)) { + // TODO: we should do more than return a half-baked list + return 0; + } + entry->cancel = 0; + entry->priority = 1; + libp2p_utils_vector_add(message->wantlist->entries, entry); + } + return 1; +} + +/*** + * Add the blocks to the BitswapMessage + * @param message the message + * @param blocks the requested blocks + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_bitswap_message_add_blocks(struct BitswapMessage* message, struct Libp2pVector* blocks) { + // bitswap 1.0 uses blocks, bitswap 1.1 uses payload + + if (message == NULL) + return 0; + if (message->payload == NULL) { + message->payload = libp2p_utils_vector_new(blocks->total); + if (message->payload == NULL) + return 0; + } + for(int i = 0; i < blocks->total; i++) { + const struct Block* current = (const struct Block*) libp2p_utils_vector_get(blocks, i); + libp2p_utils_vector_add(message->payload, current); + } + return 1; +} + diff --git a/exchange/bitswap/network.c b/exchange/bitswap/network.c index 496fef3..4f45896 100644 --- a/exchange/bitswap/network.c +++ b/exchange/bitswap/network.c @@ -23,3 +23,14 @@ ipfs_bitswap_network_receive_message(struct BitswapContext* context) { /** * We want to pop something off the queue */ + +/**** + * send a message to a particular peer + * @param context the BitswapContext + * @param peer the peer that is the recipient + * @param message the message to send + */ +int ipfs_bitswap_network_send_message(const struct BitswapContext* context, const struct Libp2pPeer* peer, const struct BitswapMessage* message) { + return 0; +} + diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index 82faebd..ef4cb94 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -7,6 +7,8 @@ #include "libp2p/conn/session.h" #include "ipfs/cid/cid.h" #include "ipfs/exchange/bitswap/peer_request_queue.h" +#include "ipfs/exchange/bitswap/message.h" +#include "ipfs/exchange/bitswap/network.h" /** * Allocate resources for a new PeerRequest @@ -17,7 +19,7 @@ struct PeerRequest* ipfs_bitswap_peer_request_new() { if (request != NULL) { request->cids = NULL; request->blocks = NULL; - request->context = NULL; + request->peer = NULL; } return request; } @@ -118,7 +120,7 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR if (request != NULL) { struct PeerRequestEntry* current = queue->first; while (current != NULL) { - if (libp2p_session_context_compare(current->current->context, request->context) == 0) + if (libp2p_peer_compare(current->current->peer, request->peer) == 0) return current; current = current->next; } @@ -193,7 +195,58 @@ int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct * @returns true(1) on succes, otherwise false(0) */ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request) { - //TODO: Implement this method + // determine if we're connected + int connected = request->peer == NULL || request->peer->connection_type == CONNECTION_TYPE_CONNECTED; + int need_to_connect = request->cids != NULL; + + // determine if we need to connect + if (need_to_connect) { + if (!connected) { + // connect + connected = libp2p_peer_connect(&context->ipfsNode->identity->private_key, request->peer, 0); + } + if (connected) { + // build a message + struct BitswapMessage* msg = ipfs_bitswap_message_new(); + // add requests + ipfs_bitswap_message_add_wantlist_items(msg, request->cids); + // add blocks + ipfs_bitswap_message_add_blocks(msg, request->blocks); + // send message + if (ipfs_bitswap_network_send_message(context, request->peer, msg)) + return 1; + } + } return 0; } +/*** + * Find a PeerRequest related to a peer. If one is not found, it is created. + * + * @param peer_request_queue the queue to look through + * @param peer the peer to look for + * @returns a PeerRequestEntry or NULL on error + */ +struct PeerRequest* ipfs_peer_request_queue_find_peer(struct PeerRequestQueue* queue, struct Libp2pPeer* peer) { + + struct PeerRequestEntry* entry = queue->first; + while (entry != NULL) { + if (libp2p_peer_compare(entry->current->peer, peer) == 0) { + return entry->current; + } + } + + entry = ipfs_bitswap_peer_request_entry_new(); + entry->current->peer = peer; + entry->prior = queue->last; + queue->last = entry; + + return entry->current; +} + + + + + + + diff --git a/exchange/bitswap/wantlist_queue.c b/exchange/bitswap/wantlist_queue.c index 979d555..8a82f35 100644 --- a/exchange/bitswap/wantlist_queue.c +++ b/exchange/bitswap/wantlist_queue.c @@ -231,15 +231,30 @@ int ipfs_bitswap_wantlist_get_block_locally(struct BitswapContext* context, stru * * This will ask the network for who has the file, using the router. * It will then ask the specific nodes for the file. This method - * does not queue anything. It actually does the work. + * does not queue anything. It actually does the work. The remotes + * will queue the file, but we'll return before they respond. * * @param context the BitswapContext * @param cid the id of the file - * @param block where to put the results - * @returns true(1) if found, false(0) otherwise + * @returns true(1) if we found some providers to ask, false(0) otherwise */ -int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struct Cid* cid, struct Block** block) { - //TODO: Implement this workhorse of a method +int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struct Cid* cid) { + // find out who may have the file + struct Libp2pVector* providers = NULL; + if (context->ipfsNode->routing->FindProviders(context->ipfsNode->routing, cid->hash, cid->hash_length, &providers)) { + for(int i = 0; i < providers->total; i++) { + struct Libp2pPeer* current = (struct Libp2pPeer*) libp2p_utils_vector_get(providers, i); + // add this to their queue + struct PeerRequest* queueEntry = ipfs_peer_request_queue_find_peer(context->peerRequestQueue, current); + libp2p_utils_vector_add(queueEntry->cids, cid); + // process this queue + // NOTE: We need to ask the remotes via bitswap, and wait for a response before returning + // there will need to be some fancy stuff to know when we get it back so that this method + // can return with the block + ipfs_bitswap_peer_request_process_entry(context, queueEntry); + } + return 1; + } return 0; } @@ -257,7 +272,7 @@ int ipfs_bitswap_wantlist_process_entry(struct BitswapContext* context, struct W return 0; } if (local_request && !have_local) { - if (!ipfs_bitswap_wantlist_get_block_remote(context, entry->cid, &entry->block)) { + if (!ipfs_bitswap_wantlist_get_block_remote(context, entry->cid)) { // if we were unsuccessful in retrieving it, put it back in the queue? // I don't think so. But I'm keeping this counter here until we have // a final decision. Maybe lower the priority? diff --git a/include/ipfs/cid/cid.h b/include/ipfs/cid/cid.h index 7969f1a..10e50d2 100644 --- a/include/ipfs/cid/cid.h +++ b/include/ipfs/cid/cid.h @@ -45,7 +45,7 @@ struct CidSet { * @param max_buffer_length the length of the buffer * @param bytes_written the number of bytes written */ -int ipfs_cid_protobuf_encode(struct Cid* incoming, unsigned char* buffer, size_t max_buffer_length, size_t* bytes_written); +int ipfs_cid_protobuf_encode(const struct Cid* incoming, unsigned char* buffer, size_t max_buffer_length, size_t* bytes_written); /*** * decode an array of bytes into a Cid structure diff --git a/include/ipfs/exchange/bitswap/message.h b/include/ipfs/exchange/bitswap/message.h index 616b822..721b58f 100644 --- a/include/ipfs/exchange/bitswap/message.h +++ b/include/ipfs/exchange/bitswap/message.h @@ -1,3 +1,4 @@ +#pragma once /*** * A protobuf-able Bitswap Message */ @@ -200,4 +201,18 @@ int ipfs_bitswap_message_protobuf_encode(struct BitswapMessage* message, unsigne */ int ipfs_bitswap_message_protobuf_decode(unsigned char* buffer, size_t buffer_length, struct BitswapMessage** output); +/**** + * Add a vector of Cids to the bitswap message + * @param message the message + * @param cids a Libp2pVector of cids + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_message_add_wantlist_items(struct BitswapMessage* message, struct Libp2pVector* cids); +/*** + * Add the blocks to the BitswapMessage + * @param message the message + * @param blocks the requested blocks + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_bitswap_message_add_blocks(struct BitswapMessage* message, struct Libp2pVector* blocks); diff --git a/include/ipfs/exchange/bitswap/network.h b/include/ipfs/exchange/bitswap/network.h index f1b1f8a..10f2013 100644 --- a/include/ipfs/exchange/bitswap/network.h +++ b/include/ipfs/exchange/bitswap/network.h @@ -4,6 +4,8 @@ */ #include "libp2p/conn/session.h" +#include "libp2p/peer/peer.h" +#include "ipfs/exchange/bitswap/bitswap.h" #include "ipfs/exchange/bitswap/message.h" struct BitswapRouting { @@ -61,3 +63,11 @@ struct BitswapNetwork { */ struct BitswapMessageSender* (*NewMessageSender)(struct SessionContext* context, unsigned char* peerId); }; + +/**** + * send a message to a particular peer + * @param context the BitswapContext + * @param peer the peer that is the recipient + * @param message the message to send + */ +int ipfs_bitswap_network_send_message(const struct BitswapContext* context, const struct Libp2pPeer* peer, const struct BitswapMessage* message); diff --git a/include/ipfs/exchange/bitswap/peer_request_queue.h b/include/ipfs/exchange/bitswap/peer_request_queue.h index be15ca3..2386df4 100644 --- a/include/ipfs/exchange/bitswap/peer_request_queue.h +++ b/include/ipfs/exchange/bitswap/peer_request_queue.h @@ -1,15 +1,17 @@ +#pragma once /*** * A queue for requests to/from remote peers * NOTE: This must handle multiple threads */ #include +#include "libp2p/peer/peer.h" #include "ipfs/exchange/bitswap/bitswap.h" #include "ipfs/blocks/block.h" struct PeerRequest { pthread_mutex_t request_mutex; - struct SessionContext* context; + struct Libp2pPeer* peer; struct Libp2pVector* cids; struct Libp2pVector* blocks; }; @@ -112,3 +114,13 @@ int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry); * @returns true(1) on succes, otherwise false(0) */ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request); + +/*** + * Find a PeerRequest related to a peer. If one is not found, it is created. + * + * @param peer_request_queue the queue to look through + * @param peer the peer to look for + * @returns a PeerRequestEntry or NULL on error + */ +struct PeerRequest* ipfs_peer_request_queue_find_peer(struct PeerRequestQueue* queue, struct Libp2pPeer* peer); + diff --git a/include/ipfs/routing/routing.h b/include/ipfs/routing/routing.h index 2267c24..609a5db 100644 --- a/include/ipfs/routing/routing.h +++ b/include/ipfs/routing/routing.h @@ -37,7 +37,7 @@ struct IpfsRouting { * @param routing the context * @param key the information that is being looked for * @param key_size the size of param 2 - * @param peers a vector of peers found that can provide the value for the key + * @param peers a vector of Libp2pPeers found that can provide the value for the key * @returns true(1) on success, otherwise false(0) */ int (*FindProviders) (struct IpfsRouting* routing, const unsigned char* key, size_t key_size, struct Libp2pVector** peers); diff --git a/routing/online.c b/routing/online.c index b461bfd..0f023a6 100644 --- a/routing/online.c +++ b/routing/online.c @@ -36,8 +36,8 @@ struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionCon } // send the message, and expect the same back - sessionContext->default_stream->write(sessionContext->default_stream, protobuf, protobuf_size); - sessionContext->default_stream->read(sessionContext->default_stream, &results, &results_size, 5); + sessionContext->default_stream->write(sessionContext, protobuf, protobuf_size); + sessionContext->default_stream->read(sessionContext, &results, &results_size, 5); // see if we can unprotobuf if (!libp2p_message_protobuf_decode(results, results_size, &return_message)) @@ -291,7 +291,7 @@ int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* pee size_t protobuf_size; if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { - if (!libp2p_peer_connect(peer, 5)) + if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, 5)) return 0; } if (peer->connection_type == CONNECTION_TYPE_CONNECTED) { @@ -411,7 +411,7 @@ int ipfs_routing_online_get_value (ipfs_routing* routing, const unsigned char *k if (!libp2p_peer_is_connected(current_peer)) { // attempt to connect. If unsuccessful, continue in the loop. libp2p_logger_debug("online", "Attempting to connect to peer to retrieve file\n"); - if (libp2p_peer_connect(current_peer, 5)) { + if (libp2p_peer_connect(&routing->local_node->identity->private_key, current_peer, 5)) { libp2p_logger_debug("online", "Peer connected\n"); if (ipfs_routing_online_get_peer_value(routing, current_peer, key, key_size, buffer, buffer_size)) { libp2p_logger_debug("online", "Retrieved a value\n"); @@ -484,7 +484,7 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) { return -1; // this should never happen } if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier) - libp2p_peer_connect(peer, 5); + libp2p_peer_connect(&routing->local_node->identity->private_key, peer, 5); } } } diff --git a/test/exchange/test_bitswap.h b/test/exchange/test_bitswap.h index bdd1639..19291fa 100644 --- a/test/exchange/test_bitswap.h +++ b/test/exchange/test_bitswap.h @@ -164,9 +164,6 @@ int test_bitswap_retrieve_file_third_party() { int retVal = 0; /* - libp2p_logger_add_class("online"); - libp2p_logger_add_class("multistream"); - libp2p_logger_add_class("null"); libp2p_logger_add_class("dht_protocol"); libp2p_logger_add_class("providerstore"); libp2p_logger_add_class("peerstore"); @@ -174,6 +171,9 @@ int test_bitswap_retrieve_file_third_party() { libp2p_logger_add_class("peer"); */ libp2p_logger_add_class("test_bitswap"); + libp2p_logger_add_class("null"); + libp2p_logger_add_class("online"); + libp2p_logger_add_class("multistream"); // clean out repository char* ipfs_path = "/tmp/test1"; diff --git a/test/routing/test_routing.h b/test/routing/test_routing.h index 78575b7..ceaa6c0 100644 --- a/test/routing/test_routing.h +++ b/test/routing/test_routing.h @@ -233,7 +233,7 @@ int test_routing_find_providers() { struct Libp2pPeer *remote_peer = NULL; for(int i = 0; i < result->total; i++) { remote_peer = (struct Libp2pPeer*)libp2p_utils_vector_get(result, i); - if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED || libp2p_peer_connect(remote_peer, 5)) { + if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED || libp2p_peer_connect(&local_node.identity->private_key, remote_peer, 5)) { break; } remote_peer = NULL;