diff --git a/core/daemon.c b/core/daemon.c index a34e649..bfcd0e6 100644 --- a/core/daemon.c +++ b/core/daemon.c @@ -19,7 +19,7 @@ int ipfs_daemon_start(char* repo_path) { struct IpfsNodeListenParams listen_param; struct MultiAddress* ma = NULL; - libp2p_logger_info("daemon", "Initializing daemon...\n"); + libp2p_logger_info("daemon", "Initializing daemon for %s...\n", repo_path); struct IpfsNode* local_node = NULL; if (!ipfs_node_online_new(repo_path, &local_node)) @@ -37,7 +37,7 @@ int ipfs_daemon_start(char* repo_path) { goto exit; } - local_node->routing->Bootstrap (local_node->routing); + local_node->routing->Bootstrap(local_node->routing); libp2p_logger_info("daemon", "Daemon for %s is ready on port %d\n", listen_param.local_node->identity->peer_id, listen_param.port); @@ -51,7 +51,7 @@ int ipfs_daemon_start(char* repo_path) { retVal = 1; exit: - libp2p_logger_debug("daemon", "Cleaning up daemon processes\n"); + libp2p_logger_debug("daemon", "Cleaning up daemon processes for %s\n", repo_path); // clean up if (ma != NULL) multiaddress_free(ma); diff --git a/core/null.c b/core/null.c index 8d0d909..34b91fb 100644 --- a/core/null.c +++ b/core/null.c @@ -7,16 +7,17 @@ #include #include -#include "libp2p/net/p2pnet.h" -#include "libp2p/record/message.h" +#include "libp2p/conn/session.h" #include "libp2p/net/multistream.h" +#include "libp2p/net/p2pnet.h" +#include "libp2p/nodeio/nodeio.h" +#include "libp2p/record/message.h" +#include "libp2p/routing/dht_protocol.h" +#include "libp2p/secio/secio.h" #include "libp2p/utils/logger.h" #include "ipfs/core/daemon.h" #include "ipfs/routing/routing.h" #include "ipfs/core/ipfs_node.h" -#include "libp2p/secio/secio.h" -#include "libp2p/nodeio/nodeio.h" -#include "libp2p/routing/dht_protocol.h" #include "ipfs/merkledag/merkledag.h" #include "ipfs/merkledag/node.h" #include "ipfs/util/thread_pool.h" @@ -109,37 +110,51 @@ int ipfs_null_marshal(const unsigned char* incoming, size_t incoming_size, struc } /** - * We've received a connection. Find out what they want + * We've received a connection. Find out what they want. + * + * @param ptr a pointer to a null_connection_params struct */ void ipfs_null_connection (void *ptr) { - struct null_connection_params *connection_param = NULL; - - connection_param = (struct null_connection_params*) ptr; + struct null_connection_params *connection_param = (struct null_connection_params*) ptr; // TODO: when should we exit the for loop and disconnect? - struct SessionContext session; - session.insecure_stream = libp2p_net_multistream_stream_new(connection_param->file_descriptor, connection_param->ip, connection_param->port); - session.default_stream = session.insecure_stream; - session.datastore = connection_param->local_node->repo->config->datastore; - session.filestore = connection_param->local_node->repo->config->filestore; + struct SessionContext* session = libp2p_session_context_new(); + if (session == NULL) { + libp2p_logger_error("null", "Unable to allocate SessionContext. Out of memory?\n"); + return; + } + + session->insecure_stream = libp2p_net_multistream_stream_new(connection_param->file_descriptor, connection_param->ip, connection_param->port); + + libp2p_logger_debug("null", "%s null has a file descriptor of %d\n", connection_param->local_node->identity->peer_id, *((int*)session->insecure_stream->socket_descriptor) ); + + session->default_stream = session->insecure_stream; + session->datastore = connection_param->local_node->repo->config->datastore; + session->filestore = connection_param->local_node->repo->config->filestore; libp2p_logger_log("null", LOGLEVEL_INFO, "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count)); - if (libp2p_net_multistream_negotiate(&session)) { + if (libp2p_net_multistream_negotiate(session)) { // Someone has connected and successfully negotiated multistream. Now talk to them... for(;;) { // Wait for them to ask something... unsigned char* results = NULL; size_t bytes_read = 0; - if (!session.default_stream->read(&session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT) ) { - // the read was unsuccessful. We should close the connection. - libp2p_logger_debug("null", "stream transaction read returned false\n"); + if (null_shutting_down) { + libp2p_logger_debug("null", "%s null shutting down before read.\n", connection_param->local_node->identity->peer_id); + // this service is shutting down. Ignore the request and exit the loop break; } + if (!session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT) ) { + // the read was unsuccessful. We should close the connection. + libp2p_logger_debug("null", "%s stream transaction read returned false.\n", connection_param->local_node->identity->peer_id); + continue; + } if (null_shutting_down) { + libp2p_logger_debug("null", "%s null shutting down after read.\n", connection_param->local_node->identity->peer_id); // this service is shutting down. Ignore the request and exit the loop break; } @@ -151,7 +166,7 @@ void ipfs_null_connection (void *ptr) // We actually got something. Process the request... libp2p_logger_debug("null", "Read %lu bytes from a stream tranaction\n", bytes_read); - int retVal = ipfs_null_marshal(results, bytes_read, &session, connection_param); + int retVal = ipfs_null_marshal(results, bytes_read, session, connection_param); free(results); if (!retVal) { libp2p_logger_debug("null", "ipfs_null_marshal returned false\n"); @@ -162,19 +177,20 @@ void ipfs_null_connection (void *ptr) libp2p_logger_log("null", LOGLEVEL_DEBUG, "Multistream negotiation failed\n"); } - if (session.default_stream != NULL) { - session.default_stream->close(&session); - } - if (session.insecure_stream != NULL) { - libp2p_net_multistream_stream_free(session.insecure_stream); - } + libp2p_logger_debug("null", "%s Freeing session context.\n", connection_param->local_node->identity->peer_id); (*(connection_param->count))--; // update counter. if (connection_param->ip != NULL) free(connection_param->ip); free (connection_param); + libp2p_session_context_free(session); return; } +/*** + * Called by the daemon to listen for connections + * @param ptr a pointer to an IpfsNodeListenParams struct + * @returns nothing useful. + */ void* ipfs_null_listen (void *ptr) { int socketfd, s, count = 0; @@ -192,9 +208,10 @@ void* ipfs_null_listen (void *ptr) libp2p_logger_error("null", "Ipfs listening on %d\n", listen_param->port); for (;;) { - libp2p_logger_debug("null", "Attempting socket read\n"); + libp2p_logger_debug("null", "%s Attempting socket read\n", listen_param->local_node->identity->peer_id); int numDescriptors = socket_read_select4(socketfd, 2); if (null_shutting_down) { + libp2p_logger_debug("null", "%s null_listen shutting down.\n", listen_param->local_node->identity->peer_id); break; } if (numDescriptors > 0) { diff --git a/exchange/bitswap/bitswap.c b/exchange/bitswap/bitswap.c index 1456800..b74fcf5 100644 --- a/exchange/bitswap/bitswap.c +++ b/exchange/bitswap/bitswap.c @@ -53,6 +53,9 @@ struct Exchange* ipfs_bitswap_new(struct IpfsNode* ipfs_node) { int ipfs_bitswap_free(struct Exchange* exchange) { if (exchange != NULL) { if (exchange->exchangeContext != NULL) { + struct BitswapContext* bitswapContext = (struct BitswapContext*) exchange->exchangeContext; + if (bitswapContext != NULL) + ipfs_bitswap_engine_stop(bitswapContext); free(exchange->exchangeContext); } free(exchange); diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c index 4877673..3ff1458 100644 --- a/exchange/bitswap/engine.c +++ b/exchange/bitswap/engine.c @@ -101,5 +101,7 @@ int ipfs_bitswap_engine_stop(const struct BitswapContext* context) { int error1 = pthread_join(context->bitswap_engine->wantlist_processor_thread, NULL); int error2 = pthread_join(context->bitswap_engine->peer_request_processor_thread, NULL); + ipfs_bitswap_engine_free(context->bitswap_engine); + return !error1 && !error2; } diff --git a/exchange/bitswap/wantlist_queue.c b/exchange/bitswap/wantlist_queue.c index 760fcf2..979d555 100644 --- a/exchange/bitswap/wantlist_queue.c +++ b/exchange/bitswap/wantlist_queue.c @@ -129,7 +129,7 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_pop(struct WantListQueue* wantlist) { struct WantListQueueEntry* entry = NULL; - if (wantlist->queue->total == 0) + if (wantlist == NULL || wantlist->queue == NULL || wantlist->queue->total == 0) return entry; //TODO: This should be a linked list, not an array diff --git a/test/exchange/test_bitswap.h b/test/exchange/test_bitswap.h index 3b464f4..bdd1639 100644 --- a/test/exchange/test_bitswap.h +++ b/test/exchange/test_bitswap.h @@ -118,7 +118,6 @@ int test_bitswap_protobuf() { int test_bitswap_retrieve_file() { int retVal = 0; - struct Exchange* exchange = NULL; struct IpfsNode* localNode = NULL; const char* ipfs_path = "/tmp/ipfstest1"; struct HashtableNode* node = NULL; // the node created by adding the file @@ -140,11 +139,8 @@ int test_bitswap_retrieve_file() if (cid == NULL) goto exit; - // fire up the exchange - exchange = ipfs_bitswap_new(localNode); - // attempt to retrieve the file - if (!exchange->GetBlock(exchange, cid, &block)) { + if (!localNode->exchange->GetBlock(localNode->exchange, cid, &block)) { goto exit; } @@ -157,9 +153,6 @@ int test_bitswap_retrieve_file() ipfs_cid_free(cid); if (node != NULL) ipfs_hashtable_node_free(node); - if (exchange != NULL) { - exchange->Close(exchange); - } ipfs_node_free(localNode); return retVal; } diff --git a/test/routing/test_routing.h b/test/routing/test_routing.h index c341dae..78575b7 100644 --- a/test/routing/test_routing.h +++ b/test/routing/test_routing.h @@ -313,6 +313,9 @@ int test_routing_provide() { struct Libp2pVector* ma_vector2 = NULL; struct HashtableNode* node = NULL; + libp2p_logger_add_class("daemon"); + libp2p_logger_add_class("null"); + // create peer 1 drop_and_build_repository(ipfs_path, 4001, NULL, &peer_id_1); char multiaddress_string[255]; @@ -366,6 +369,8 @@ int test_routing_provide() { } if (node != NULL) ipfs_hashtable_node_free(node); + if (ma_peer1 != NULL) + multiaddress_free(ma_peer1); return retVal; } diff --git a/test/testit.c b/test/testit.c index 182d568..f9aa439 100644 --- a/test/testit.c +++ b/test/testit.c @@ -71,9 +71,9 @@ const char* names[] = { "test_merkledag_add_node", "test_merkledag_add_node_with_links", "test_resolver_get", - "test_routing_find_peer"/*, + "test_routing_find_peer", + "test_routing_provide" /*, "test_routing_find_providers", - "test_routing_provide", "test_routing_supernode_get_value", "test_routing_supernode_get_remote_value", "test_routing_retrieve_file_third_party", @@ -125,7 +125,8 @@ int (*funcs[])(void) = { test_merkledag_add_node, test_merkledag_add_node_with_links, test_resolver_get, - test_routing_find_peer/*, + test_routing_find_peer, + test_routing_provide /*, test_routing_find_providers, test_routing_provide, test_routing_supernode_get_value,