Starting to handle get_value request

yamux
John Jones 2017-02-27 12:27:40 -05:00
parent 7a6b138444
commit f1aac5d707
15 changed files with 281 additions and 31 deletions

View File

@ -1,5 +1,5 @@
CC = gcc
CFLAGS = -O0 -I../../include -I../../../c-libp2p/include -I../../../c-protobuf -Wall
CFLAGS = -O0 -I../../include -I../../../c-libp2p/include -I../../../c-multiaddr/include -I../../../c-protobuf -Wall
ifdef DEBUG
CFLAGS += -g3

View File

@ -1,5 +1,5 @@
CC = gcc
CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-protobuf -Wall
CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multiaddr/include -I../../c-protobuf -Wall
ifdef DEBUG
CFLAGS += -g3

View File

@ -9,8 +9,7 @@
#include "ipfs/core/ipfs_node.h"
#include "ipfs/repo/fsrepo/fs_repo.h"
int ipfs_daemon (int argc, char **argv)
{
int ipfs_daemon_start(char* repo_path) {
int count_pths = 0;
pthread_t work_pths[MAX];
struct IpfsNodeListenParams listen_param;
@ -19,7 +18,7 @@ int ipfs_daemon (int argc, char **argv)
// read the configuration
struct FSRepo* fs_repo;
if (!ipfs_repo_fsrepo_new(NULL, NULL, &fs_repo))
if (!ipfs_repo_fsrepo_new(repo_path, NULL, &fs_repo))
return 0;
// open the repository and read the file
@ -58,4 +57,10 @@ int ipfs_daemon (int argc, char **argv)
// All pthreads aborted?
return 0;
}
int ipfs_daemon (int argc, char **argv)
{
return ipfs_daemon_start(NULL);
}

View File

@ -32,7 +32,7 @@ void *ipfs_null_connection (void *ptr)
fprintf(stderr, "Connection %d, count %d\n", connection_param->socket, *(connection_param->count));
if (libp2p_net_multistream_negotiate(stream)) {
routing = ipfs_routing_new_online(connection_param->local_node->repo, &connection_param->local_node->identity->private_key, stream);
routing = ipfs_routing_new_online(connection_param->local_node, &connection_param->local_node->identity->private_key, stream);
for(;;) {
struct Libp2pMessage* msg = libp2p_net_multistream_get_message(stream);
@ -41,9 +41,17 @@ void *ipfs_null_connection (void *ptr)
case (MESSAGE_TYPE_PING):
routing->Ping(routing, msg);
break;
case (MESSAGE_TYPE_GET_VALUE):
routing->GetValue(routing, msg->key, msg->key_size, NULL, NULL);
case (MESSAGE_TYPE_GET_VALUE): {
unsigned char* val;
size_t val_size = 0;
routing->GetValue(routing, msg->key, msg->key_size, (void**)&val, &val_size);
if (val == NULL) {
stream->write(stream, 0, 1);
} else {
stream->write(stream, val, val_size);
}
break;
}
default:
break;
}

View File

@ -2,10 +2,13 @@
#include <string.h>
#include <stdlib.h>
#include "ipfs/importer/resolver.h"
#include "libp2p/crypto/encoding/base58.h"
#include "ipfs/merkledag/node.h"
#include "ipfs/merkledag/merkledag.h"
#include "ipfs/repo/fsrepo/fs_repo.h"
#include "libp2p/net/multistream.h"
#include "libp2p/record/message.h"
/**
* return the next chunk of a path
@ -32,10 +35,12 @@ int ipfs_resolver_next_path(const char* path, char** next_part) {
}
/**
* Remove preceding slash and "/ipfs/" or "/ipns/"
* @param path
* Remove preceding slash and "/ipfs/" or "/ipns/" as well as the local multihash (if it is local)
* @param path the path from the command line
* @param fs_repo the local repo
* @returns the modified path
*/
const char* ipfs_resolver_remove_path_prefix(const char* path) {
const char* ipfs_resolver_remove_path_prefix(const char* path, const struct FSRepo* fs_repo) {
int pos = 0;
int first_non_slash = -1;
while(&path[pos] != NULL) {
@ -48,6 +53,8 @@ const char* ipfs_resolver_remove_path_prefix(const char* path) {
if (pos == first_non_slash && (strncmp(&path[pos], "ipfs", 4) == 0 || strncmp(&path[pos], "ipns", 4) == 0) ) {
// ipfs or ipns should be up front. Otherwise, it could be part of the path
pos += 4;
} else if (strncmp(&path[pos], fs_repo->config->identity->peer_id, strlen(fs_repo->config->identity->peer_id)) == 0) {
pos += strlen(fs_repo->config->identity->peer_id) + 1; // the slash
} else {
return &path[pos];
}
@ -56,6 +63,115 @@ const char* ipfs_resolver_remove_path_prefix(const char* path) {
return NULL;
}
/**
* Determine if this path is a remote path
* @param path the path to examine
* @param fs_repo the local repo
* @returns true(1) if this path is a remote path
*/
int ipfs_resolver_is_remote(const char* path, const struct FSRepo* fs_repo) {
int pos = 0;
// skip the first slash
while (&path[pos] != NULL && path[pos] == '/') {
pos++;
}
if (&path[pos] == NULL)
return 0;
// skip the ipfs prefix
if (strncmp(&path[pos], "ipfs/", 5) == 0 || strncmp(&path[pos], "ipns/", 5) == 0) {
pos += 5; //the word plus the slash
} else
return 0;
// if this is a Qm code, see if it is a local Qm code
if (path[pos] == 'Q' && path[pos+1] == 'm') {
if (strncmp(&path[pos], fs_repo->config->identity->peer_id, strlen(fs_repo->config->identity->peer_id)) != 0) {
return 1;
}
}
return 0;
}
/**
* This is a hack to get ip4/tcp working
* TODO: this should be moved further down in the networking stack and generified for different multiaddresses
* This makes too many assumptions
*/
int ipfs_resolver_multiaddress_parse_tcp(struct MultiAddress* address, char** ip, int* port) {
// ip
char* str = malloc(strlen(address->string));
strcpy(str, &address->string[5]); // gets rid of /ip4/
char* pos = strchr(str, '/');
pos[0] = 0;
*ip = malloc(strlen(str) + 1);
strcpy(*ip, str);
free(str);
// port
str = strstr(address->string, "/tcp/");
str += 5;
*port = atoi(str);
return 1;
}
/**
* Retrieve a node from a remote source
* @param path the path to retrieve
* @param from where to start
* @param fs_repo the local repo
* @returns the node, or NULL if not found
*/
struct Node* ipfs_resolver_remote_get(const char* path, struct Node* from, const struct IpfsNode* ipfs_node) {
// parse the path
const char* temp = ipfs_resolver_remove_path_prefix(path, ipfs_node->repo);
if (temp == NULL)
return NULL;
char* pos = strchr(temp, '/');
if (pos == NULL || pos - temp > 254)
return NULL;
char id[255];
size_t id_size = pos - temp;
strncpy(id, temp, id_size);
id[id_size] = 0;
char* key = &pos[1];
pos = strchr(key, '/');
if (pos == NULL || pos - temp > 254)
return NULL;
pos[0] = '\0';
// get the multiaddress for this
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(ipfs_node->peerstore, (unsigned char*)id, id_size);
if (peer == NULL)
return NULL;
// connect to the peer
struct MultiAddress* address = peer->addr_head->item;
char* ip;
int port;
ipfs_resolver_multiaddress_parse_tcp(address, &ip, &port);
struct Stream* stream = libp2p_net_multistream_connect(ip, port);
free(ip);
// build the request
struct Libp2pMessage* message = libp2p_message_new();
message->message_type = MESSAGE_TYPE_GET_VALUE;
message->key = key;
message->key_size = strlen(key);
size_t message_protobuf_size = libp2p_message_protobuf_encode_size(message);
unsigned char message_protobuf[message_protobuf_size];
libp2p_message_protobuf_encode(message, message_protobuf, message_protobuf_size, &message_protobuf_size);
stream->write(stream, message_protobuf, message_protobuf_size);
unsigned char* response;
size_t response_size;
// we should get back a protobuf'd record
stream->read(stream, &response, &response_size);
if (response_size == 1)
return NULL;
// turn the protobuf into a Node
struct Node* node;
ipfs_node_protobuf_decode(response, response_size, &node);
return node;
}
/**
* Interogate the path and the current node, looking
* for the desired node.
@ -63,8 +179,14 @@ const char* ipfs_resolver_remove_path_prefix(const char* path) {
* @param from the current node (or NULL if it is the first call)
* @returns what we are looking for, or NULL if it wasn't found
*/
struct Node* ipfs_resolver_get(const char* path, struct Node* from, const struct FSRepo* fs_repo) {
struct Node* ipfs_resolver_get(const char* path, struct Node* from, const struct IpfsNode* ipfs_node) {
struct FSRepo* fs_repo = ipfs_node->repo;
// shortcut for remote files
if (from == NULL && ipfs_resolver_is_remote(path, fs_repo)) {
return ipfs_resolver_remote_get(path, from, ipfs_node);
}
/**
* Memory management notes:
* If we find what we're looking for, we clean up "from" and return the object
@ -73,7 +195,7 @@ struct Node* ipfs_resolver_get(const char* path, struct Node* from, const struct
*/
// remove unnecessary stuff
if (from == NULL)
path = ipfs_resolver_remove_path_prefix(path);
path = ipfs_resolver_remove_path_prefix(path, fs_repo);
// grab the portion of the path to work with
char* path_section;
if (ipfs_resolver_next_path(path, &path_section) == 0)
@ -102,7 +224,7 @@ struct Node* ipfs_resolver_get(const char* path, struct Node* from, const struct
} else {
// look on...
free(path_section);
struct Node* newNode = ipfs_resolver_get(&path[pos+1], current_node, fs_repo); // the +1 is the slash
struct Node* newNode = ipfs_resolver_get(&path[pos+1], current_node, ipfs_node); // the +1 is the slash
return newNode;
}
} else {
@ -134,7 +256,7 @@ struct Node* ipfs_resolver_get(const char* path, struct Node* from, const struct
// if we're at the end of the path, return the node
// continue looking for the next part of the path
ipfs_node_free(from);
struct Node* newNode = ipfs_resolver_get(next_path_section, current_node, fs_repo);
struct Node* newNode = ipfs_resolver_get(next_path_section, current_node, ipfs_node);
return newNode;
}
}

View File

@ -25,5 +25,6 @@
void *ipfs_null_connection (void *ptr);
void *ipfs_null_listen (void *ptr);
int ipfs_daemon (int argc, char **argv);
int ipfs_daemon_start(char* repo_path);
int ipfs_ping (int argc, char **argv);
#endif // DAEMON_H

View File

@ -1,6 +1,7 @@
#pragma once
#include "ipfs/merkledag/node.h"
#include "ipfs/core/ipfs_node.h"
/**
* Interogate the path and the current node, looking
@ -9,4 +10,4 @@
* @param from the current node (or NULL if it is the first call)
* @returns what we are looking for, or NULL if it wasn't found
*/
struct Node* ipfs_resolver_get(const char* path, struct Node* from, const struct FSRepo* fs_repo);
struct Node* ipfs_resolver_get(const char* path, struct Node* from, const struct IpfsNode* ipfs_node);

View File

@ -2,12 +2,13 @@
#include "libp2p/crypto/rsa.h"
#include "libp2p/record/message.h"
#include "ipfs/core/ipfs_node.h"
// offlineRouting implements the IpfsRouting interface,
// but only provides the capability to Put and Get signed dht
// records to and from the local datastore.
struct s_ipfs_routing {
struct FSRepo* datastore;
struct IpfsNode* local_node;
size_t ds_len;
struct RsaPrivateKey* sk;
struct Stream* stream;
@ -49,7 +50,7 @@ struct s_ipfs_routing {
typedef struct s_ipfs_routing ipfs_routing;
// offline routing routines.
ipfs_routing* ipfs_routing_new_offline (struct FSRepo* ds, struct RsaPrivateKey *private_key);
ipfs_routing* ipfs_routing_new_online (struct FSRepo* ds, struct RsaPrivateKey* private_key, struct Stream* stream);
ipfs_routing* ipfs_routing_new_offline (struct IpfsNode* local_node, struct RsaPrivateKey *private_key);
ipfs_routing* ipfs_routing_new_online (struct IpfsNode* local_node, struct RsaPrivateKey* private_key, struct Stream* stream);
int ipfs_routing_generic_put_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void *val, size_t vlen);int ipfs_routing_generic_get_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void **val, size_t *vlen);

View File

@ -57,6 +57,7 @@ void strip_quotes(int argc, char** argv) {
#define CAT 5
#define DAEMON 6
#define PING 7
#define GET 8
/***
* Basic parsing of command line arguments to figure out where the user wants to go
@ -87,6 +88,9 @@ int parse_arguments(int argc, char** argv) {
if (strcmp("ping", argv[1]) == 0) {
return PING;
}
if (strcmp("get", argv[1]) == 0) {
return GET;
}
return -1;
}
@ -106,6 +110,8 @@ int main(int argc, char** argv) {
case (OBJECT_GET):
ipfs_exporter_object_get(argc, argv);
break;
case(GET):
//ipfs_exporter_get(argc, argv);
case (CAT):
ipfs_exporter_object_cat(argc, argv);
break;

View File

@ -1,5 +1,5 @@
CC = gcc
CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-protobuf -Wall
CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multiaddr/include -I../../c-protobuf -Wall
ifdef DEBUG
CFLAGS += -g3

View File

@ -6,6 +6,7 @@
#include "ipfs/datastore/ds_helper.h"
#include "ipfs/merkledag/merkledag.h"
#include "ipfs/routing/routing.h"
#include "ipfs/importer/resolver.h"
int ipfs_routing_generic_put_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void *val, size_t vlen)
{
@ -36,10 +37,21 @@ int ipfs_routing_generic_put_value (ipfs_routing* offlineRouting, char *key, siz
return 0; // success.
}
int ipfs_routing_generic_get_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void **val, size_t *vlen)
int ipfs_routing_generic_get_value (ipfs_routing* routing, char *key, size_t key_size, void **val, size_t *vlen)
{
// TODO: Read from db, validate and decode before return.
return -1;
char key_str[key_size + 1];
strncpy(key_str, key, key_size);
key_str[key_size] = 0;
struct Node* node = ipfs_resolver_get(key_str, NULL, routing->local_node);
if (node == NULL)
return -1;
// protobuf the node
int protobuf_size = ipfs_node_protobuf_encode_size(node);
*val = malloc(protobuf_size);
if (ipfs_node_protobuf_encode(node, *val, protobuf_size, vlen) == 0)
return -1;
return 0;
}
int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, char *key, size_t key_size, void *ret, size_t *rlen)
@ -67,12 +79,12 @@ int ipfs_routing_offline_bootstrap (ipfs_routing* offlineRouting)
return ErrOffline;
}
ipfs_routing* ipfs_routing_new_offline (struct FSRepo* ds, struct RsaPrivateKey *private_key)
ipfs_routing* ipfs_routing_new_offline (struct IpfsNode* local_node, struct RsaPrivateKey *private_key)
{
ipfs_routing *offlineRouting = malloc (sizeof(ipfs_routing));
if (offlineRouting) {
offlineRouting->datastore = ds;
offlineRouting->local_node = local_node;
offlineRouting->sk = private_key;
offlineRouting->stream = NULL;

View File

@ -41,11 +41,11 @@ int ipfs_routing_online_bootstrap(struct s_ipfs_routing* routing) {
* @param private_key the local private key
* @reurns the ipfs_routing struct that handles messages
*/
ipfs_routing* ipfs_routing_new_online (struct FSRepo* fs_repo, struct RsaPrivateKey *private_key, struct Stream* stream) {
ipfs_routing* ipfs_routing_new_online (struct IpfsNode* local_node, struct RsaPrivateKey *private_key, struct Stream* stream) {
ipfs_routing *onlineRouting = malloc (sizeof(ipfs_routing));
if (onlineRouting) {
onlineRouting->datastore = fs_repo;
onlineRouting->local_node = local_node;
onlineRouting->sk = private_key;
onlineRouting->stream = stream;

View File

@ -8,6 +8,8 @@ OBJS = testit.o test_helper.o \
../cmd/ipfs/init.o \
../commands/argument.o ../commands/command_option.o ../commands/command.o ../commands/cli/parse.o \
../core/builder.o \
../core/daemon.o \
../core/null.o \
../datastore/ds_helper.o \
../flatfs/flatfs.o \
../importer/importer.o ../importer/exporter.o ../importer/resolver.o \
@ -19,6 +21,8 @@ OBJS = testit.o test_helper.o \
../repo/config/config.o ../repo/config/identity.o \
../repo/config/bootstrap_peers.o ../repo/config/datastore.o ../repo/config/gateway.o \
../repo/config/addresses.o ../repo/config/swarm.o ../repo/config/peer.o \
../routing/offline.o \
../routing/online.o \
../thirdparty/ipfsaddr/ipfs_addr.o \
../unixfs/unixfs.o \
../../c-protobuf/protobuf.o ../../c-protobuf/varint.o

View File

@ -1,5 +1,9 @@
#include <pthread.h>
#include "ipfs/importer/resolver.h"
#include "ipfs/os/utils.h"
#include "multiaddr/multiaddr.h"
#include "ipfs/core/daemon.h"
int test_resolver_get() {
// clean out repository
@ -27,8 +31,11 @@ int test_resolver_get() {
ipfs_repo_fsrepo_new(ipfs_path, NULL, &fs_repo);
ipfs_repo_fsrepo_open(fs_repo);
struct IpfsNode ipfs_node;
ipfs_node.repo = fs_repo;
// find something that is already in the repository
struct Node* result = ipfs_resolver_get("/ipfs/QmbMecmXESf96ZNry7hRuzaRkEBhjqXpoYfPCwgFzVGDzB", NULL, fs_repo);
struct Node* result = ipfs_resolver_get("QmbMecmXESf96ZNry7hRuzaRkEBhjqXpoYfPCwgFzVGDzB", NULL, &ipfs_node);
if (result == NULL) {
free(test_dir);
ipfs_repo_fsrepo_free(fs_repo);
@ -37,8 +44,21 @@ int test_resolver_get() {
ipfs_node_free(result);
// find something where path includes the local node
char path[255];
strcpy(path, "/ipfs/");
strcat(path, fs_repo->config->identity->peer_id);
strcat(path, "/QmbMecmXESf96ZNry7hRuzaRkEBhjqXpoYfPCwgFzVGDzB");
result = ipfs_resolver_get(path, NULL, &ipfs_node);
if (result == NULL) {
free(test_dir);
ipfs_repo_fsrepo_free(fs_repo);
return 0;
}
ipfs_node_free(result);
// find something by path
result = ipfs_resolver_get("/ipfs/QmZBvycPAYScBoPEzm35zXHt6gYYV5t9PyWmr4sksLPNFS/hello_world.txt", NULL, fs_repo);
result = ipfs_resolver_get("QmZBvycPAYScBoPEzm35zXHt6gYYV5t9PyWmr4sksLPNFS/hello_world.txt", NULL, &ipfs_node);
if (result == NULL) {
free(test_dir);
ipfs_repo_fsrepo_free(fs_repo);
@ -51,3 +71,71 @@ int test_resolver_get() {
return 1;
}
void* test_resolver_daemon_start(void* arg) {
ipfs_daemon_start((char*)arg);
return NULL;
}
int test_resolver_remote_get() {
// clean out repository
const char* ipfs_path = "/tmp/.ipfs";
os_utils_setenv("IPFS_PATH", ipfs_path, 1);
char remote_peer_id[255];
char path[255];
drop_and_build_repository(ipfs_path);
// start the daemon in a separate thread
pthread_t thread;
int rc = pthread_create(&thread, NULL, test_resolver_daemon_start, (void*)ipfs_path);
// this should point to a test directory with files and directories
char* home_dir = os_utils_get_homedir();
char* test_dir = malloc(strlen(home_dir) + 10);
os_utils_filepath_join(home_dir, "ipfstest", test_dir, strlen(home_dir) + 10);
int argc = 4;
char* argv[argc];
argv[0] = "ipfs";
argv[1] = "add";
argv[2] = "-r";
argv[3] = test_dir;
ipfs_import_files(argc, (char**)argv);
struct FSRepo* fs_repo;
ipfs_repo_fsrepo_new(ipfs_path, NULL, &fs_repo);
ipfs_repo_fsrepo_open(fs_repo);
// put the server in the peer store and change our peer id so we think it is remote (hack for now)
strcpy(remote_peer_id, fs_repo->config->identity->peer_id);
struct MultiAddress* remote_addr = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/4001");
struct Peerstore* peerstore = libp2p_peerstore_new();
struct Libp2pPeer* peer = libp2p_peer_new_from_data(remote_peer_id, strlen(remote_peer_id), remote_addr);
libp2p_peerstore_add_peer(peerstore, peer);
strcpy(fs_repo->config->identity->peer_id, "QmABCD");
struct IpfsNode local_node;
local_node.mode = MODE_ONLINE;
local_node.peerstore = peerstore;
local_node.repo = fs_repo;
local_node.identity = fs_repo->config->identity;
// find something by remote path
strcpy(path, "/ipfs/");
strcat(path, remote_peer_id);
strcat(path, "/QmZBvycPAYScBoPEzm35zXHt6gYYV5t9PyWmr4sksLPNFS/hello_world.txt");
struct Node* result = ipfs_resolver_get(path, NULL, &local_node);
if (result == NULL) {
ipfs_repo_fsrepo_free(fs_repo);
pthread_cancel(thread);
return 0;
}
ipfs_node_free(result);
ipfs_repo_fsrepo_free(fs_repo);
pthread_cancel(thread);
return 1;
}

View File

@ -57,7 +57,8 @@ const char* names[] = {
"test_resolver_get",
"test_unixfs_encode_decode",
"test_unixfs_encode_smallfile",
"test_ping"
"test_ping",
"test_resolver_remote_get"
};
int (*funcs[])(void) = {
@ -92,7 +93,8 @@ int (*funcs[])(void) = {
test_resolver_get,
test_unixfs_encode_decode,
test_unixfs_encode_smallfile,
test_ping
test_ping,
test_resolver_remote_get
};
/**