From 5fc40e51eeb829a7ec6d4fd8f8882ca38be8ab6e Mon Sep 17 00:00:00 2001 From: John Jones Date: Mon, 25 Sep 2017 11:25:34 -0500 Subject: [PATCH] solved pthread_mutex being shared across processes --- core/api.c | 16 ++++- include/ipfs/core/api.h | 6 +- test/core/test_api.h | 125 ++++++++++++++++++++++++++++++++++++++++ test/testit.c | 6 ++ 4 files changed, 148 insertions(+), 5 deletions(-) diff --git a/core/api.c b/core/api.c index 6534041..86fe481 100644 --- a/core/api.c +++ b/core/api.c @@ -594,13 +594,16 @@ quit: free(req.buf); if (inet_ntop(AF_INET, &( params->this_node->api_context->conns[params->index]->ipv4), client, INET_ADDRSTRLEN) == NULL) strcpy(client, "UNKNOW"); - libp2p_logger_error("api", "Closing client connection %s:%d (%d).\n", client, params->this_node->api_context->conns[params->index]->port, params->index+1); + libp2p_logger_debug("api", "Closing client connection %s:%d (%d).\n", client, params->this_node->api_context->conns[params->index]->port, params->index+1); + libp2p_logger_debug("api", "api_connection_thread: Attempting lock of mutex.\n"); pthread_mutex_lock(¶ms->this_node->api_context->conns_lock); + libp2p_logger_debug("api", "api_connection_thread: Lock successful.\n"); close(s); free ( params->this_node->api_context->conns[params->index]); params->this_node->api_context->conns[params->index] = NULL; params->this_node->api_context->conns_count--; pthread_mutex_unlock(¶ms->this_node->api_context->conns_lock); + libp2p_logger_debug("api", "api_connection_thread: Unlock successful.\n"); free(params); return NULL; } @@ -612,7 +615,9 @@ void api_connections_cleanup (struct IpfsNode* local_node) { int i; + libp2p_logger_debug("api", "api_connections_cleanup: Attempting lock.\n"); pthread_mutex_lock(&local_node->api_context->conns_lock); + libp2p_logger_debug("api", "api_connections_cleanup: Lock successful.\n"); if (local_node->api_context->conns_count > 0 && local_node->api_context->conns) { for (i = 0 ; i < local_node->api_context->max_conns ; i++) { if (local_node->api_context->conns[i]->pthread) { @@ -629,6 +634,7 @@ void api_connections_cleanup (struct IpfsNode* local_node) local_node->api_context->conns = NULL; } pthread_mutex_unlock(&local_node->api_context->conns_lock); + libp2p_logger_debug("api", "api_connections_cleanup: Unlock successful\n"); } /** @@ -658,12 +664,15 @@ void *api_listen_thread (void *ptr) continue; } + libp2p_logger_debug("api", "api_listen_thread: Lock\n"); pthread_mutex_lock(&local_node->api_context->conns_lock); + libp2p_logger_debug("api", "api_listen_thread: Lock Complete\n"); for (i = 0 ; i < local_node->api_context->max_conns && local_node->api_context->conns[i] ; i++); local_node->api_context->conns[i] = malloc (sizeof (struct s_conns)); if (!local_node->api_context->conns[i]) { libp2p_logger_error("api", "Fail to allocate memory to accept connection.\n"); pthread_mutex_unlock(&local_node->api_context->conns_lock); + libp2p_logger_debug("api", "api_listen_thread: memory failure, unlock successful.\n"); close (s); continue; } @@ -677,6 +686,7 @@ void *api_listen_thread (void *ptr) if (connection_param == NULL) { libp2p_logger_error("api", "api_listen_thread: Unable to allocate memory.\n"); pthread_mutex_unlock(&local_node->api_context->conns_lock); + libp2p_logger_debug("api", "api_listen_thread: memory failure 2 unlock successful.\n"); close (s); continue; } @@ -691,8 +701,9 @@ void *api_listen_thread (void *ptr) } else { local_node->api_context->conns_count++; } - libp2p_logger_debug("api", "API for %s: Accept connection %s:%d (%d/%d), pthread %d.\n", client, port, local_node->api_context->conns_count, local_node->api_context->max_conns, i+1); + libp2p_logger_debug("api", "API for %s: Accept connection %s:%d (%d/%d), pthread %d.\n", local_node->identity->peer->id, client, port, local_node->api_context->conns_count, local_node->api_context->max_conns, i+1); pthread_mutex_unlock(&local_node->api_context->conns_lock); + libp2p_logger_debug("api", "api_listen_thread:Api connection cleanup unlock successful.\n"); } api_connections_cleanup (local_node); return NULL; @@ -708,6 +719,7 @@ struct ApiContext* api_context_new() { context->port = 0; context->socket = 0; context->timeout = 0; + pthread_mutex_init(&context->conns_lock, NULL); } return context; } diff --git a/include/ipfs/core/api.h b/include/ipfs/core/api.h index 6575df6..f0a5b93 100644 --- a/include/ipfs/core/api.h +++ b/include/ipfs/core/api.h @@ -17,15 +17,15 @@ struct ApiContext { uint16_t port; int max_conns; int timeout; + pthread_mutex_t conns_lock; + int conns_count; + pthread_t api_thread; struct s_conns { int socket; uint32_t ipv4; uint16_t port; pthread_t pthread; } **conns; - pthread_mutex_t conns_lock; - int conns_count; - pthread_t api_thread; }; struct s_request { diff --git a/test/core/test_api.h b/test/core/test_api.h index 9d541dd..e712d5b 100644 --- a/test/core/test_api.h +++ b/test/core/test_api.h @@ -217,3 +217,128 @@ int test_core_api_name_resolve() { return retVal; } + +/** + * Like test_core_api_name_resolve, but split into 2 pieces + */ +int test_core_api_name_resolve_1() { + int retVal = 0; + pthread_t daemon_thread1; + int thread_started1 = 0; + char* ipfs_path1 = "/tmp/ipfs_1"; + char* config_file1 = "config.test1.wo_journal"; + struct FSRepo* fs_repo = NULL; + char hash[256] = ""; + char peer_id1[256] = ""; + struct CliArguments* args = NULL; + + libp2p_logger_add_class("api"); + libp2p_logger_add_class("test_api"); + + // repo 1 + if (!drop_build_open_repo(ipfs_path1, &fs_repo, config_file1)) { + ipfs_repo_fsrepo_free(fs_repo); + libp2p_logger_error("test_api", "Unable to drop and build repository at %s\n", ipfs_path1); + goto exit; + } + sprintf(peer_id1, "/ipns/%s", fs_repo->config->identity->peer->id); + ipfs_repo_fsrepo_free(fs_repo); + + // add a file to the first repo + uint8_t *bytes = (unsigned char*)"hello, world!\n"; + char* filename = "test1.txt"; + create_file(filename, bytes, strlen((char*)bytes)); + struct HashtableNode* node; + size_t bytes_written; + struct IpfsNode *local_node = NULL; + ipfs_node_offline_new(ipfs_path1, &local_node); + ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0); + memset(hash, 0, 256); + ipfs_cid_hash_to_base58(node->hash, node->hash_size, (unsigned char*)hash, 256); + libp2p_logger_debug("test_api", "The hash is %s.\n", hash); + ipfs_node_free(local_node); + ipfs_hashtable_node_free(node); + + libp2p_logger_debug("test_api", "*** Firing up daemon ***\n"); + pthread_create(&daemon_thread1, NULL, test_daemon_start, (void*)ipfs_path1); + thread_started1 = 1; + + + sleep(45); + + retVal = 1; + exit: + cli_arguments_free(args); + ipfs_daemon_stop(); + if (thread_started1) + pthread_join(daemon_thread1, NULL); + return retVal; +} + +/*** + * This should already be running before you run test_core_api_name_resolve_2 + */ +int test_core_api_name_resolve_2() { + int retVal = 0; + pthread_t daemon_thread2; + int thread_started2 = 0; + char* ipfs_path2 = "/tmp/ipfs_2"; + char* config_file2 = "config.test2.wo_journal"; + struct FSRepo* fs_repo = NULL; + char peer_id1[256] = "QmZVoAZGFfinB7MQQiDzB84kWaDPQ95GLuXdemJFM2r9b4"; + char* resolve_args[] = {"ipfs", "--config", ipfs_path2, "name", "resolve", peer_id1 }; + struct CliArguments* args = NULL; + + libp2p_logger_add_class("test_api"); + libp2p_logger_add_class("api"); + + // build 2 repos... repo 2 + if (!drop_build_open_repo(ipfs_path2, &fs_repo, config_file2)) { + ipfs_repo_fsrepo_free(fs_repo); + libp2p_logger_error("test_api", "Unable to drop and build repository at %s\n", ipfs_path2); + goto exit; + } + libp2p_logger_debug("test_api", "Changed the server id to %s.\n", fs_repo->config->identity->peer->id); + ipfs_repo_fsrepo_free(fs_repo); + + libp2p_logger_debug("test_api", "*** Firing up daemon ***\n"); + pthread_create(&daemon_thread2, NULL, test_daemon_start, (void*)ipfs_path2); + thread_started2 = 1; + + libp2p_logger_debug("test_api", "Waiting 45 secs...\n"); + sleep(45); + libp2p_logger_debug("test_api", "Wait is over\n"); + + // use a client of server2 to to ask for the "name resolve" on server 1 + args = cli_arguments_new(6, resolve_args); + if (ipfs_name(args) == 0) { + goto exit; + } + + retVal = 1; + exit: + cli_arguments_free(args); + ipfs_daemon_stop(); + if (thread_started2) + pthread_join(daemon_thread2, NULL); + return retVal; +} + +// publish a name to server 1 +int test_core_api_name_resolve_3() +{ + char* hash = "QmcbMZW8hcd46AfUsJUxQYTanHYDpeUq3pBuX5nihPEKD9"; + char* publish_args[] = {"ipfs", "--config", "/tmp/ipfs_1", "name", "publish", hash }; + struct CliArguments* args = NULL; + + libp2p_logger_add_class("api"); + libp2p_logger_add_class("test_api"); + + // publish name on server 1 + args = cli_arguments_new(6, publish_args); + int retVal = ipfs_name(args); + cli_arguments_free(args); + args = NULL; + return retVal; + +} diff --git a/test/testit.c b/test/testit.c index aad602c..337bcb8 100644 --- a/test/testit.c +++ b/test/testit.c @@ -52,6 +52,9 @@ const char* names[] = { "test_core_api_startup_shutdown", "test_core_api_object_cat", "test_core_api_name_resolve", + "test_core_api_name_resolve_1", + "test_core_api_name_resolve_2", + "test_core_api_name_resolve_3", "test_daemon_startup_shutdown", "test_datastore_list_journal", "test_journal_db", @@ -118,6 +121,9 @@ int (*funcs[])(void) = { test_core_api_startup_shutdown, test_core_api_object_cat, test_core_api_name_resolve, + test_core_api_name_resolve_1, + test_core_api_name_resolve_2, + test_core_api_name_resolve_3, test_daemon_startup_shutdown, test_datastore_list_journal, test_journal_db,