From 7a6b1384448a303f115e9bc5fd6eb273e40eaf8f Mon Sep 17 00:00:00 2001 From: John Jones Date: Thu, 23 Feb 2017 15:15:33 -0500 Subject: [PATCH] more ping cleanup and friendly interface to streams --- core/null.c | 74 ++++++++++++++++++++----------- core/ping.c | 10 ++--- include/ipfs/core/ipfs_node.h | 4 ++ include/ipfs/routing/routing.h | 81 ++++++++++++++++++++++------------ routing/Makefile | 2 +- routing/offline.c | 48 ++++++++++---------- routing/online.c | 62 ++++++++++++++++++++++++++ 7 files changed, 198 insertions(+), 83 deletions(-) create mode 100644 routing/online.c diff --git a/core/null.c b/core/null.c index bfc3768..a72faa8 100644 --- a/core/null.c +++ b/core/null.c @@ -8,43 +8,67 @@ #include "libp2p/record/message.h" #include "libp2p/net/multistream.h" #include "ipfs/core/daemon.h" +#include "ipfs/routing/routing.h" +#include "ipfs/core/ipfs_node.h" #define BUF_SIZE 4096 +/** + * We've received a connection. Find out what they want + */ void *ipfs_null_connection (void *ptr) { - struct null_connection_params *connection_param; - //char b[BUF_SIZE]; - //int len; + struct null_connection_params *connection_param = NULL; + struct s_ipfs_routing* routing = NULL; + struct Stream* stream = NULL; connection_param = (struct null_connection_params*) ptr; // TODO: multistream + secio + message. + // TODO: when should we exit the for loop and disconnect? + + stream = libp2p_net_multistream_stream_new(connection_param->socket); + fprintf(stderr, "Connection %d, count %d\n", connection_param->socket, *(connection_param->count)); - for(;;) { - if (libp2p_net_multistream_negotiate(connection_param->socket)) { - // we negotiated, now find out what they want - libp2p_net_multistream_handle_message(connection_param->socket); - } else { - break; - } - /* - len = socket_read(connection_param->socket, b, sizeof(b)-1, 0); - if (len > 0) { - while (b[len-1] == '\r' || b[len-1] == '\n') len--; - b[len] = '\0'; - fprintf(stderr, "Recv: '%s'\n", b); - if (strcmp (b, "ping") == 0) { - socket_write(connection_param->socket, "pong", 4, 0); - } - } else if(len < 0) { - break; - } - */ - } + if (libp2p_net_multistream_negotiate(stream)) { + routing = ipfs_routing_new_online(connection_param->local_node->repo, &connection_param->local_node->identity->private_key, stream); - close (connection_param->socket); // close socket. + for(;;) { + struct Libp2pMessage* msg = libp2p_net_multistream_get_message(stream); + if (msg != NULL) { + switch(msg->message_type) { + case (MESSAGE_TYPE_PING): + routing->Ping(routing, msg); + break; + case (MESSAGE_TYPE_GET_VALUE): + routing->GetValue(routing, msg->key, msg->key_size, NULL, NULL); + break; + default: + break; + } + } else { + break; + } + } + } + /* + len = socket_read(connection_param->socket, b, sizeof(b)-1, 0); + if (len > 0) { + while (b[len-1] == '\r' || b[len-1] == '\n') len--; + b[len] = '\0'; + fprintf(stderr, "Recv: '%s'\n", b); + if (strcmp (b, "ping") == 0) { + socket_write(connection_param->socket, "pong", 4, 0); + } + } else if(len < 0) { + break; + } + */ + + if (stream != NULL) { + stream->close(stream); + } (*(connection_param->count))--; // update counter. free (connection_param); return (void*) 1; diff --git a/core/ping.c b/core/ping.c index c970ddc..c6a09e1 100644 --- a/core/ping.c +++ b/core/ping.c @@ -12,7 +12,7 @@ int ipfs_ping (int argc, char **argv) { - char* results = NULL; + unsigned char* results = NULL; size_t results_size = 0; //TODO: handle multiaddress @@ -20,8 +20,8 @@ int ipfs_ping (int argc, char **argv) //TODO: Error checking char* ip = argv[2]; int port = atoi(argv[3]); - int socket_fd = libp2p_net_multistream_connect(ip, port); - if (socket_fd < 0) { + struct Stream* stream = libp2p_net_multistream_connect(ip, port); + if (stream == NULL) { fprintf(stderr, "Unable to connect to %s on port %s", ip, argv[3]); } @@ -32,8 +32,8 @@ int ipfs_ping (int argc, char **argv) size_t protobuf_size = libp2p_message_protobuf_encode_size(msg); unsigned char protobuf[protobuf_size]; libp2p_message_protobuf_encode(msg, &protobuf[0], protobuf_size, &protobuf_size); - libp2p_net_multistream_send(socket_fd, protobuf, protobuf_size); - libp2p_net_multistream_receive(socket_fd, &results, &results_size); + libp2p_net_multistream_write(stream, protobuf, protobuf_size); + libp2p_net_multistream_read(stream, &results, &results_size); if (results_size != protobuf_size) { fprintf(stderr, "PING unsuccessful. Original size: %lu, returned size: %lu\n", protobuf_size, results_size); diff --git a/include/ipfs/core/ipfs_node.h b/include/ipfs/core/ipfs_node.h index 9a878bd..b76c33d 100644 --- a/include/ipfs/core/ipfs_node.h +++ b/include/ipfs/core/ipfs_node.h @@ -1,5 +1,9 @@ #pragma once +#include "ipfs/repo/config/identity.h" +#include "ipfs/repo/fsrepo/fs_repo.h" +#include "libp2p/peer/peerstore.h" + enum NodeMode { MODE_OFFLINE, MODE_ONLINE }; struct IpfsNode { diff --git a/include/ipfs/routing/routing.h b/include/ipfs/routing/routing.h index 9373f55..833843d 100644 --- a/include/ipfs/routing/routing.h +++ b/include/ipfs/routing/routing.h @@ -1,32 +1,55 @@ -#ifndef ROUTING_H - #define ROUTING_H +#pragma once - #include "libp2p/crypto/rsa.h" +#include "libp2p/crypto/rsa.h" +#include "libp2p/record/message.h" - // offlineRouting implements the IpfsRouting interface, - // but only provides the capability to Put and Get signed dht - // records to and from the local datastore. - struct s_ipfs_routing { - struct FSRepo* datastore; - size_t ds_len; - struct RsaPrivateKey* sk; - int (*PutValue) (struct s_ipfs_routing*, char*, size_t, void*, size_t); - int (*GetValue) (struct s_ipfs_routing*, char*, size_t, void*, size_t*); - int (*FindProviders) (struct s_ipfs_routing*, char*, size_t, void*, size_t*); - int (*FindPeer) (struct s_ipfs_routing*, char*, size_t, void*, size_t*); - int (*Provide) (struct s_ipfs_routing*, char*); - int (*Ping) (struct s_ipfs_routing*, char*, size_t); - int (*Bootstrap) (struct s_ipfs_routing*); - }; - typedef struct s_ipfs_routing ipfs_routing; +// offlineRouting implements the IpfsRouting interface, +// but only provides the capability to Put and Get signed dht +// records to and from the local datastore. +struct s_ipfs_routing { + struct FSRepo* datastore; + size_t ds_len; + struct RsaPrivateKey* sk; + struct Stream* stream; + + /** + * Put a value in the datastore + * @param 1 the struct that contains connection information + * @param 2 the key + * @param 3 the size of the key + * @param 4 the value + * @param 5 the size of the value + * @returns 0 on success, otherwise -1 + */ + int (*PutValue) (struct s_ipfs_routing*, char*, size_t, void*, size_t); + /** + * Get a value from the datastore + * @param 1 the struct that contains the connection information + * @param 2 the key to look for + * @param 3 the size of the key + * @param 4 a place to store the value + * @param 5 the size of the value + */ + int (*GetValue) (struct s_ipfs_routing*, char*, size_t, void**, size_t*); + /** + * Find a provider + */ + int (*FindProviders) (struct s_ipfs_routing*, char*, size_t, void*, size_t*); + /** + * Find a peer + */ + int (*FindPeer) (struct s_ipfs_routing*, char*, size_t, void*, size_t*); + int (*Provide) (struct s_ipfs_routing*, char*); + /** + * Ping this instance + */ + int (*Ping) (struct s_ipfs_routing*, struct Libp2pMessage* message); + int (*Bootstrap) (struct s_ipfs_routing*); +}; +typedef struct s_ipfs_routing ipfs_routing; + +// offline routing routines. +ipfs_routing* ipfs_routing_new_offline (struct FSRepo* ds, struct RsaPrivateKey *private_key); +ipfs_routing* ipfs_routing_new_online (struct FSRepo* ds, struct RsaPrivateKey* private_key, struct Stream* stream); +int ipfs_routing_generic_put_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void *val, size_t vlen);int ipfs_routing_generic_get_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void **val, size_t *vlen); - // offline routing routines. - ipfs_routing* ipfs_routing_new_offline (struct FSRepo* ds, struct RsaPrivateKey *private_key); - int ipfs_routing_offline_put_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void *val, size_t vlen); - int ipfs_routing_offline_get_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void *val, size_t *vlen); - int ipfs_routing_offline_find_providers (ipfs_routing* offlineRouting, char *key, size_t key_size, void *ret, size_t *rlen); - int ipfs_routing_offline_find_peer (ipfs_routing* offlineRouting, char *peer_id, size_t pid_size, void *ret, size_t *rlen); - int ipfs_routing_offline_provide (ipfs_routing* offlineRouting, char *cid); - int ipfs_routing_offline_ping (ipfs_routing* offlineRouting, char *peer_id, size_t pid_size); - int ipfs_routing_offline_bootstrap (ipfs_routing* offlineRouting); -#endif // ROUTING_H diff --git a/routing/Makefile b/routing/Makefile index 8ae3fdb..8efa4a2 100644 --- a/routing/Makefile +++ b/routing/Makefile @@ -7,7 +7,7 @@ endif LFLAGS = DEPS = -OBJS = offline.o +OBJS = offline.o online.o %.o: %.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) diff --git a/routing/offline.c b/routing/offline.c index 3f867c8..b55e740 100644 --- a/routing/offline.c +++ b/routing/offline.c @@ -5,28 +5,9 @@ #include "libp2p/record/record.h" #include "ipfs/datastore/ds_helper.h" #include "ipfs/merkledag/merkledag.h" +#include "ipfs/routing/routing.h" -ipfs_routing* ipfs_routing_new_offline (struct FSRepo* ds, struct RsaPrivateKey *private_key) -{ - ipfs_routing *offlineRouting = malloc (sizeof(ipfs_routing)); - - if (offlineRouting) { - offlineRouting->datastore = ds; - offlineRouting->sk = private_key; - - offlineRouting->PutValue = ipfs_routing_offline_put_value; - offlineRouting->GetValue = ipfs_routing_offline_get_value; - offlineRouting->FindProviders = ipfs_routing_offline_find_providers; - offlineRouting->FindPeer = ipfs_routing_offline_find_peer; - offlineRouting->Provide = ipfs_routing_offline_provide; - offlineRouting->Ping = ipfs_routing_offline_ping; - offlineRouting->Bootstrap = ipfs_routing_offline_bootstrap; - } - - return offlineRouting; -} - -int ipfs_routing_offline_put_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void *val, size_t vlen) +int ipfs_routing_generic_put_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void *val, size_t vlen) { int err; char *record, *nkey; @@ -55,7 +36,7 @@ int ipfs_routing_offline_put_value (ipfs_routing* offlineRouting, char *key, siz return 0; // success. } -int ipfs_routing_offline_get_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void *val, size_t *vlen) +int ipfs_routing_generic_get_value (ipfs_routing* offlineRouting, char *key, size_t key_size, void **val, size_t *vlen) { // TODO: Read from db, validate and decode before return. return -1; @@ -76,7 +57,7 @@ int ipfs_routing_offline_provide (ipfs_routing* offlineRouting, char *cid) return ErrOffline; } -int ipfs_routing_offline_ping (ipfs_routing* offlineRouting, char *peer_id, size_t pid_size) +int ipfs_routing_offline_ping (ipfs_routing* offlineRouting, struct Libp2pMessage* message) { return ErrOffline; } @@ -85,3 +66,24 @@ int ipfs_routing_offline_bootstrap (ipfs_routing* offlineRouting) { return ErrOffline; } + +ipfs_routing* ipfs_routing_new_offline (struct FSRepo* ds, struct RsaPrivateKey *private_key) +{ + ipfs_routing *offlineRouting = malloc (sizeof(ipfs_routing)); + + if (offlineRouting) { + offlineRouting->datastore = ds; + offlineRouting->sk = private_key; + offlineRouting->stream = NULL; + + offlineRouting->PutValue = ipfs_routing_generic_put_value; + offlineRouting->GetValue = ipfs_routing_generic_get_value; + offlineRouting->FindProviders = ipfs_routing_offline_find_providers; + offlineRouting->FindPeer = ipfs_routing_offline_find_peer; + offlineRouting->Provide = ipfs_routing_offline_provide; + offlineRouting->Ping = ipfs_routing_offline_ping; + offlineRouting->Bootstrap = ipfs_routing_offline_bootstrap; + } + + return offlineRouting; +} diff --git a/routing/online.c b/routing/online.c new file mode 100644 index 0000000..722979c --- /dev/null +++ b/routing/online.c @@ -0,0 +1,62 @@ +#include + +#include "ipfs/routing/routing.h" +#include "libp2p/record/message.h" +#include "libp2p/net/stream.h" + +/** + * Implements the routing interface for network clients + */ + +int ipfs_routing_online_find_providers(struct s_ipfs_routing* routing, char* val1, size_t val2, void* val3, size_t* val4) { + return 0; +} +int ipfs_routing_online_find_peer(struct s_ipfs_routing* routing, char* val1, size_t val2, void* val3, size_t* val4) { + return 0; +} +int ipfs_routing_online_provide(struct s_ipfs_routing* routing, char* val1) { + return 0; +} +int ipfs_routing_online_ping(struct s_ipfs_routing* routing, struct Libp2pMessage* message) { + // send the same stuff back + size_t protobuf_size = libp2p_message_protobuf_encode_size(message); + unsigned char protobuf[protobuf_size]; + + if (!libp2p_message_protobuf_encode(message, protobuf, protobuf_size, &protobuf_size)) { + return -1; + } + + int retVal = routing->stream->write(routing->stream, protobuf, protobuf_size); + if (retVal == 0) + retVal = -1; + return retVal; +} +int ipfs_routing_online_bootstrap(struct s_ipfs_routing* routing) { + return 0; +} + +/** + * Create a new ipfs_routing struct for online clients + * @param fs_repo the repo + * @param private_key the local private key + * @reurns the ipfs_routing struct that handles messages + */ +ipfs_routing* ipfs_routing_new_online (struct FSRepo* fs_repo, struct RsaPrivateKey *private_key, struct Stream* stream) { + ipfs_routing *onlineRouting = malloc (sizeof(ipfs_routing)); + + if (onlineRouting) { + onlineRouting->datastore = fs_repo; + onlineRouting->sk = private_key; + onlineRouting->stream = stream; + + onlineRouting->PutValue = ipfs_routing_generic_put_value; + onlineRouting->GetValue = ipfs_routing_generic_get_value; + onlineRouting->FindProviders = ipfs_routing_online_find_providers; + onlineRouting->FindPeer = ipfs_routing_online_find_peer; + onlineRouting->Provide = ipfs_routing_online_provide; + onlineRouting->Ping = ipfs_routing_online_ping; + onlineRouting->Bootstrap = ipfs_routing_online_bootstrap; + } + + return onlineRouting; +}