solved pthread_mutex being shared across processes

yamux
John Jones 2017-09-25 11:25:34 -05:00
parent 982c7e9e6e
commit 5fc40e51ee
4 changed files with 148 additions and 5 deletions

View File

@ -594,13 +594,16 @@ quit:
free(req.buf);
if (inet_ntop(AF_INET, &( params->this_node->api_context->conns[params->index]->ipv4), client, INET_ADDRSTRLEN) == NULL)
strcpy(client, "UNKNOW");
libp2p_logger_error("api", "Closing client connection %s:%d (%d).\n", client, params->this_node->api_context->conns[params->index]->port, params->index+1);
libp2p_logger_debug("api", "Closing client connection %s:%d (%d).\n", client, params->this_node->api_context->conns[params->index]->port, params->index+1);
libp2p_logger_debug("api", "api_connection_thread: Attempting lock of mutex.\n");
pthread_mutex_lock(&params->this_node->api_context->conns_lock);
libp2p_logger_debug("api", "api_connection_thread: Lock successful.\n");
close(s);
free ( params->this_node->api_context->conns[params->index]);
params->this_node->api_context->conns[params->index] = NULL;
params->this_node->api_context->conns_count--;
pthread_mutex_unlock(&params->this_node->api_context->conns_lock);
libp2p_logger_debug("api", "api_connection_thread: Unlock successful.\n");
free(params);
return NULL;
}
@ -612,7 +615,9 @@ void api_connections_cleanup (struct IpfsNode* local_node)
{
int i;
libp2p_logger_debug("api", "api_connections_cleanup: Attempting lock.\n");
pthread_mutex_lock(&local_node->api_context->conns_lock);
libp2p_logger_debug("api", "api_connections_cleanup: Lock successful.\n");
if (local_node->api_context->conns_count > 0 && local_node->api_context->conns) {
for (i = 0 ; i < local_node->api_context->max_conns ; i++) {
if (local_node->api_context->conns[i]->pthread) {
@ -629,6 +634,7 @@ void api_connections_cleanup (struct IpfsNode* local_node)
local_node->api_context->conns = NULL;
}
pthread_mutex_unlock(&local_node->api_context->conns_lock);
libp2p_logger_debug("api", "api_connections_cleanup: Unlock successful\n");
}
/**
@ -658,12 +664,15 @@ void *api_listen_thread (void *ptr)
continue;
}
libp2p_logger_debug("api", "api_listen_thread: Lock\n");
pthread_mutex_lock(&local_node->api_context->conns_lock);
libp2p_logger_debug("api", "api_listen_thread: Lock Complete\n");
for (i = 0 ; i < local_node->api_context->max_conns && local_node->api_context->conns[i] ; i++);
local_node->api_context->conns[i] = malloc (sizeof (struct s_conns));
if (!local_node->api_context->conns[i]) {
libp2p_logger_error("api", "Fail to allocate memory to accept connection.\n");
pthread_mutex_unlock(&local_node->api_context->conns_lock);
libp2p_logger_debug("api", "api_listen_thread: memory failure, unlock successful.\n");
close (s);
continue;
}
@ -677,6 +686,7 @@ void *api_listen_thread (void *ptr)
if (connection_param == NULL) {
libp2p_logger_error("api", "api_listen_thread: Unable to allocate memory.\n");
pthread_mutex_unlock(&local_node->api_context->conns_lock);
libp2p_logger_debug("api", "api_listen_thread: memory failure 2 unlock successful.\n");
close (s);
continue;
}
@ -691,8 +701,9 @@ void *api_listen_thread (void *ptr)
} else {
local_node->api_context->conns_count++;
}
libp2p_logger_debug("api", "API for %s: Accept connection %s:%d (%d/%d), pthread %d.\n", client, port, local_node->api_context->conns_count, local_node->api_context->max_conns, i+1);
libp2p_logger_debug("api", "API for %s: Accept connection %s:%d (%d/%d), pthread %d.\n", local_node->identity->peer->id, client, port, local_node->api_context->conns_count, local_node->api_context->max_conns, i+1);
pthread_mutex_unlock(&local_node->api_context->conns_lock);
libp2p_logger_debug("api", "api_listen_thread:Api connection cleanup unlock successful.\n");
}
api_connections_cleanup (local_node);
return NULL;
@ -708,6 +719,7 @@ struct ApiContext* api_context_new() {
context->port = 0;
context->socket = 0;
context->timeout = 0;
pthread_mutex_init(&context->conns_lock, NULL);
}
return context;
}

View File

@ -17,15 +17,15 @@ struct ApiContext {
uint16_t port;
int max_conns;
int timeout;
pthread_mutex_t conns_lock;
int conns_count;
pthread_t api_thread;
struct s_conns {
int socket;
uint32_t ipv4;
uint16_t port;
pthread_t pthread;
} **conns;
pthread_mutex_t conns_lock;
int conns_count;
pthread_t api_thread;
};
struct s_request {

View File

@ -217,3 +217,128 @@ int test_core_api_name_resolve() {
return retVal;
}
/**
* Like test_core_api_name_resolve, but split into 2 pieces
*/
int test_core_api_name_resolve_1() {
int retVal = 0;
pthread_t daemon_thread1;
int thread_started1 = 0;
char* ipfs_path1 = "/tmp/ipfs_1";
char* config_file1 = "config.test1.wo_journal";
struct FSRepo* fs_repo = NULL;
char hash[256] = "";
char peer_id1[256] = "";
struct CliArguments* args = NULL;
libp2p_logger_add_class("api");
libp2p_logger_add_class("test_api");
// repo 1
if (!drop_build_open_repo(ipfs_path1, &fs_repo, config_file1)) {
ipfs_repo_fsrepo_free(fs_repo);
libp2p_logger_error("test_api", "Unable to drop and build repository at %s\n", ipfs_path1);
goto exit;
}
sprintf(peer_id1, "/ipns/%s", fs_repo->config->identity->peer->id);
ipfs_repo_fsrepo_free(fs_repo);
// add a file to the first repo
uint8_t *bytes = (unsigned char*)"hello, world!\n";
char* filename = "test1.txt";
create_file(filename, bytes, strlen((char*)bytes));
struct HashtableNode* node;
size_t bytes_written;
struct IpfsNode *local_node = NULL;
ipfs_node_offline_new(ipfs_path1, &local_node);
ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0);
memset(hash, 0, 256);
ipfs_cid_hash_to_base58(node->hash, node->hash_size, (unsigned char*)hash, 256);
libp2p_logger_debug("test_api", "The hash is %s.\n", hash);
ipfs_node_free(local_node);
ipfs_hashtable_node_free(node);
libp2p_logger_debug("test_api", "*** Firing up daemon ***\n");
pthread_create(&daemon_thread1, NULL, test_daemon_start, (void*)ipfs_path1);
thread_started1 = 1;
sleep(45);
retVal = 1;
exit:
cli_arguments_free(args);
ipfs_daemon_stop();
if (thread_started1)
pthread_join(daemon_thread1, NULL);
return retVal;
}
/***
* This should already be running before you run test_core_api_name_resolve_2
*/
int test_core_api_name_resolve_2() {
int retVal = 0;
pthread_t daemon_thread2;
int thread_started2 = 0;
char* ipfs_path2 = "/tmp/ipfs_2";
char* config_file2 = "config.test2.wo_journal";
struct FSRepo* fs_repo = NULL;
char peer_id1[256] = "QmZVoAZGFfinB7MQQiDzB84kWaDPQ95GLuXdemJFM2r9b4";
char* resolve_args[] = {"ipfs", "--config", ipfs_path2, "name", "resolve", peer_id1 };
struct CliArguments* args = NULL;
libp2p_logger_add_class("test_api");
libp2p_logger_add_class("api");
// build 2 repos... repo 2
if (!drop_build_open_repo(ipfs_path2, &fs_repo, config_file2)) {
ipfs_repo_fsrepo_free(fs_repo);
libp2p_logger_error("test_api", "Unable to drop and build repository at %s\n", ipfs_path2);
goto exit;
}
libp2p_logger_debug("test_api", "Changed the server id to %s.\n", fs_repo->config->identity->peer->id);
ipfs_repo_fsrepo_free(fs_repo);
libp2p_logger_debug("test_api", "*** Firing up daemon ***\n");
pthread_create(&daemon_thread2, NULL, test_daemon_start, (void*)ipfs_path2);
thread_started2 = 1;
libp2p_logger_debug("test_api", "Waiting 45 secs...\n");
sleep(45);
libp2p_logger_debug("test_api", "Wait is over\n");
// use a client of server2 to to ask for the "name resolve" on server 1
args = cli_arguments_new(6, resolve_args);
if (ipfs_name(args) == 0) {
goto exit;
}
retVal = 1;
exit:
cli_arguments_free(args);
ipfs_daemon_stop();
if (thread_started2)
pthread_join(daemon_thread2, NULL);
return retVal;
}
// publish a name to server 1
int test_core_api_name_resolve_3()
{
char* hash = "QmcbMZW8hcd46AfUsJUxQYTanHYDpeUq3pBuX5nihPEKD9";
char* publish_args[] = {"ipfs", "--config", "/tmp/ipfs_1", "name", "publish", hash };
struct CliArguments* args = NULL;
libp2p_logger_add_class("api");
libp2p_logger_add_class("test_api");
// publish name on server 1
args = cli_arguments_new(6, publish_args);
int retVal = ipfs_name(args);
cli_arguments_free(args);
args = NULL;
return retVal;
}

View File

@ -52,6 +52,9 @@ const char* names[] = {
"test_core_api_startup_shutdown",
"test_core_api_object_cat",
"test_core_api_name_resolve",
"test_core_api_name_resolve_1",
"test_core_api_name_resolve_2",
"test_core_api_name_resolve_3",
"test_daemon_startup_shutdown",
"test_datastore_list_journal",
"test_journal_db",
@ -118,6 +121,9 @@ int (*funcs[])(void) = {
test_core_api_startup_shutdown,
test_core_api_object_cat,
test_core_api_name_resolve,
test_core_api_name_resolve_1,
test_core_api_name_resolve_2,
test_core_api_name_resolve_3,
test_daemon_startup_shutdown,
test_datastore_list_journal,
test_journal_db,