Testing journalio protocol

This commit is contained in:
jmjatlanta 2017-08-30 11:10:14 -05:00
parent b3af783f4e
commit 49bd61feb1
16 changed files with 280 additions and 57 deletions

View file

@ -80,6 +80,54 @@ int ipfs_node_online_new(const char* repo_path, struct IpfsNode** node) {
return 1;
}
/***
* build an offline IpfsNode
* @param repo_path where the IPFS repository directory is
* @param node the completed IpfsNode struct
* @returns true(1) on success
*/
int ipfs_node_offline_new(const char* repo_path, struct IpfsNode** node) {
struct FSRepo* fs_repo = NULL;
*node = (struct IpfsNode*)malloc(sizeof(struct IpfsNode));
if(*node == NULL)
return 0;
struct IpfsNode* local_node = *node;
local_node->identity = NULL;
local_node->peerstore = NULL;
local_node->providerstore = NULL;
local_node->repo = NULL;
local_node->routing = NULL;
local_node->exchange = NULL;
// build the struct
if (!ipfs_repo_fsrepo_new(repo_path, NULL, &fs_repo)) {
ipfs_node_free(local_node);
*node = NULL;
return 0;
}
// open the repo
if (!ipfs_repo_fsrepo_open(fs_repo)) {
ipfs_node_free(local_node);
*node = NULL;
return 0;
}
// fill in the node
local_node->repo = fs_repo;
local_node->identity = fs_repo->config->identity;
local_node->peerstore = libp2p_peerstore_new(local_node->identity->peer);
local_node->providerstore = libp2p_providerstore_new(fs_repo->config->datastore, local_node->identity->peer);
local_node->blockstore = ipfs_blockstore_new(fs_repo);
local_node->protocol_handlers = ipfs_node_online_build_protocol_handlers(local_node);
local_node->mode = MODE_OFFLINE;
local_node->routing = ipfs_routing_new_offline(local_node, &fs_repo->config->identity->private_key);
local_node->exchange = ipfs_bitswap_new(local_node);
return 1;
}
/***
* Free resources from the creation of an IpfsNode
* @param node the node to free

View file

@ -130,30 +130,39 @@ void ipfs_null_connection (void *ptr) {
}
int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* peer) {
if (peer == NULL)
libp2p_logger_debug("null", "Attempting maintenance on peer %s.\n", libp2p_peer_id_to_string(peer));
if (peer == NULL) {
libp2p_logger_debug("null", "No maintenance ran on NULL node.\n");
return 0;
if (peer->is_local)
}
if (peer->is_local) {
libp2p_logger_debug("null", "No maintenance ran on local node.\n");
return 1;
}
// Is this peer one of our backup partners?
struct ReplicationPeer* replication_peer = repo_config_get_replication_peer(local_node->repo->config->replication, peer);
long long announce_secs = local_node->repo->config->replication->announce_minutes * 60;
// If so, has there been enough time since the last attempt a backup?
if (replication_peer != NULL) {
announce_secs -= os_utils_gmtime() - repo_config_replication_last_attempt(local_node->repo->config->replication, peer);
announce_secs -= os_utils_gmtime() - replication_peer->lastConnect;
libp2p_logger_debug("null", "Found replication peer. Announce secs are %lld.\n", announce_secs);
}
// should we attempt to connect if we're not already?
if (replication_peer != NULL && announce_secs < 0) {
// try to connect if we aren't already
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, 10)) {
if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, 2)) {
return 0;
}
}
// attempt a backup, don't forget to reset timer
libp2p_logger_debug("null", "Attempting a sync of node %s.\n", peer->id);
ipfs_journal_sync(local_node, replication_peer);
} else {
// try a ping, but only if we're connected
libp2p_logger_debug("null", "Not replicating, attempt ping of %s.\n", peer->id);
if (peer->connection_type == CONNECTION_TYPE_CONNECTED && !local_node->routing->Ping(local_node->routing, peer)) {
libp2p_logger_debug("null", "Attempted ping of %s failed.\n", peer->id);
peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
}
}

View file

@ -26,7 +26,7 @@ int ipfs_exporter_get_node(struct IpfsNode* local_node, const unsigned char* has
unsigned char *buffer = NULL;
size_t buffer_size = 0;
int retVal = 0;
struct Libp2pMessage* msg = NULL;
struct KademliaMessage* msg = NULL;
if (local_node->routing->GetValue(local_node->routing, hash, hash_size, (void**)&buffer, &buffer_size)) {
libp2p_logger_debug("exporter", "get_node got a value. Converting it to a HashtableNode\n");

View file

@ -137,7 +137,7 @@ struct HashtableNode* ipfs_resolver_remote_get(const char* path, struct Hashtabl
struct Stream* stream = libp2p_net_multistream_connect(ip, port);
free(ip);
// build the request
struct Libp2pMessage* message = libp2p_message_new();
struct KademliaMessage* message = libp2p_message_new();
message->message_type = MESSAGE_TYPE_GET_VALUE;
message->key = key;
message->key_size = strlen(key);

View file

@ -32,6 +32,15 @@ struct IpfsNode {
* @returns true(1) on success
*/
int ipfs_node_online_new(const char* repo_path, struct IpfsNode** node);
/***
* build an offline IpfsNode
* @param repo_path where the IPFS repository directory is
* @param node the completed IpfsNode struct
* @returns true(1) on success
*/
int ipfs_node_offline_new(const char* repo_path, struct IpfsNode** node);
/***
* Free resources from the creation of an IpfsNode
* @param node the node to free

View file

@ -3,9 +3,6 @@
#include "lmdb.h"
#include "libp2p/db/datastore.h"
static const char* DATASTORE_DB = "DATASTORE";
static const char* JOURNAL_DB = "JOURNAL";
struct lmdb_trans_cursor {
MDB_txn* transaction;
MDB_cursor* cursor;

View file

@ -18,9 +18,10 @@
int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) {
if (incoming_size < 8)
return 0;
char* result = strstr((char*)incoming, "/ipfs/journal/1.0.0");
char* result = strstr((char*)incoming, "/ipfs/journalio/1.0.0");
if(result == NULL || result != (char*)incoming)
return 0;
libp2p_logger_debug("journal", "Handling incoming message.\n");
return 1;
}
@ -58,11 +59,14 @@ struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct I
struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
struct Libp2pVector* vector = libp2p_utils_vector_new(1);
if (vector != NULL) {
void* cursor;
if (!repo_journalstore_cursor_open(database, &cursor))
void* cursor = NULL;
if (!repo_journalstore_cursor_open(database, &cursor)) {
libp2p_logger_error("journal", "Unable to open a cursor for the journalstore.\n");
return NULL;
}
struct JournalRecord* rec = NULL;
if (!repo_journalstore_cursor_get(database, cursor, CURSOR_LAST, &rec)) {
libp2p_logger_error("journal", "Unable to find last record from the journalstore.\n");
libp2p_utils_vector_free(vector);
repo_journalstore_cursor_close(database, cursor);
return NULL;
@ -70,13 +74,17 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
// we've got one, now start the loop
int i = 0;
do {
libp2p_logger_debug("journal", "Adding record to the vector.\n");
libp2p_utils_vector_add(vector, rec);
if (!repo_journalstore_cursor_get(database, cursor, CURSOR_PREVIOUS, &rec)) {
break;
}
i++;
} while(i < n);
libp2p_logger_debug("journal", "Closing journalstore cursor.\n");
repo_journalstore_cursor_close(database, cursor);
} else {
libp2p_logger_error("journal", "Unable to allocate vector for ipfs_journal_get_last.\n");
}
return vector;
}
@ -103,11 +111,11 @@ int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, st
if (!ipfs_journal_message_encode(message, &msg[0], msg_size, &msg_size))
return 0;
// send the header
char* header = "/ipfs/journalio/1.0.0/n";
if (!peer->sessionContext->default_stream->write(peer->sessionContext->default_stream, (unsigned char*)header, strlen(header)))
char* header = "/ipfs/journalio/1.0.0\n";
if (!peer->sessionContext->default_stream->write(peer->sessionContext, (unsigned char*)header, strlen(header)))
return 0;
// send the message
return peer->sessionContext->default_stream->write(peer->sessionContext->default_stream, msg, msg_size);
return peer->sessionContext->default_stream->write(peer->sessionContext, msg, msg_size);
}
/***
@ -116,16 +124,28 @@ int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, st
* @returns true(1) on success, false(0) otherwise.
*/
int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* replication_peer) {
libp2p_logger_debug("journal", "Attempting replication for peer %s.\n", libp2p_peer_id_to_string(replication_peer->peer));
// get the real peer object from the peersstore
struct Libp2pPeer *peer = libp2p_peerstore_get_peer(local_node->peerstore, (unsigned char*)replication_peer->peer->id, replication_peer->peer->id_size);
if (peer == NULL) {
libp2p_logger_error("journal", "Unable to find peer %s in peerstore.\n", libp2p_peer_id_to_string(replication_peer->peer));
return 0;
}
// make sure we're connected securely
if (replication_peer->peer->is_local)
if (peer->is_local) {
libp2p_logger_debug("journal", "Cannot replicate a local peer.\n");
return 0;
if (replication_peer->peer->sessionContext->secure_stream == NULL)
}
if (peer->sessionContext == NULL || peer->sessionContext->secure_stream == NULL) {
libp2p_logger_debug("journal", "Cannot replicate over an insecure stream.\n");
return 0;
}
// grab the last 10? files
struct Libp2pVector* journal_records = ipfs_journal_get_last(local_node->repo->config->datastore, 10);
if (journal_records == NULL || journal_records->total == 0) {
// nothing to do
libp2p_logger_debug("journal", "There are no journal records to process.\n");
return 1;
}
// build the message
@ -152,7 +172,8 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli
}
// send the message
message->current_epoch = os_utils_gmtime();
int retVal = ipfs_journal_send_message(local_node, replication_peer->peer, message);
libp2p_logger_debug("journal", "Sending message to %s.\n", peer->id);
int retVal = ipfs_journal_send_message(local_node, peer, message);
if (retVal) {
replication_peer->lastConnect = message->current_epoch;
replication_peer->lastJournalTime = message->end_epoch;
@ -193,6 +214,13 @@ int ipfs_journal_todo_free(struct JournalToDo *in) {
return 1;
}
/***
* Loop through the incoming message, looking for what may need to change
* @param local_node the context
* @param incoming the incoming JournalMessage
* @param todo_vector a Libp2pVector that gets allocated and filled with JournalToDo structs
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_journal_build_todo(struct IpfsNode* local_node, struct JournalMessage* incoming, struct Libp2pVector** todo_vector) {
*todo_vector = libp2p_utils_vector_new(1);
if (*todo_vector == NULL)
@ -239,10 +267,29 @@ int ipfs_journal_build_todo(struct IpfsNode* local_node, struct JournalMessage*
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/
int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) {
// remove protocol
uint8_t *incoming_pos = (uint8_t*) incoming;
size_t pos_size = incoming_size;
int second_read = 0;
for(int i = 0; i < incoming_size; i++) {
if (incoming[i] == '\n') {
if (incoming_size > i + 1) {
incoming_pos = (uint8_t *)&incoming[i+1];
pos_size = incoming_size - i;
break;
} else {
// read next segment from network
if (!session_context->default_stream->read(session_context, &incoming_pos, &pos_size, 10))
return -1;
second_read = 1;
}
}
}
libp2p_logger_debug("journal", "Handling incoming message from %s.\n", session_context->remote_peer_id);
struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
// un-protobuf the message
struct JournalMessage* message = NULL;
if (!ipfs_journal_message_decode(incoming, incoming_size, &message))
if (!ipfs_journal_message_decode(incoming_pos, pos_size, &message))
return -1;
// see if the remote's time is within 5 minutes of now
unsigned long long start_time = os_utils_gmtime();
@ -251,17 +298,42 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s
// if it is positive, our clock is faster than theirs.
if ( llabs(our_time_diff) > 300) {
libp2p_logger_error("journal", "The clock of peer %s is out of 5 minute range. Seconds difference: %llu", session_context->remote_peer_id, our_time_diff);
if (second_read) {
free(incoming_pos);
}
return -1;
}
// TODO: get our records for the same period
// TODO: compare the two sets of records
// we will build a list of todo items:
// ask for files
// adjust time on files
// notify remote that we have files that they probably do not have
struct Libp2pVector* todo_vector = NULL;
ipfs_journal_build_todo(local_node, message, &todo_vector);
// set new values in their ReplicationPeer struct
// loop through todo items, and do the right thing
for(int i = 0; i < todo_vector->total; i++) {
struct JournalToDo *curr = (struct JournalToDo*) libp2p_utils_vector_get(todo_vector, i);
switch (curr->action) {
case (JOURNAL_ENTRY_NEEDED): {
// go get a file
struct Block* block = NULL;
struct Cid* cid = ipfs_cid_new(0, curr->hash, curr->hash_size, CID_PROTOBUF);
if (local_node->exchange->GetBlock(local_node->exchange, cid, &block)) {
// set timestamp
}
ipfs_cid_free(cid);
ipfs_block_free(block);
}
break;
case (JOURNAL_TIME_ADJUST): {
}
break;
case (JOURNAL_REMOTE_NEEDS): {
}
break;
}
}
//TODO: set new values in their ReplicationPeer struct
if (second_read)
free(incoming_pos);
return 1;
}

View file

@ -2,6 +2,7 @@
#include "multiaddr/multiaddr.h"
#include "libp2p/utils/linked_list.h"
#include "libp2p/utils/logger.h"
#include "ipfs/repo/config/replication.h"
/**
@ -79,11 +80,12 @@ struct ReplicationPeer* repo_config_get_replication_peer(struct Replication* rep
if (replication->replication_peers != NULL) {
for(int i = 0; i < replication->replication_peers->total; i++) {
struct ReplicationPeer* confAddr = (struct ReplicationPeer*) libp2p_utils_vector_get(replication->replication_peers, i);
if (libp2p_peer_compare(confAddr->peer, key) == 0)
if (libp2p_peer_matches_id(confAddr->peer, (unsigned char*)key->id, key->id_size)) {
return confAddr;
}
}
}
}
return NULL;
}
@ -108,6 +110,7 @@ int repo_config_replication_approved_node(struct Replication* replication, struc
* @param peer the peer to examine
* @returns the time since the last replication, or the announce time if we have no record
*/
/*
unsigned long long repo_config_replication_last_attempt(struct Replication* replication, struct Libp2pPeer* peer) {
struct ReplicationPeer* rp = repo_config_get_replication_peer(replication, peer);
if (rp != NULL) {
@ -115,6 +118,7 @@ unsigned long long repo_config_replication_last_attempt(struct Replication* repl
}
return 0;
}
*/
/***
* Determine the last journal record time that was sent to this peer

View file

@ -1,6 +1,7 @@
#include <stdio.h>
#include <sys/stat.h>
#include "libp2p/utils/logger.h"
#include "libp2p/crypto/encoding/base64.h"
#include "libp2p/crypto/key.h"
#include "libp2p/peer/peer.h"
@ -486,13 +487,14 @@ int fs_repo_open_config(struct FSRepo* repo) {
// nodes list
int nodes_pos = _find_token(data, tokens, num_tokens, curr_pos, "Peers");
if (nodes_pos >= 0) {
nodes_pos++;
if (tokens[nodes_pos].type == JSMN_ARRAY) {
int nodes_size = tokens[nodes_pos].size;
repo->config->replication->replication_peers = libp2p_utils_vector_new(nodes_size);
nodes_pos++;
for(int i = 0; i < nodes_size; i++) {
char* val = NULL;
if (!_get_json_string_value(data, tokens, num_tokens, nodes_pos + i, NULL, &val))
if (!_get_json_string_value(data, tokens, num_tokens, nodes_pos, NULL, &val))
break;
struct MultiAddress* cur = multiaddress_new_from_string(val);
if (cur == NULL)
@ -501,10 +503,15 @@ int fs_repo_open_config(struct FSRepo* repo) {
struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(cur);
struct ReplicationPeer* rp = repo_config_replication_peer_new();
rp->peer = peer;
libp2p_logger_debug("fs_repo", "Adding %s to replication_peers.\n", libp2p_peer_id_to_string(rp->peer));
libp2p_utils_vector_add(repo->config->replication->replication_peers, rp);
free(val);
}
} else {
libp2p_logger_debug("fs_repo", "Replication|Peers is not an array.\n");
}
} else {
libp2p_logger_debug("fs_repo", "No replication peers found.\n");
}
}
// free the memory used reading the json file

View file

@ -101,7 +101,7 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas
if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0)
return 0;
if (mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
mdb_txn_commit(mdb_txn);
return 0;
}
@ -150,7 +150,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn);
if (retVal != 0)
return 0;
retVal = mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi);
retVal = mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi);
if (retVal != 0)
return 0;
@ -247,7 +247,7 @@ int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) {
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0)
return 0;
MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction;
if (mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
mdb_txn_commit(mdb_txn);
return 0;
}

View file

@ -1,9 +1,11 @@
#include <string.h>
#include "varint.h"
#include "lmdb.h"
#include "libp2p/utils/logger.h"
#include "ipfs/repo/fsrepo/journalstore.h"
#include "ipfs/repo/fsrepo/lmdb_datastore.h"
#include "lmdb.h"
#include "varint.h"
#include <string.h>
int journal_record_free(struct JournalRecord* rec) {
if (rec != NULL) {
@ -40,7 +42,7 @@ int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp
// open the journal table
if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
return 0;
}
@ -72,10 +74,13 @@ int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) {
*crsr = malloc(sizeof(struct lmdb_trans_cursor));
struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)*crsr;
// open transaction
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0)
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) {
libp2p_logger_error("lmdb_journalstore", "Unable to start a transaction.\n");
return 0;
}
MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction;
if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
libp2p_logger_error("lmdb_journalstore", "Unable to open the dbi to the journalstore");
mdb_txn_commit(mdb_txn);
return 0;
}
@ -85,7 +90,11 @@ int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) {
return 0;
}
return 1;
} else {
libp2p_logger_error("lmdb_journalstore", "Attempted to open a new cursor but there is something already there.\n");
}
} else {
libp2p_logger_error("lmdb_journalstore", "Unable to open cursor on null db handle.\n");
}
return 0;

View file

@ -1,4 +1,5 @@
#include <stdlib.h>
#include <math.h>
#include "ipfs/routing/routing.h"
#include "ipfs/core/null.h"
@ -20,10 +21,11 @@
* @param message what to send
* @returns what was received
*/
struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionContext* sessionContext, struct Libp2pMessage* message) {
struct KademliaMessage* ipfs_routing_online_send_receive_message(struct SessionContext* sessionContext, struct KademliaMessage* message) {
size_t protobuf_size = 0, results_size = 0;
unsigned char* protobuf = NULL, *results = NULL;
struct Libp2pMessage* return_message = NULL;
struct KademliaMessage* return_message = NULL;
//unsigned char* protocol = (unsigned char*)"/ipfs/kad/1.0.0\n";
protobuf_size = libp2p_message_protobuf_encode_size(message);
protobuf = (unsigned char*)malloc(protobuf_size);
@ -35,6 +37,10 @@ struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionCon
goto exit;
}
//if (!sessionContext->default_stream->write(sessionContext, protocol, strlen( (char*)protocol))) {
// libp2p_logger_error("online", "Unable to switch to kademlia protocol.\n");
//}
// send the message, and expect the same back
if (!sessionContext->default_stream->write(sessionContext, protobuf, protobuf_size)) {
libp2p_logger_error("online", "Attempted to write to Kademlia stream, but could not.\n");
@ -45,10 +51,30 @@ struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionCon
}
if (results_size == 0) {
libp2p_logger_error("online", "reading kademlia response returned nothing.\n");
libp2p_logger_error("online", "reading kademlia header response returned nothing.\n");
goto exit;
}
// remove protocol
uint8_t *pos = &results[0];
size_t pos_length = results_size;
int results_max = fmin(results_size, 30);
for (int i = 0; i < results_max; i++) {
if (results[i] == '\n') {
if (i < results_size - 1) {
// there's more left
pos = &results[i];
pos_length = results_size - i;
} else {
// we've run out of buffer. See if we have more on the network.
if (!sessionContext->default_stream->read(sessionContext, &results, &results_size, 5)) {
// we don't have more. This is a problem.
libp2p_logger_error("online", "Reading kademlia response returned nothing.\n");
}
}
}
}
// see if we can unprotobuf
if (!libp2p_message_protobuf_decode(results, results_size, &return_message)) {
libp2p_logger_error("online", "Received kademlia response, but cannot decode it.\n");
@ -75,7 +101,7 @@ struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionCon
int ipfs_routing_online_find_remote_providers(struct IpfsRouting* routing, const unsigned char* key, size_t key_size, struct Libp2pVector** peers) {
int found = 0;
// build the message to be transmitted
struct Libp2pMessage* message = libp2p_message_new();
struct KademliaMessage* message = libp2p_message_new();
message->message_type = MESSAGE_TYPE_GET_PROVIDERS;
message->key_size = key_size;
message->key = malloc(message->key_size);
@ -87,7 +113,7 @@ int ipfs_routing_online_find_remote_providers(struct IpfsRouting* routing, const
if (peer->connection_type == CONNECTION_TYPE_CONNECTED) {
// Ask for hash, if it has it, break out of the loop and stop looking
libp2p_logger_debug("online", "FindRemoteProviders: Asking for who can provide\n");
struct Libp2pMessage* return_message = ipfs_routing_online_send_receive_message(peer->sessionContext, message);
struct KademliaMessage* return_message = ipfs_routing_online_send_receive_message(peer->sessionContext, message);
if (return_message != NULL && return_message->provider_peer_head != NULL) {
libp2p_logger_debug("online", "FindRemoteProviders: Return value is not null\n");
found = 1;
@ -163,7 +189,7 @@ int ipfs_routing_online_find_providers(struct IpfsRouting* routing, const unsign
*/
int ipfs_routing_online_ask_peer_for_peer(struct Libp2pPeer* whoToAsk, const unsigned char* peer_id, size_t peer_id_size, struct Libp2pPeer **result) {
int retVal = 0;
struct Libp2pMessage *message = NULL, *return_message = NULL;
struct KademliaMessage *message = NULL, *return_message = NULL;
if (whoToAsk->connection_type == CONNECTION_TYPE_CONNECTED) {
message = libp2p_message_new();
@ -261,7 +287,7 @@ int ipfs_routing_online_provide(struct IpfsRouting* routing, const unsigned char
struct Libp2pPeer* local_peer = ipfs_routing_online_build_local_peer(routing);
// create the message
struct Libp2pMessage* msg = libp2p_message_new();
struct KademliaMessage* msg = libp2p_message_new();
msg->key_size = key_size;
msg->key = malloc(msg->key_size);
memcpy(msg->key, key, msg->key_size);
@ -280,7 +306,7 @@ int ipfs_routing_online_provide(struct IpfsRouting* routing, const unsigned char
// notify everyone we're connected to
if (current_peer->connection_type == CONNECTION_TYPE_CONNECTED) {
// ignoring results is okay this time
struct Libp2pMessage* rslt = ipfs_routing_online_send_receive_message(current_peer->sessionContext, msg);
struct KademliaMessage* rslt = ipfs_routing_online_send_receive_message(current_peer->sessionContext, msg);
if (rslt != NULL)
libp2p_message_free(rslt);
}
@ -301,7 +327,7 @@ int ipfs_routing_online_provide(struct IpfsRouting* routing, const unsigned char
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* peer) {
struct Libp2pMessage *outMsg = NULL, *inMsg = NULL;
struct KademliaMessage *outMsg = NULL, *inMsg = NULL;
int retVal = 0;
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
@ -348,14 +374,14 @@ int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* pee
*/
int ipfs_routing_online_get_peer_value(ipfs_routing* routing, const struct Libp2pPeer* peer, const unsigned char* key, size_t key_size, void** buffer, size_t *buffer_size) {
// build message
struct Libp2pMessage* msg = libp2p_message_new();
struct KademliaMessage* msg = libp2p_message_new();
msg->key_size = key_size;
msg->key = malloc(msg->key_size);
memcpy(msg->key, key, msg->key_size);
msg->message_type = MESSAGE_TYPE_GET_VALUE;
// send message and receive results
struct Libp2pMessage* ret_msg = ipfs_routing_online_send_receive_message(peer->sessionContext, msg);
struct KademliaMessage* ret_msg = ipfs_routing_online_send_receive_message(peer->sessionContext, msg);
libp2p_message_free(msg);
if (ret_msg == NULL)
@ -500,7 +526,9 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) {
return -1; // this should never happen
}
if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier)
libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, 5);
if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, 2)) {
libp2p_logger_debug("online", "Attempted to bootstrap and connect to %s but failed. Continuing.\n", libp2p_peer_id_to_string(peer));
}
}
}
}

View file

@ -39,6 +39,7 @@
"ResolveCacheSize": 128
},
"Bootstrap": [
"/ip4/127.0.0.1/tcp/4002/ipfs/QmcDW1t4QQBGAs2HSig8xkUhPxFZzmewAeFdsmp6q6nyY5"
],
"Tour": {
"Last": ""
@ -68,8 +69,8 @@
"AddrFilters": null
},
"Replication": {
"AnnounceMinutes": "60",
"Nodes": [
"AnnounceMinutes": 60,
"Peers": [
"/ip4/127.0.0.1/tcp/4002/ipfs/QmcDW1t4QQBGAs2HSig8xkUhPxFZzmewAeFdsmp6q6nyY5"
]
}

View file

@ -39,6 +39,7 @@
"ResolveCacheSize": 128
},
"Bootstrap": [
"/ip4/127.0.0.1/tcp/4001/ipfs/QmZVoAZGFfinB7MQQiDzB84kWaDPQ95GLuXdemJFM2r9b4"
],
"Tour": {
"Last": ""
@ -68,8 +69,8 @@
"AddrFilters": null
},
"Replication": {
"AnnounceMinutes": "60",
"Nodes": [
"AnnounceMinutes": 60,
"Peers": [
"/ip4/127.0.0.1/tcp/4001/ipfs/QmZVoAZGFfinB7MQQiDzB84kWaDPQ95GLuXdemJFM2r9b4"
]
}

View file

@ -19,7 +19,7 @@
int test_ping() {
int retVal = 0;
struct FSRepo* fs_repo = NULL;
struct Libp2pMessage* message = NULL;
struct KademliaMessage* message = NULL;
//struct IpfsNode local_node;
struct Libp2pPeer* remote_peer = NULL;
struct Dialer* dialer = NULL;

View file

@ -82,6 +82,16 @@ int test_journal_server_1() {
struct FSRepo* fs_repo = NULL;
libp2p_logger_add_class("test_journal");
libp2p_logger_add_class("journal");
libp2p_logger_add_class("daemon");
libp2p_logger_add_class("online");
libp2p_logger_add_class("peer");
//libp2p_logger_add_class("null");
libp2p_logger_add_class("replication");
libp2p_logger_add_class("fs_repo");
libp2p_logger_add_class("lmdb_journalstore");
libp2p_logger_add_class("secio");
libp2p_logger_add_class("socket");
if (!drop_build_open_repo(ipfs_path, &fs_repo, config_file)) {
ipfs_repo_fsrepo_free(fs_repo);
@ -93,9 +103,24 @@ int test_journal_server_1() {
ipfs_repo_fsrepo_free(fs_repo);
// add some files to the datastore
uint8_t *bytes = (unsigned char*)"hello, world!\n";
char* filename = "test1.txt";
create_file(filename, bytes, strlen((char*)bytes));
struct HashtableNode* node;
size_t bytes_written;
struct IpfsNode *local_node = NULL;
ipfs_node_offline_new(ipfs_path, &local_node);
ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0);
ipfs_node_free(local_node);
libp2p_logger_debug("test_journal", "*** Firing up daemon for server 2 ***\n");
pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path);
thread_started = 1;
sleep(45);
retVal = 1;
exit:
ipfs_daemon_stop();
@ -118,6 +143,15 @@ int test_journal_server_2() {
struct FSRepo* fs_repo = NULL;
libp2p_logger_add_class("test_journal");
libp2p_logger_add_class("journal");
libp2p_logger_add_class("daemon");
libp2p_logger_add_class("online");
libp2p_logger_add_class("peer");
//libp2p_logger_add_class("null");
libp2p_logger_add_class("replication");
libp2p_logger_add_class("fs_repo");
libp2p_logger_add_class("lmdb_journalstore");
libp2p_logger_add_class("secio");
if (!drop_build_open_repo(ipfs_path, &fs_repo, config_file)) {
ipfs_repo_fsrepo_free(fs_repo);
@ -129,9 +163,13 @@ int test_journal_server_2() {
ipfs_repo_fsrepo_free(fs_repo);
libp2p_logger_debug("test_journal", "*** Firing up daemon for server 2 ***\n");
pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path);
thread_started = 1;
sleep(30);
retVal = 1;
exit:
ipfs_daemon_stop();