From e5e565272eb790adbef8a8b5725f7a093e776689 Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Wed, 2 Aug 2017 09:43:27 -0500 Subject: [PATCH] Continue to listen to connected peers within bitswap engine --- core/null.c | 12 ++++++------ exchange/bitswap/engine.c | 10 ++++++++++ exchange/bitswap/peer_request_queue.c | 12 +++++++++--- include/ipfs/core/null.h | 13 +++++++++++++ 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/core/null.c b/core/null.c index 6867891..791aff2 100644 --- a/core/null.c +++ b/core/null.c @@ -52,10 +52,10 @@ int protocol_compare(const unsigned char* incoming, size_t incoming_size, const * @param connection_param the connection parameters * @returns True(1) on success, False(0) on error */ -int ipfs_null_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct null_connection_params *connection_param) { +int ipfs_multistream_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct IpfsNode* local_node) { if (protocol_compare(incoming, incoming_size, "/secio")) { libp2p_logger_debug("null", "Attempting secure io connection...\n"); - if (!libp2p_secio_handshake(session, &connection_param->local_node->identity->private_key, connection_param->local_node->peerstore, 1)) { + if (!libp2p_secio_handshake(session, &local_node->identity->private_key, local_node->peerstore, 1)) { // rejecting connection libp2p_logger_debug("null", "Secure IO connection failed\n"); return 0; @@ -79,7 +79,7 @@ int ipfs_null_marshal(const unsigned char* incoming, size_t incoming_size, struc else { // try to get the Node struct HashtableNode* node = NULL; - if (!ipfs_merkledag_get(hash, hash_length, &node, connection_param->local_node->repo)) { + if (!ipfs_merkledag_get(hash, hash_length, &node, local_node->repo)) { _continue = 0; continue; } @@ -100,11 +100,11 @@ int ipfs_null_marshal(const unsigned char* incoming, size_t incoming_size, struc return 0; } // this handles 1 transaction - libp2p_routing_dht_handle_message(session, connection_param->local_node->peerstore, connection_param->local_node->providerstore); + libp2p_routing_dht_handle_message(session, local_node->peerstore, local_node->providerstore); libp2p_logger_log("null", LOGLEVEL_DEBUG, "kademlia message handled\n"); } else if (protocol_compare(incoming, incoming_size, "/ipfs/bitswap/")) { libp2p_logger_debug("null", "Attempting bitswap connection...\n"); - return ipfs_bitswap_network_handle_message(connection_param->local_node, session, incoming, incoming_size); + return ipfs_bitswap_network_handle_message(local_node, session, incoming, incoming_size); } else { libp2p_logger_error("null", "There was a problem with this connection. It is nothing I can handle. Disconnecting.\n"); @@ -174,7 +174,7 @@ void ipfs_null_connection (void *ptr) // We actually got something. Process the request... unsuccessful_counter = 0; libp2p_logger_debug("null", "Read %lu bytes from a stream tranaction\n", bytes_read); - int retVal = ipfs_null_marshal(results, bytes_read, session, connection_param); + int retVal = ipfs_multistream_marshal(results, bytes_read, session, connection_param->local_node); free(results); if (!retVal) { libp2p_logger_debug("null", "ipfs_null_marshal returned false\n"); diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c index e36a786..ea08d7d 100644 --- a/exchange/bitswap/engine.c +++ b/exchange/bitswap/engine.c @@ -1,4 +1,5 @@ #include +#include "ipfs/core/null.h" #include "ipfs/exchange/bitswap/engine.h" #include "ipfs/exchange/bitswap/wantlist_queue.h" #include "ipfs/exchange/bitswap/peer_request_queue.h" @@ -59,6 +60,15 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { while (!context->bitswap_engine->shutting_down) { struct PeerRequest* item = ipfs_bitswap_peer_request_queue_pop(context->peerRequestQueue); if (item != NULL) { + // Do they have something on the network to process? + // did they send us something over the network? + unsigned char* buffer = NULL; + size_t buffer_len = 0; + if (item->peer->sessionContext->default_stream->read(item->peer->sessionContext, &buffer, &buffer_len, 1)) { + // handle it + ipfs_multistream_marshal(buffer, buffer_len, item->peer->sessionContext, context->ipfsNode); + } + // if there is something on the queue process it. ipfs_bitswap_peer_request_process_entry(context, item); } else { diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index 51aa6dc..2af20af 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -184,6 +184,7 @@ int ipfs_bitswap_peer_request_cids_waiting(struct Libp2pVector* cidEntries) { int ipfs_bitswap_peer_request_something_to_do(struct PeerRequestEntry* entry) { if (entry != NULL) { struct PeerRequest* request = entry->current; + // do we have something in the queue? if (request->blocks_we_want_to_send->total > 0) return 1; if (request->cids_we_want->total > 0) @@ -204,9 +205,14 @@ struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue* if (queue != NULL) { pthread_mutex_lock(&queue->queue_mutex); struct PeerRequestEntry* entry = queue->first; - if (entry != NULL && ipfs_bitswap_peer_request_something_to_do(entry)) { - retVal = entry->current; - queue->first = queue->first->next; + if (entry != NULL) { + if (ipfs_bitswap_peer_request_something_to_do(entry)) { + retVal = entry->current; + // move to the end of the queue + queue->first = queue->first->next; + queue->last->next = entry; + queue->last = entry; + } } pthread_mutex_unlock(&queue->queue_mutex); // disable temporarily diff --git a/include/ipfs/core/null.h b/include/ipfs/core/null.h index fbb0ae2..1903206 100644 --- a/include/ipfs/core/null.h +++ b/include/ipfs/core/null.h @@ -1,6 +1,19 @@ #pragma once +#include "libp2p/conn/session.h" +#include "ipfs/core/ipfs_node.h" + void *ipfs_null_connection (void *ptr); void *ipfs_null_listen (void *ptr); int ipfs_null_shutdown(); +/*** + * Handle the incoming request from a Multistream + * @param incoming the incoming request + * @param incoming_size the size of the request in bytes + * @param session the session context + * @param connection_param the connection parameters + * @returns True(1) on success, False(0) on error + */ +int ipfs_multistream_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct IpfsNode* local_node); +