diff --git a/core/daemon.c b/core/daemon.c index a9c2351..e64133d 100644 --- a/core/daemon.c +++ b/core/daemon.c @@ -33,8 +33,8 @@ int ipfs_daemon_start(char* repo_path) { // Create pthread for swarm listener. if (pthread_create(&work_pths[count_pths++], NULL, local_node->routing->Listen, &listen_param)) { - libp2p_logger_error("daemon", "Error creating thread for ipfs null listen\n"); - goto exit; + libp2p_logger_error("daemon", "Error creating thread for ipfs null listen\n"); + goto exit; } local_node->routing->Bootstrap(local_node->routing); diff --git a/core/null.c b/core/null.c index 791aff2..899d018 100644 --- a/core/null.c +++ b/core/null.c @@ -50,7 +50,7 @@ int protocol_compare(const unsigned char* incoming, size_t incoming_size, const * @param incoming_size the size of the request in bytes * @param session the session context * @param connection_param the connection parameters - * @returns True(1) on success, False(0) on error + * @returns 1 to indicate it was handled, 0 to indicate that the daemon should no longer loop (future messages will be handled by another message loop), and -1 to indicate a problem */ int ipfs_multistream_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct IpfsNode* local_node) { if (protocol_compare(incoming, incoming_size, "/secio")) { @@ -58,13 +58,14 @@ int ipfs_multistream_marshal(const unsigned char* incoming, size_t incoming_size if (!libp2p_secio_handshake(session, &local_node->identity->private_key, local_node->peerstore, 1)) { // rejecting connection libp2p_logger_debug("null", "Secure IO connection failed\n"); - return 0; + return -1; } libp2p_logger_debug("null", "Secure IO connection successful.\n"); + return 0; } else if (protocol_compare(incoming, incoming_size, "/nodeio")) { libp2p_logger_debug("null", "Attempting a nodeio connection.\n"); if (!libp2p_nodeio_handshake(session)) { - return 0; + return -1; } // loop through file requests int _continue = 1; @@ -97,18 +98,19 @@ int ipfs_multistream_marshal(const unsigned char* incoming, size_t incoming_size libp2p_logger_log("null", LOGLEVEL_DEBUG, "Attempting kademlia connection...\n"); if (!libp2p_routing_dht_handshake(session)) { libp2p_logger_log("null", LOGLEVEL_DEBUG, "kademlia connection handshake failed\n"); - return 0; + return -1; } // this handles 1 transaction libp2p_routing_dht_handle_message(session, local_node->peerstore, local_node->providerstore); libp2p_logger_log("null", LOGLEVEL_DEBUG, "kademlia message handled\n"); } else if (protocol_compare(incoming, incoming_size, "/ipfs/bitswap/")) { libp2p_logger_debug("null", "Attempting bitswap connection...\n"); - return ipfs_bitswap_network_handle_message(local_node, session, incoming, incoming_size); + if (!ipfs_bitswap_network_handle_message(local_node, session, incoming, incoming_size)) + return -1; } else { libp2p_logger_error("null", "There was a problem with this connection. It is nothing I can handle. Disconnecting.\n"); - return 0; + return -1; } return 1; } @@ -118,27 +120,22 @@ int ipfs_multistream_marshal(const unsigned char* incoming, size_t incoming_size * * @param ptr a pointer to a null_connection_params struct */ -void ipfs_null_connection (void *ptr) -{ +void ipfs_null_connection (void *ptr) { struct null_connection_params *connection_param = (struct null_connection_params*) ptr; - - // TODO: when should we exit the for loop and disconnect? + int retVal = 0; struct SessionContext* session = libp2p_session_context_new(); if (session == NULL) { - libp2p_logger_error("null", "Unable to allocate SessionContext. Out of memory?\n"); - return; + libp2p_logger_error("null", "Unable to allocate SessionContext. Out of memory?\n"); + return; } session->insecure_stream = libp2p_net_multistream_stream_new(connection_param->file_descriptor, connection_param->ip, connection_param->port); - - libp2p_logger_debug("null", "%s null has a file descriptor of %d\n", connection_param->local_node->identity->peer->id, *((int*)session->insecure_stream->socket_descriptor) ); - session->default_stream = session->insecure_stream; session->datastore = connection_param->local_node->repo->config->datastore; session->filestore = connection_param->local_node->repo->config->filestore; - libp2p_logger_log("null", LOGLEVEL_INFO, "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count)); + libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count)); if (libp2p_net_multistream_negotiate(session)) { // Someone has connected and successfully negotiated multistream. Now talk to them... @@ -153,44 +150,64 @@ void ipfs_null_connection (void *ptr) // this service is shutting down. Ignore the request and exit the loop break; } - if (!session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT) ) { - // the read was unsuccessful wait a sec + // see if we have something to read + retVal = session->default_stream->peek(session); + if (retVal < 0) { // error + libp2p_logger_debug("null", "Peer returned %d. Exiting loop\n", retVal); + retVal = -1; + break; + } + if (retVal == 0) { // nothing to read sleep(1); unsuccessful_counter++; - if (unsuccessful_counter >= unsuccessful_max) + if (unsuccessful_counter >= unsuccessful_max) { + libp2p_logger_debug("null", "We've tried %d times in the daemon loop. Exiting.\n", unsuccessful_counter); + retVal = -1; break; + } continue; } + if (retVal > 0 && !session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT) ) { + // it said it was ready, but something happened + libp2p_logger_debug("null", "Peek said there was something there, but there was not. Exiting.\n"); + retVal = -1; + break; + } if (null_shutting_down) { libp2p_logger_debug("null", "%s null shutting down after read.\n", connection_param->local_node->identity->peer->id); // this service is shutting down. Ignore the request and exit the loop + retVal = -1; break; } - if (bytes_read == 0) { - // They did not ask for anything. There was a timeout. Wait again. - continue; - } // We actually got something. Process the request... unsuccessful_counter = 0; libp2p_logger_debug("null", "Read %lu bytes from a stream tranaction\n", bytes_read); - int retVal = ipfs_multistream_marshal(results, bytes_read, session, connection_param->local_node); + retVal = ipfs_multistream_marshal(results, bytes_read, session, connection_param->local_node); free(results); - if (!retVal) { + if (retVal == -1) { libp2p_logger_debug("null", "ipfs_null_marshal returned false\n"); break; + } else if (retVal == 0) { + // clean up, but let someone else handle this from now on + libp2p_logger_debug("null", "ipfs_null_marshal returns 0. The daemon will no longer handle this.\n"); + break; + } else { + libp2p_logger_debug("null", "ipfs_null_marshal returned 1. Looping again."); } } } else { libp2p_logger_log("null", LOGLEVEL_DEBUG, "Multistream negotiation failed\n"); } - libp2p_logger_debug("null", "%s Freeing session context.\n", connection_param->local_node->identity->peer->id); - (*(connection_param->count))--; // update counter. - if (connection_param->ip != NULL) - free(connection_param->ip); - free (connection_param); - libp2p_session_context_free(session); + (*(connection_param->count))--; // update counter. + if (connection_param->ip != NULL) + free(connection_param->ip); + free (connection_param); + if (retVal != 0) { + libp2p_logger_debug("null", "%s Freeing session context.\n", connection_param->local_node->identity->peer->id); + libp2p_session_context_free(session); + } return; } @@ -215,14 +232,15 @@ void* ipfs_null_listen (void *ptr) libp2p_logger_error("null", "Ipfs listening on %d\n", listen_param->port); + // the main loop, listening for new connections for (;;) { - libp2p_logger_debug("null", "%s Attempting socket read with fd %d.\n", listen_param->local_node->identity->peer->id, socketfd); - int numDescriptors = socket_read_select4(socketfd, 2); - if (null_shutting_down) { - libp2p_logger_debug("null", "%s null_listen shutting down.\n", listen_param->local_node->identity->peer->id); - break; - } - if (numDescriptors > 0) { + //libp2p_logger_debug("null", "%s Attempting socket read with fd %d.\n", listen_param->local_node->identity->peer->id, socketfd); + int numDescriptors = socket_read_select4(socketfd, 2); + if (null_shutting_down) { + libp2p_logger_debug("null", "%s null_listen shutting down.\n", listen_param->local_node->identity->peer->id); + break; + } + if (numDescriptors > 0) { s = socket_accept4(socketfd, &(listen_param->ipv4), &(listen_param->port)); if (count >= CONNECTIONS) { // limit reached. close (s); @@ -245,7 +263,7 @@ void* ipfs_null_listen (void *ptr) // Create pthread for ipfs_null_connection. thpool_add_work(thpool, ipfs_null_connection, connection_param); } - } + } } thpool_destroy(thpool); diff --git a/exchange/bitswap/bitswap.c b/exchange/bitswap/bitswap.c index 903ef46..d424512 100644 --- a/exchange/bitswap/bitswap.c +++ b/exchange/bitswap/bitswap.c @@ -3,6 +3,7 @@ */ #include #include // for sleep() +#include "libp2p/utils/logger.h" #include "ipfs/core/ipfs_node.h" #include "ipfs/exchange/exchange.h" #include "ipfs/exchange/bitswap/bitswap.h" @@ -42,6 +43,7 @@ struct Exchange* ipfs_bitswap_new(struct IpfsNode* ipfs_node) { // Start the threads for the network ipfs_bitswap_engine_start(bitswapContext); + libp2p_logger_debug("bitswap", "Bitswap engine started\n"); } return exchange; } diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c index be6ab1c..fa5e4fc 100644 --- a/exchange/bitswap/engine.c +++ b/exchange/bitswap/engine.c @@ -1,4 +1,5 @@ #include +#include "libp2p/utils/logger.h" #include "ipfs/core/null.h" #include "ipfs/exchange/bitswap/engine.h" #include "ipfs/exchange/bitswap/wantlist_queue.h" @@ -57,22 +58,80 @@ void* ipfs_bitswap_engine_wantlist_processor_start(void* ctx) { void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { struct BitswapContext* context = (struct BitswapContext*)ctx; // the loop - while (!context->bitswap_engine->shutting_down) { - struct PeerRequest* item = ipfs_bitswap_peer_request_queue_pop(context->peerRequestQueue); - if (item != NULL) { - // did they send us something over the network? - unsigned char* buffer = NULL; - size_t buffer_len = 0; - if (item->peer->sessionContext->default_stream->read(item->peer->sessionContext, &buffer, &buffer_len, 1)) { - // handle it - ipfs_multistream_marshal(buffer, buffer_len, item->peer->sessionContext, context->ipfsNode); - } + struct Libp2pLinkedList* current = context->ipfsNode->peerstore->head_entry; + int did_some_processing = 0; + while (1) { + if (context->bitswap_engine->shutting_down) // system shutting down + break; - // if there is something on the queue process it. - ipfs_bitswap_peer_request_process_entry(context, item); + if (current == NULL) { // the PeerStore is empty + libp2p_logger_debug("bitswap_engine", "Peerstore is empty. Pausing.\n"); + sleep(1); + continue; + } + if (current->item == NULL) { + // error + libp2p_logger_error("bitswap_engine", "Peerstore has a null entry.\n"); + break; + } + // see if they want something + struct Libp2pPeer* current_peer_entry = ((struct PeerEntry*)current->item)->peer; + if (current_peer_entry == NULL) { + // error + libp2p_logger_error("bitswap_engine", "Peerstore has an item that is a null peer.\n"); + break; + } + if (current_peer_entry->connection_type == CONNECTION_TYPE_CONNECTED) { + libp2p_logger_debug("bitswap_engine", "We're connected to this peer. Lets see if there is a message waiting for us.\n"); + int retVal = current_peer_entry->sessionContext->default_stream->peek(current_peer_entry->sessionContext); + if (retVal < 0) { + libp2p_logger_debug("bitswap_engine", "We thought we were connected, but Peek reported an error.\n"); + libp2p_peer_handle_connection_error(current_peer_entry); + } else if (retVal > 0) { + libp2p_logger_debug("bitswap_engine", "Something waiting on network for peer %s.\n", current_peer_entry->id); + unsigned char* buffer = NULL; + size_t buffer_len = 0; + if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) { + // handle it + int retVal = ipfs_multistream_marshal(buffer, buffer_len, current_peer_entry->sessionContext, context->ipfsNode); + did_some_processing = 1; + if (retVal == -1) { + // there was a problem. Clean up + current_peer_entry->connection_type = CONNECTION_TYPE_NOT_CONNECTED; + libp2p_session_context_free(current_peer_entry->sessionContext); + } + } + } } else { - // if there is nothing on the queue, wait... - sleep(2); + if (current_peer_entry->is_local) { + //libp2p_logger_debug("bitswap_engine", "Local peer %s. Skipping.\n", current_peer_entry->id); + } else + libp2p_logger_debug("bitswap_engine", "We are not connected to this peer %s.\n", current_peer_entry->id); + } + // attempt to get queue and process + struct PeerRequestEntry* entry = ipfs_bitswap_peer_request_queue_find_entry(context->peerRequestQueue, current_peer_entry); + if (entry != NULL) { + libp2p_logger_debug("bitswap_engine", "Processing queue for peer %s.\n", current_peer_entry->id); + // we have a queue. Do some queue processing + struct PeerRequest* item = entry->current; + if (item != NULL) { + // if there is something on the queue process it. + if (ipfs_bitswap_peer_request_process_entry(context, item)) + did_some_processing = 1; + } + } + // get next peer (or reset to head entry) + if (current->next == NULL) { + current = context->ipfsNode->peerstore->head_entry; + if (!did_some_processing) { + // we did nothing in this run through the peerstore. sleep for a sec + sleep(1); + } + did_some_processing = 0; + } + else { + libp2p_logger_debug("bitswap_engine", "Moving on to the next peer.\n"); + current = current->next; } } return NULL; diff --git a/exchange/bitswap/network.c b/exchange/bitswap/network.c index d6c1b25..326f525 100644 --- a/exchange/bitswap/network.c +++ b/exchange/bitswap/network.c @@ -50,22 +50,30 @@ int ipfs_bitswap_network_send_message(const struct BitswapContext* context, stru * @param cid the cid to remove * @returns true(1) on success, false(0) otherwise */ -int ipfs_bitswap_network_remove_cid_from_queue(struct Libp2pVector* collection, struct Cid* cid) { +int ipfs_bitswap_network_adjust_cid_queue(struct Libp2pVector* collection, struct Cid* cid, int cancel) { if (collection == NULL || cid == NULL) return 0; for(int i = 0; i < collection->total; collection++) { const struct CidEntry* current = (const struct CidEntry*)libp2p_utils_vector_get(collection, i); if (ipfs_cid_compare(current->cid, cid) == 0) { - libp2p_utils_vector_delete(collection, i); + if (cancel) + libp2p_utils_vector_delete(collection, i); return 1; } } + + // not found. Add it if we're not cancelling + if (!cancel) { + struct CidEntry* cidEntry = ipfs_bitswap_peer_request_cid_entry_new(); + cidEntry->cid = cid; + cidEntry->cancel = 0; + libp2p_utils_vector_add(collection, cidEntry); + } + return 0; } - - /*** * Handle a raw incoming bitswap message from the network * @param node us @@ -133,14 +141,7 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc ipfs_cid_free(cid); return 0; } - if (entry->cancel) - ipfs_bitswap_network_remove_cid_from_queue(queueEntry->current->cids_they_want, cid); - else { - struct CidEntry* cidEntry = ipfs_bitswap_peer_request_cid_entry_new(); - cidEntry->cid = cid; - cidEntry->cancel = 0; - libp2p_utils_vector_add(queueEntry->current->cids_they_want, cidEntry); - } + ipfs_bitswap_network_adjust_cid_queue(queueEntry->current->cids_they_want, cid, entry->cancel); } } return 1; diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index 0d449c0..e71a54f 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -5,6 +5,7 @@ #include #include "libp2p/conn/session.h" +#include "libp2p/utils/logger.h" #include "ipfs/cid/cid.h" #include "ipfs/exchange/bitswap/peer_request_queue.h" #include "ipfs/exchange/bitswap/message.h" @@ -191,6 +192,19 @@ int ipfs_bitswap_peer_request_something_to_do(struct PeerRequestEntry* entry) { return 1; if (ipfs_bitswap_peer_request_cids_waiting(request->cids_they_want)) return 1; + // is there something waiting for us on the network? + if (request->peer->connection_type == CONNECTION_TYPE_CONNECTED) { + int retVal = request->peer->sessionContext->default_stream->peek(request->peer->sessionContext); + if (retVal < 0) { + libp2p_logger_debug("peer_request_queue", "Connection returned %d. Marking connection NOT CONNECTED.\n", retVal); + libp2p_peer_handle_connection_error(request->peer); + return 0; + } + if (retVal > 0) { + libp2p_logger_debug("peer_request_queue", "We have something to read. %d bytes.\n", retVal); + } + return retVal; + } } return 0; } @@ -206,8 +220,8 @@ struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue* pthread_mutex_lock(&queue->queue_mutex); struct PeerRequestEntry* entry = queue->first; if (entry != NULL) { - retVal = entry->current; if (ipfs_bitswap_peer_request_something_to_do(entry)) { + retVal = entry->current; // move to the end of the queue if (queue->first->next != NULL) { queue->first = queue->first->next; @@ -295,7 +309,7 @@ int ipfs_bitswap_peer_request_get_blocks_they_want(const struct BitswapContext* * Handle a PeerRequest * @param context the BitswapContext * @param request the request to process - * @returns true(1) on succes, otherwise false(0) + * @returns true(1) if something was done, otherwise false(0) */ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request) { // determine if we have enough information to continue diff --git a/main/main.c b/main/main.c index 4d86cab..c50ecc3 100644 --- a/main/main.c +++ b/main/main.c @@ -1,6 +1,7 @@ #include #include +#include "libp2p/utils/logger.h" #include "ipfs/repo/init.h" #include "ipfs/importer/importer.h" #include "ipfs/importer/exporter.h" @@ -98,6 +99,14 @@ int parse_arguments(int argc, char** argv) { * The beginning */ int main(int argc, char** argv) { + // for debugging + libp2p_logger_add_class("null"); + libp2p_logger_add_class("bitswap"); + libp2p_logger_add_class("secio"); + libp2p_logger_add_class("peer_request_queue"); + libp2p_logger_add_class("bitswap_engine"); + libp2p_logger_add_class("peerstore"); + strip_quotes(argc, argv); int retVal = parse_arguments(argc, argv); switch (retVal) { @@ -126,4 +135,5 @@ int main(int argc, char** argv) { ipfs_ping(argc, argv); break; } + libp2p_logger_free(); } diff --git a/repo/fsrepo/fs_repo.c b/repo/fsrepo/fs_repo.c index 6d55b00..e89549b 100644 --- a/repo/fsrepo/fs_repo.c +++ b/repo/fsrepo/fs_repo.c @@ -408,8 +408,12 @@ int fs_repo_open_config(struct FSRepo* repo) { || strcmp((char*)test_peer_id, repo->config->identity->peer->id) != 0) { free(data); free(priv_key_base64); + free(test_peer_id); return 0; } + repo->config->identity->peer->is_local = 1; + free(test_peer_id); + // now the datastore //int datastore_position = _find_token(data, tokens, num_tokens, 0, "Datastore"); _get_json_string_value(data, tokens, num_tokens, curr_pos, "Type", &repo->config->datastore->type);