more changes to implement api for dht provide and get

yamux
jmjatlanta 2017-09-27 11:45:36 -05:00
parent 76b860c06f
commit f5250a71f3
5 changed files with 201 additions and 98 deletions

View File

@ -505,26 +505,24 @@ void *api_connection_thread (void *ptr)
// now do something with the request we have built
struct HttpRequest* http_request = api_build_http_request(&req);
if (http_request != NULL) {
char* response_text = NULL;
if (!ipfs_core_http_request_process(params->this_node, http_request, &response_text)) {
struct HttpResponse* http_response = NULL;
if (!ipfs_core_http_request_process(params->this_node, http_request, &http_response)) {
libp2p_logger_error("api", "ipfs_core_http_request_process returned false.\n");
// 404
write_str(s, HTTP_404);
} else {
// now send the results
snprintf(resp, sizeof(resp), "%s 200 OK\r\n" \
"Content-Type: application/json\r\n"
"Server: c-ipfs/0.0.0-dev\r\n"
"X-Chunked-Output: 1\r\n"
"Connection: close\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"%x\r\n"
"%s\r\n"
"0\r\n\r\n"
,req.buf + req.http_ver, strlen(response_text), response_text);
if (response_text != NULL)
free(response_text);
"Content-Type: %s\r\n"
"Server: c-ipfs/0.0.0-dev\r\n"
"X-Chunked-Output: 1\r\n"
"Connection: close\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"%x\r\n"
"%s\r\n"
"0\r\n\r\n"
,req.buf + req.http_ver, http_response->content_type, (unsigned int)http_response->bytes_size, http_response->bytes);
ipfs_core_http_response_free(http_response);
write_str (s, resp);
libp2p_logger_debug("api", "resp = {\n%s\n}\n", resp);
}

View File

@ -9,6 +9,7 @@
#include "ipfs/importer/exporter.h"
#include "ipfs/namesys/resolver.h"
#include "ipfs/namesys/publisher.h"
#include "ipfs/routing/routing.h"
/**
* Handles HttpRequest and HttpParam
@ -57,6 +58,24 @@ void ipfs_core_http_request_free(struct HttpRequest* request) {
}
}
struct HttpResponse* ipfs_core_http_response_new() {
struct HttpResponse* response = (struct HttpResponse*) malloc(sizeof(struct HttpResponse));
if (response != NULL) {
response->content_type = NULL;
response->bytes = NULL;
}
return response;
}
void ipfs_core_http_response_free(struct HttpResponse* response) {
if (response != NULL) {
// NOTE: content_type should not be dynamically allocated
if (response->bytes != NULL)
free(response->bytes);
free(response);
}
}
/***
* Build a new HttpParam
* @returns a newly allocated HttpParam struct
@ -91,7 +110,7 @@ void ipfs_core_http_param_free(struct HttpParam* param) {
* @param response the response
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_core_http_process_name(struct IpfsNode* local_node, struct HttpRequest* request, char** response) {
int ipfs_core_http_process_name(struct IpfsNode* local_node, struct HttpRequest* request, struct HttpResponse** response) {
int retVal = 0;
char* path = NULL;
char* result = NULL;
@ -103,15 +122,23 @@ int ipfs_core_http_process_name(struct IpfsNode* local_node, struct HttpRequest*
if (strcmp(request->sub_command, "resolve") == 0) {
retVal = ipfs_namesys_resolver_resolve(local_node, path, 1, &result);
if (retVal) {
*response = (char*) malloc(strlen(result) + 20);
sprintf(*response, "{ \"Path\": \"%s\" }", result);
*response = ipfs_core_http_response_new();
struct HttpResponse* res = *response;
res->content_type = "application/json";
res->bytes = (uint8_t*) malloc(strlen(local_node->identity->peer->id) + strlen(path) + 30);
sprintf((char*)res->bytes, "{ \"Path\": \"%s\" }", result);
res->bytes_size = strlen((char*)res->bytes);
}
free(result);
} else if (strcmp(request->sub_command, "publish") == 0) {
retVal = ipfs_namesys_publisher_publish(local_node, path);
if (retVal) {
*response = (char*) malloc(strlen(local_node->identity->peer->id) + strlen(path) + 30);
sprintf(*response, "{ \"Name\": \"%s\"\n \"Value\": \"%s\" }", local_node->identity->peer->id, path);
*response = ipfs_core_http_response_new();
struct HttpResponse* res = *response;
res->content_type = "application/json";
res->bytes = (uint8_t*) malloc(strlen(local_node->identity->peer->id) + strlen(path) + 30);
sprintf((char*)res->bytes, "{ \"Name\": \"%s\"\n \"Value\": \"%s\" }", local_node->identity->peer->id, path);
res->bytes_size = strlen((char*)res->bytes);
}
}
return retVal;
@ -124,16 +151,18 @@ int ipfs_core_http_process_name(struct IpfsNode* local_node, struct HttpRequest*
* @param response the response
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_core_http_process_object(struct IpfsNode* local_node, struct HttpRequest* request, char** response) {
int ipfs_core_http_process_object(struct IpfsNode* local_node, struct HttpRequest* request, struct HttpResponse** response) {
int retVal = 0;
if (strcmp(request->sub_command, "get") == 0) {
// do an object_get
if (request->arguments->total == 1) {
char* hash = (char*)libp2p_utils_vector_get(request->arguments, 0);
struct Cid* cid;
ipfs_cid_decode_hash_from_base58(hash, strlen(hash), &cid);
size_t size;
FILE* response_file = open_memstream(response, &size);
ipfs_cid_decode_hash_from_base58((unsigned char*)hash, strlen(hash), &cid);
*response = ipfs_core_http_response_new();
struct HttpResponse* res = *response;
res->content_type = "application/json";
FILE* response_file = open_memstream((char**)&res->bytes, &res->bytes_size);
retVal = ipfs_exporter_object_cat_to_file(local_node, cid->hash, cid->hash_length, response_file);
ipfs_cid_free(cid);
fclose(response_file);
@ -142,6 +171,87 @@ int ipfs_core_http_process_object(struct IpfsNode* local_node, struct HttpReques
return retVal;
}
int ipfs_core_http_process_dht_provide(struct IpfsNode* local_node, struct HttpRequest* request, struct HttpResponse** response) {
int failedCount = 0;
for (int i = 0; i < request->arguments->total; i++) {
char* hash = (char*)libp2p_utils_vector_get(request->arguments, i);
struct Cid* cid;
if (!ipfs_cid_decode_hash_from_base58((unsigned char*)hash, strlen(hash), &cid)) {
failedCount++;
continue;
}
if (!local_node->routing->Provide(local_node->routing, cid->hash, cid->hash_length)) {
failedCount++;
continue;
}
}
*response = ipfs_core_http_response_new();
struct HttpResponse* res = *response;
res->content_type = "application/json";
res->bytes = (uint8_t*) malloc(1024);
if (!failedCount) {
// complete success
// TODO: do the right thing
snprintf((char*)res->bytes, 1024, "{\n\t\"ID\": \"<string>\"\n" \
"\t\"Type\": \"<int>\"\n"
"\t\"Responses\": [\n"
"\t\t{\n"
"\t\t\t\"ID\": \"<string>\"\n"
"\t\t\t\"Addrs\": [\n"
"\t\t\t\t\"<object>\"\n"
"\t\t\t]\n"
"\t\t}\n"
"\t]\n"
"\t\"Extra\": \"<string>\"\n"
"}\n"
);
} else {
// at least some failed
// TODO: do the right thing
snprintf((char*)res->bytes, 1024, "{\n\t\"ID\": \"<string>\",\n" \
"\t\"Type\": \"<int>\",\n"
"\t\"Responses\": [\n"
"\t\t{\n"
"\t\t\t\"ID\": \"<string>\",\n"
"\t\t\t\"Addrs\": [\n"
"\t\t\t\t\"<object>\"\n"
"\t\t\t]\n"
"\t\t}\n"
"\t],\n"
"\t\"Extra\": \"<string>\"\n"
"}\n"
);
}
res->bytes_size = strlen((char*)res->bytes);
return failedCount < request->arguments->total;
}
int ipfs_core_http_process_dht_get(struct IpfsNode* local_node, struct HttpRequest* request, struct HttpResponse** response) {
int failedCount = 0;
// for now, we can only handle 1 argument at a time
if (request->arguments != NULL && request->arguments->total != 1)
return 0;
*response = ipfs_core_http_response_new();
struct HttpResponse* res = *response;
res->content_type = "application/octet-stream";
for (int i = 0; i < request->arguments->total; i++) {
char* hash = (char*)libp2p_utils_vector_get(request->arguments, i);
struct Cid* cid;
if (!ipfs_cid_decode_hash_from_base58((unsigned char*)hash, strlen(hash), &cid)) {
failedCount++;
continue;
}
if (!local_node->routing->GetValue(local_node->routing, cid->hash, cid->hash_length, (void**)&res->bytes, &res->bytes_size)) {
failedCount++;
continue;
}
//TODO: we need to handle multiple arguments
}
return failedCount < request->arguments->total;
}
/***
* process dht commands
* @param local_node the context
@ -149,59 +259,16 @@ int ipfs_core_http_process_object(struct IpfsNode* local_node, struct HttpReques
* @param response where to put the results
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_core_http_process_dht(struct IpfsNode* local_node, struct HttpRequest* request, char** response) {
int failedCount = 0;
int ipfs_core_http_process_dht(struct IpfsNode* local_node, struct HttpRequest* request, struct HttpResponse** response) {
int retVal = 0;
if (strcmp(request->sub_command, "provide") == 0) {
// do a dht provide
for (int i = 0; i < request->arguments->total; i++) {
char* hash = (char*)libp2p_utils_vector_get(request->arguments, i);
struct Cid* cid;
if (!ipfs_cid_decode_hash_from_base58(hash, strlen(hash), &cid)) {
failedCount++;
continue;
}
if (!ipfs_routing_online_provide(local_node->routing, cid->hash, cid->hash_length)) {
failedCount++;
continue;
}
}
if (!failedCount) {
// complete success
// TODO: do the right thing
*response = (char*) malloc(1024);
snprintf(*response, 1024, "{\n\t\"ID\": \"<string>\"\n" \
"\t\"Type\": \"<int>\"\n"
"\t\"Responses\": [\n"
"\t\t{\n"
"\t\t\t\"ID\": \"<string>\"\n"
"\t\t\t\"Addrs\": [\n"
"\t\t\t\t\"<object>\"\n"
"\t\t\t]\n"
"\t\t}\n"
"\t]\n"
"\t\"Extra\": \"<string>\"\n"
"}\n"
);
} else {
// at least some failed
// TODO: do the right thing
*response = (char*) malloc(1024);
snprintf(*response, 1024, "{\n\t\"ID\": \"<string>\",\n" \
"\t\"Type\": \"<int>\",\n"
"\t\"Responses\": [\n"
"\t\t{\n"
"\t\t\t\"ID\": \"<string>\",\n"
"\t\t\t\"Addrs\": [\n"
"\t\t\t\t\"<object>\"\n"
"\t\t\t]\n"
"\t\t}\n"
"\t],\n"
"\t\"Extra\": \"<string>\"\n"
"}\n"
);
}
retVal = ipfs_core_http_process_dht_provide(local_node, request, response);
} else if (strcmp(request->sub_command, "get") == 0) {
// do a dht get
retVal = ipfs_core_http_process_dht_get(local_node, request, response);
}
return failedCount < request->arguments->total;
return retVal;
}
/***
@ -211,7 +278,7 @@ int ipfs_core_http_process_dht(struct IpfsNode* local_node, struct HttpRequest*
* @param response the response
* @returns true(1) on success, false(0) otherwise.
*/
int ipfs_core_http_request_process(struct IpfsNode* local_node, struct HttpRequest* request, char** response) {
int ipfs_core_http_request_process(struct IpfsNode* local_node, struct HttpRequest* request, struct HttpResponse** response) {
if (request == NULL)
return 0;
int retVal = 0;

View File

@ -20,6 +20,15 @@ struct HttpRequest {
struct Libp2pVector* arguments; // a collection of chars that are arguments
};
/***
* A struct to hold the response to be sent via http
*/
struct HttpResponse {
char* content_type; // a const char, not dynamically allocated
uint8_t* bytes; // dynamically allocated
size_t bytes_size;
};
/***
* Build a new HttpRequest
* @returns the newly allocated HttpRequest struct
@ -32,6 +41,10 @@ struct HttpRequest* ipfs_core_http_request_new();
*/
void ipfs_core_http_request_free(struct HttpRequest* request);
struct HttpResponse* ipfs_core_http_response_new();
void ipfs_core_http_response_free(struct HttpResponse* response);
/***
* Build a new HttpParam
* @returns a newly allocated HttpParam struct
@ -51,7 +64,7 @@ void ipfs_core_http_param_free(struct HttpParam* param);
* @param response the response
* @returns true(1) on success, false(0) otherwise.
*/
int ipfs_core_http_request_process(struct IpfsNode* local_node, struct HttpRequest* request, char** response);
int ipfs_core_http_request_process(struct IpfsNode* local_node, struct HttpRequest* request, struct HttpResponse** response);
/**
* Do an HTTP Get to the local API

View File

@ -51,29 +51,52 @@ int ipfs_routing_generic_put_value (ipfs_routing* offlineRouting, const unsigned
*/
int ipfs_routing_generic_get_value (ipfs_routing* routing, const unsigned char *key, size_t key_size, void **val, size_t *vlen)
{
struct HashtableNode* node = NULL;
*val = NULL;
int retVal = 0;
int retVal = 0;
if (!ipfs_merkledag_get(key, key_size, &node, routing->local_node->repo)) {
goto exit;
}
if (routing->local_node->mode == MODE_API_AVAILABLE) {
unsigned char buffer[256];
if (!ipfs_cid_hash_to_base58(key, key_size, &buffer[0], 256)) {
libp2p_logger_error("offline", "Unable to convert hash to its Base58 representation.\n");
return 0;
}
// protobuf the node
int protobuf_size = ipfs_hashtable_node_protobuf_encode_size(node);
*val = malloc(protobuf_size);
char* response;
struct HttpRequest* req = ipfs_core_http_request_new();
req->command = "dht";
req->sub_command = "get";
req->arguments = libp2p_utils_vector_new(1);
libp2p_utils_vector_add(req->arguments, buffer);
if (!ipfs_core_http_request_get(routing->local_node, req, &response)) {
libp2p_logger_error("offline", "Unable to call API for dht get.\n");
return 0;
}
//TODO: put results in val
fprintf(stdout, "%s", response);
return 1;
} else {
struct HashtableNode* node = NULL;
*val = NULL;
if (ipfs_hashtable_node_protobuf_encode(node, *val, protobuf_size, vlen) == 0) {
goto exit;
}
if (!ipfs_merkledag_get(key, key_size, &node, routing->local_node->repo)) {
goto exit;
}
retVal = 1;
exit:
if (node != NULL)
ipfs_hashtable_node_free(node);
if (retVal != 1 && *val != NULL) {
free(*val);
*val = NULL;
// protobuf the node
int protobuf_size = ipfs_hashtable_node_protobuf_encode_size(node);
*val = malloc(protobuf_size);
if (ipfs_hashtable_node_protobuf_encode(node, *val, protobuf_size, vlen) == 0) {
goto exit;
}
retVal = 1;
exit:
if (node != NULL)
ipfs_hashtable_node_free(node);
if (retVal != 1 && *val != NULL) {
free(*val);
*val = NULL;
}
}
return retVal;
}

View File

@ -407,8 +407,11 @@ int test_routing_provide() {
// add a file, to prime the connection to peer 1
//TODO: Find a better way to do this...
size_t bytes_written = 0;
ipfs_node_online_new(ipfs_path, &local_node2);
ipfs_import_file(NULL, "/home/parallels/ipfstest/hello_world.txt", &node, local_node2, &bytes_written, 0);
ipfs_node_offline_new(ipfs_path, &local_node2);
uint8_t *bytes = (unsigned char*)"hello, world!\n";
char* filename = "test1.txt";
create_file(filename, bytes, strlen((char*)bytes));
ipfs_import_file(NULL, filename, &node, local_node2, &bytes_written, 0);
ipfs_node_free(local_node2);
// start the daemon in a separate thread
if (pthread_create(&thread2, NULL, test_daemon_start, (void*)ipfs_path) < 0) {
@ -418,7 +421,6 @@ int test_routing_provide() {
thread2_started = 1;
// wait for everything to start up
// JMJ debugging =
sleep(3);
retVal = 1;