diff --git a/core/ipfs_node.c b/core/ipfs_node.c index 1d9ed5b..b5233c4 100644 --- a/core/ipfs_node.c +++ b/core/ipfs_node.c @@ -20,7 +20,7 @@ struct Libp2pVector* ipfs_node_online_build_protocol_handlers(struct IpfsNode* n // bitswap libp2p_utils_vector_add(retVal, ipfs_bitswap_build_protocol_handler(node)); // multistream - libp2p_utils_vector_add(retVal, libp2p_net_multistream_build_protocol_handler(node)); + libp2p_utils_vector_add(retVal, libp2p_net_multistream_build_protocol_handler(retVal)); } return retVal; } diff --git a/core/null.c b/core/null.c index 2e80ded..0b68791 100644 --- a/core/null.c +++ b/core/null.c @@ -55,77 +55,30 @@ void ipfs_null_connection (void *ptr) { libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count)); - if (libp2p_net_multistream_negotiate(session)) { - // Someone has connected and successfully negotiated multistream. Now talk to them... - int unsuccessful_max = 30; - int unsuccessful_counter = 0; - for(;;) { - // Wait for them to ask something... - unsigned char* results = NULL; - size_t bytes_read = 0; - if (null_shutting_down) { - libp2p_logger_debug("null", "%s null shutting down before read.\n", connection_param->local_node->identity->peer->id); - // this service is shutting down. Ignore the request and exit the loop - break; - } - // see if we have something to read - retVal = session->default_stream->peek(session); - if (retVal < 0) { // error - libp2p_logger_debug("null", "Peer returned %d. Exiting loop\n", retVal); - retVal = -1; - break; - } - if (retVal == 0) { // nothing to read - sleep(1); - unsuccessful_counter++; - if (unsuccessful_counter >= unsuccessful_max) { - libp2p_logger_debug("null", "We've tried %d times in the daemon loop. Exiting.\n", unsuccessful_counter); - retVal = -1; - break; - } - continue; - } - if (retVal > 0 && !session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT) ) { - // it said it was ready, but something happened - libp2p_logger_debug("null", "Peek said there was something there, but there was not. Exiting.\n"); - retVal = -1; - break; - } - if (null_shutting_down) { - libp2p_logger_debug("null", "%s null shutting down after read.\n", connection_param->local_node->identity->peer->id); - // this service is shutting down. Ignore the request and exit the loop - retVal = -1; - break; - } - - // We actually got something. Process the request... - unsuccessful_counter = 0; - libp2p_logger_debug("null", "Read %lu bytes from a stream tranaction\n", bytes_read); - retVal = libp2p_protocol_marshal(results, bytes_read, session, connection_param->local_node->protocol_handlers); - free(results); - if (retVal == -1) { - libp2p_logger_debug("null", "protocol_marshal returned error.\n"); - break; - } else if (retVal == 0) { - // clean up, but let someone else handle this from now on - libp2p_logger_debug("null", "protocol_marshal returned 0. The daemon will no longer handle this.\n"); - break; - } else { - libp2p_logger_debug("null", "protocol_marshal returned 1. Looping again.\n"); - } - } - } else { - libp2p_logger_log("null", LOGLEVEL_DEBUG, "Multistream negotiation failed\n"); + // try to read from the network + uint8_t *results = 0; + size_t bytes_read = 0; + // handle the call + for(;;) { + // immediately attempt to negotiate multistream + if (!libp2p_net_multistream_send_protocol(session)) + break; + if (!session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT)) { + // problem reading; + break; + } + retVal = libp2p_protocol_marshal(results, bytes_read, session, connection_param->local_node->protocol_handlers); + if (results != NULL) + free(results); + // exit the loop on error (or if they ask us to no longer loop by returning 0) + if (retVal <= 0) + break; } (*(connection_param->count))--; // update counter. if (connection_param->ip != NULL) free(connection_param->ip); free (connection_param); - if (retVal != 0) { - libp2p_logger_debug("null", "%s Freeing session context.\n", connection_param->local_node->identity->peer->id); - //libp2p_session_context_free(session); - } return; } @@ -148,7 +101,7 @@ int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* pee if (replication_peer != NULL && local_node->repo->config->replication->announce && announce_secs < 0) { // try to connect if we aren't already if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { - if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, 2)) { + if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, local_node->repo->config->datastore, 2)) { return 0; } } diff --git a/exchange/bitswap/network.c b/exchange/bitswap/network.c index 45f5f52..a1d9dff 100644 --- a/exchange/bitswap/network.c +++ b/exchange/bitswap/network.c @@ -19,7 +19,7 @@ int ipfs_bitswap_network_send_message(const struct BitswapContext* context, stru libp2p_logger_debug("bitswap_network", "Sending bitswap message to %s.\n", libp2p_peer_id_to_string(peer)); // get a connection to the peer if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { - libp2p_peer_connect(&context->ipfsNode->identity->private_key, peer, context->ipfsNode->peerstore, 10); + libp2p_peer_connect(&context->ipfsNode->identity->private_key, peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 10); if(peer->connection_type != CONNECTION_TYPE_CONNECTED) return 0; } diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index 1664c1d..e2a381a 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -384,7 +384,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context if (need_to_connect) { if (!connected) { // connect - connected = libp2p_peer_connect(&context->ipfsNode->identity->private_key, request->peer, context->ipfsNode->peerstore, 0); + connected = libp2p_peer_connect(&context->ipfsNode->identity->private_key, request->peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 0); } if (connected) { // build a message diff --git a/journal/journal.c b/journal/journal.c index 524e639..8b582a7 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -103,7 +103,7 @@ int ipfs_journal_free_records(struct Libp2pVector* records) { int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, struct JournalMessage* message) { if (peer->connection_type != CONNECTION_TYPE_CONNECTED) - libp2p_peer_connect(&node->identity->private_key, peer, node->peerstore, 10); + libp2p_peer_connect(&node->identity->private_key, peer, node->peerstore, node->repo->config->datastore, 10); if (peer->connection_type != CONNECTION_TYPE_CONNECTED) return 0; // protobuf the message diff --git a/routing/online.c b/routing/online.c index 9026feb..b7ee7bb 100644 --- a/routing/online.c +++ b/routing/online.c @@ -317,7 +317,7 @@ int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* pee int retVal = 0; if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { - if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, 5)) + if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5)) goto exit; } if (peer->connection_type == CONNECTION_TYPE_CONNECTED) { @@ -438,7 +438,7 @@ int ipfs_routing_online_get_value (ipfs_routing* routing, const unsigned char *k if (!libp2p_peer_is_connected(current_peer)) { // attempt to connect. If unsuccessful, continue in the loop. libp2p_logger_debug("online", "Attempting to connect to peer to retrieve file\n"); - if (libp2p_peer_connect(&routing->local_node->identity->private_key, current_peer, routing->local_node->peerstore, 5)) { + if (libp2p_peer_connect(&routing->local_node->identity->private_key, current_peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5)) { libp2p_logger_debug("online", "Peer connected\n"); if (ipfs_routing_online_get_peer_value(routing, current_peer, key, key_size, buffer, buffer_size)) { libp2p_logger_debug("online", "Retrieved a value\n"); @@ -512,7 +512,7 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) { return -1; // this should never happen } if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier) - if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, 2)) { + if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) { libp2p_logger_debug("online", "Attempted to bootstrap and connect to %s but failed. Continuing.\n", libp2p_peer_id_to_string(peer)); } } diff --git a/test/journal/test_journal.h b/test/journal/test_journal.h index 49eebea..91b86dd 100644 --- a/test/journal/test_journal.h +++ b/test/journal/test_journal.h @@ -121,7 +121,7 @@ int test_journal_server_1() { ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0); ipfs_node_free(local_node); - libp2p_logger_debug("test_journal", "*** Firing up daemon for server 2 ***\n"); + libp2p_logger_debug("test_journal", "*** Firing up daemon for server 1 ***\n"); pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path); thread_started = 1; @@ -182,7 +182,7 @@ int test_journal_server_2() { pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path); thread_started = 1; - sleep(30); + sleep(120); retVal = 1; exit: diff --git a/test/routing/test_routing.h b/test/routing/test_routing.h index 949eaf7..b050bd5 100644 --- a/test/routing/test_routing.h +++ b/test/routing/test_routing.h @@ -233,7 +233,7 @@ int test_routing_find_providers() { struct Libp2pPeer *remote_peer = NULL; for(int i = 0; i < result->total; i++) { remote_peer = (struct Libp2pPeer*)libp2p_utils_vector_get(result, i); - if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED || libp2p_peer_connect(&local_node.identity->private_key, remote_peer, local_node.peerstore, 5)) { + if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED || libp2p_peer_connect(&local_node.identity->private_key, remote_peer, local_node.peerstore, local_node.repo->config->datastore, 5)) { break; } remote_peer = NULL; diff --git a/test/test_helper.c b/test/test_helper.c index ab99848..ec283f3 100644 --- a/test/test_helper.c +++ b/test/test_helper.c @@ -205,18 +205,24 @@ int drop_build_open_repo(const char* path, struct FSRepo** fs_repo, const char* if (config_filename_to_copy != NULL) { // attach config filename to path - char config[strlen(path) + 7]; + char *config = (char*) malloc(strlen(path) + 7); strcpy(config, path); // erase slash if there is one if (config[strlen(path)-1] == '/') config[strlen(path)-1] = 0; strcat(config, "/config"); // delete the old - if (unlink(config) != 0) + if (unlink(config) != 0) { + free(config); return 0; + } // copy pre-built config file into directory - if (cp(config, config_filename_to_copy) < 0) + if (cp(config, config_filename_to_copy) < 0) { + fprintf(stderr, "Unable to copy %s to %s. Error number %d.\n", config_filename_to_copy, config, errno); + free(config); return 0; + } + free(config); } if (!ipfs_repo_fsrepo_new(path, NULL, fs_repo))