Adding IpfsNode to api startup

This commit is contained in:
jmjatlanta 2017-09-20 07:32:12 -05:00
parent 262216f6db
commit c0419f2424
3 changed files with 60 additions and 25 deletions

View file

@ -274,7 +274,7 @@ size_t boundary_size(char *str, char *boundary, size_t limit)
* @param path is the ipfs address, obj is a pointer to be allocated and will be the return of the data, size is a pointer to return the data length. * @param path is the ipfs address, obj is a pointer to be allocated and will be the return of the data, size is a pointer to return the data length.
* @returns 1 when success is 0 when failure. * @returns 1 when success is 0 when failure.
*/ */
int get_object(char *path, unsigned char **obj, size_t *size) int get_object(struct IpfsNode* local_node, char *path, unsigned char **obj, size_t *size)
{ {
FILE* memstream_file = NULL; FILE* memstream_file = NULL;
char* memstream_char = NULL; char* memstream_char = NULL;
@ -327,15 +327,20 @@ int send_object(int socket, unsigned char *obj, size_t size)
return 0; // fail. return 0; // fail.
} }
struct ApiConnectionParam {
int index;
struct IpfsNode* this_node;
};
/** /**
* Pthread to take care of each client connection. * Pthread to take care of each client connection.
* @param ptr is the connection index in api_list, integer not pointer, cast required. * @param ptr an ApiConnectionParam
* @returns nothing * @returns nothing
*/ */
void *api_connection_thread (void *ptr) void *api_connection_thread (void *ptr)
{ {
int timeout, s, r; int timeout, s, r;
const INT_TYPE i = (INT_TYPE) ptr; struct ApiConnectionParam* params = (struct ApiConnectionParam*)ptr;
char resp[MAX_READ+1], buf[MAX_READ+1], *p, *body; char resp[MAX_READ+1], buf[MAX_READ+1], *p, *body;
char client[INET_ADDRSTRLEN]; char client[INET_ADDRSTRLEN];
struct s_request req; struct s_request req;
@ -345,7 +350,7 @@ void *api_connection_thread (void *ptr)
buf[MAX_READ] = '\0'; buf[MAX_READ] = '\0';
s = api_list.conns[i]->socket; s = api_list.conns[params->index]->socket;
timeout = api_list.timeout; timeout = api_list.timeout;
if (socket_read_select4(s, timeout) <= 0) { if (socket_read_select4(s, timeout) <= 0) {
@ -427,7 +432,7 @@ void *api_connection_thread (void *ptr)
path = req.buf + req.path; path = req.buf + req.path;
} }
if (get_object(path, &obj, &size)) { if (get_object(params->this_node, path, &obj, &size)) {
if (!send_object(s, obj, size)) { if (!send_object(s, obj, size)) {
libp2p_logger_error("api", "fail send_object.\n"); libp2p_logger_error("api", "fail send_object.\n");
} }
@ -504,16 +509,16 @@ void *api_connection_thread (void *ptr)
quit: quit:
if (req.buf) if (req.buf)
free(req.buf); free(req.buf);
if (inet_ntop(AF_INET, &(api_list.conns[i]->ipv4), client, INET_ADDRSTRLEN) == NULL) if (inet_ntop(AF_INET, &(api_list.conns[params->index]->ipv4), client, INET_ADDRSTRLEN) == NULL)
strcpy(client, "UNKNOW"); strcpy(client, "UNKNOW");
libp2p_logger_error("api", "Closing client connection %s:%d (%d).\n", client, api_list.conns[i]->port, i+1); libp2p_logger_error("api", "Closing client connection %s:%d (%d).\n", client, api_list.conns[params->index]->port, params->index+1);
pthread_mutex_lock(&conns_lock); pthread_mutex_lock(&conns_lock);
close(s); close(s);
free (api_list.conns[i]); free (api_list.conns[params->index]);
api_list.conns[i] = NULL; api_list.conns[params->index] = NULL;
conns_count--; conns_count--;
pthread_mutex_unlock(&conns_lock); pthread_mutex_unlock(&conns_lock);
free(params);
return NULL; return NULL;
} }
@ -555,6 +560,7 @@ void *api_listen_thread (void *ptr)
uint32_t ipv4; uint32_t ipv4;
uint16_t port; uint16_t port;
char client[INET_ADDRSTRLEN]; char client[INET_ADDRSTRLEN];
struct IpfsNode* local_node = (struct IpfsNode*)ptr;
conns_count = 0; conns_count = 0;
@ -583,7 +589,17 @@ void *api_listen_thread (void *ptr)
api_list.conns[i]->socket = s; api_list.conns[i]->socket = s;
api_list.conns[i]->ipv4 = ipv4; api_list.conns[i]->ipv4 = ipv4;
api_list.conns[i]->port = port; api_list.conns[i]->port = port;
if (pthread_create(&(api_list.conns[i]->pthread), NULL, api_connection_thread, (void*)i)) { // create a struct, which the thread is responsible to destroy
struct ApiConnectionParam* connection_param = (struct ApiConnectionParam*) malloc(sizeof(struct ApiConnectionParam));
if (connection_param == NULL) {
libp2p_logger_error("api", "api_listen_thread: Unable to allocate memory.\n");
pthread_mutex_unlock(&conns_lock);
close (s);
continue;
}
connection_param->index = i;
connection_param->this_node = local_node;
if (pthread_create(&(api_list.conns[i]->pthread), NULL, api_connection_thread, (void*)connection_param)) {
libp2p_logger_error("api", "Create pthread fail.\n"); libp2p_logger_error("api", "Create pthread fail.\n");
free (api_list.conns[i]); free (api_list.conns[i]);
api_list.conns[i] = NULL; api_list.conns[i] = NULL;
@ -601,17 +617,22 @@ void *api_listen_thread (void *ptr)
/** /**
* Start API interface daemon. * Start API interface daemon.
* @param port. * @param local_node the context
* @param max_conns. * @param max_conns.
* @param timeout time out of client connection. * @param timeout time out of client connection.
* @returns 0 when failure or 1 if success. * @returns 0 when failure or 1 if success.
*/ */
int api_start (uint16_t port, int max_conns, int timeout) int api_start (struct IpfsNode* local_node, int max_conns, int timeout)
{ {
int s; int s;
size_t alloc_size = sizeof(void*) * max_conns; size_t alloc_size = sizeof(void*) * max_conns;
api_list.ipv4 = hostname_to_ip("127.0.0.1"); // api is listening only on loopback. struct MultiAddress* my_address = multiaddress_new_from_string(local_node->repo->config->addresses->api);
char* ip = multiaddress_ip(my_address);
int port = multiaddress_port(my_address);
api_list.ipv4 = hostname_to_ip(ip); // api is listening only on loopback.
api_list.port = port; api_list.port = port;
if (listen_thread != 0) { if (listen_thread != 0) {
@ -636,7 +657,7 @@ int api_start (uint16_t port, int max_conns, int timeout)
} }
memset(api_list.conns, 0, alloc_size); memset(api_list.conns, 0, alloc_size);
if (pthread_create(&listen_thread, NULL, api_listen_thread, NULL)) { if (pthread_create(&listen_thread, NULL, api_listen_thread, (void*)local_node)) {
close (s); close (s);
free (api_list.conns); free (api_list.conns);
api_list.conns = NULL; api_list.conns = NULL;

View file

@ -80,6 +80,9 @@ int ipfs_node_online_new(const char* repo_path, struct IpfsNode** node) {
local_node->routing = ipfs_routing_new_online(local_node, &fs_repo->config->identity->private_key); local_node->routing = ipfs_routing_new_online(local_node, &fs_repo->config->identity->private_key);
local_node->exchange = ipfs_bitswap_new(local_node); local_node->exchange = ipfs_bitswap_new(local_node);
// fire up the API
api_start(local_node, 10, 5);
return 1; return 1;
} }
@ -138,6 +141,7 @@ int ipfs_node_offline_new(const char* repo_path, struct IpfsNode** node) {
*/ */
int ipfs_node_free(struct IpfsNode* node) { int ipfs_node_free(struct IpfsNode* node) {
if (node != NULL) { if (node != NULL) {
api_stop();
if (node->exchange != NULL) { if (node->exchange != NULL) {
node->exchange->Close(node->exchange); node->exchange->Close(node->exchange);
} }

View file

@ -2,14 +2,24 @@
#include "libp2p/utils/logger.h" #include "libp2p/utils/logger.h"
int test_core_api_startup_shutdown() { int test_core_api_startup_shutdown() {
if (!api_start(1234, 10, 5)) { struct IpfsNode* local_node = NULL;
libp2p_logger_error("test_api", "api_start failed.\n"); char* repo_path = "/tmp/ipfs_1";
return 0; char* peer_id = NULL;
} int retVal = 0;
sleep(5);
if (!api_stop()) { if (!drop_and_build_repository(repo_path, 4001, NULL, &peer_id))
libp2p_logger_error("test_api", "api_stop failed.\n"); goto exit;
return 0;
} // this should start the api
return 1; if (!ipfs_node_online_new(repo_path, &local_node))
goto exit;
// TODO: test to see if it works
// TODO shut down
retVal = 1;
exit:
return retVal;
} }