journalio working, needs tuning

This commit is contained in:
jmjatlanta 2017-08-31 06:41:54 -05:00
parent 49bd61feb1
commit 1eab27fa0e
21 changed files with 205 additions and 24 deletions

View file

@ -226,6 +226,25 @@ int ipfs_cid_hash_to_base58(const unsigned char* hash, size_t hash_length, unsig
return 1;
}
/***
* Turn the hash of this CID into a c string
* @param cid the cid
* @param result a place to allocate and store the string
* @returns a pointer to the string (*result) or NULL if there was a problem
*/
char* ipfs_cid_to_string(const struct Cid* cid, char **result) {
size_t str_len = libp2p_crypto_encoding_base58_encode_size(cid->hash_length) + 1;
char *str = (char*) malloc(str_len);
*result = str;
if (str != NULL) {
if (!libp2p_crypto_encoding_base58_encode(cid->hash, cid->hash_length, (unsigned char**)&str, &str_len)) {
free(str);
str = NULL;
}
}
return str;
}
/***
* Turn a multibase decoded string of bytes into a Cid struct
* @param incoming the multibase decoded array

View file

@ -130,13 +130,10 @@ void ipfs_null_connection (void *ptr) {
}
int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* peer) {
libp2p_logger_debug("null", "Attempting maintenance on peer %s.\n", libp2p_peer_id_to_string(peer));
if (peer == NULL) {
libp2p_logger_debug("null", "No maintenance ran on NULL node.\n");
return 0;
}
if (peer->is_local) {
libp2p_logger_debug("null", "No maintenance ran on local node.\n");
return 1;
}
// Is this peer one of our backup partners?
@ -145,10 +142,10 @@ int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* pee
// If so, has there been enough time since the last attempt a backup?
if (replication_peer != NULL) {
announce_secs -= os_utils_gmtime() - replication_peer->lastConnect;
libp2p_logger_debug("null", "Found replication peer. Announce secs are %lld.\n", announce_secs);
libp2p_logger_debug("null", "Checking to see if we should send backup notification to peer %s. Time since last backup: %lld.\n", libp2p_peer_id_to_string(replication_peer->peer), announce_secs);
}
// should we attempt to connect if we're not already?
if (replication_peer != NULL && announce_secs < 0) {
if (replication_peer != NULL && local_node->repo->config->replication->announce && announce_secs < 0) {
// try to connect if we aren't already
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, 2)) {
@ -156,14 +153,17 @@ int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* pee
}
}
// attempt a backup, don't forget to reset timer
libp2p_logger_debug("null", "Attempting a sync of node %s.\n", peer->id);
libp2p_logger_debug("null", "Attempting a sync of node %s.\n", libp2p_peer_id_to_string(peer));
ipfs_journal_sync(local_node, replication_peer);
libp2p_logger_debug("null", "Sync message sent. Maintenance complete for node %s.\n", libp2p_peer_id_to_string(peer));
} else {
// try a ping, but only if we're connected
libp2p_logger_debug("null", "Not replicating, attempt ping of %s.\n", peer->id);
if (peer->connection_type == CONNECTION_TYPE_CONNECTED && !local_node->routing->Ping(local_node->routing, peer)) {
libp2p_logger_debug("null", "Attempted ping of %s failed.\n", peer->id);
peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
if (peer->sessionContext != NULL && os_utils_gmtime() - peer->sessionContext->last_comm_epoch > 180) {
// try a ping, but only if we're connected
libp2p_logger_debug("null", "Attempting ping of %s.\n", peer->id);
if (peer->connection_type == CONNECTION_TYPE_CONNECTED && !local_node->routing->Ping(local_node->routing, peer)) {
libp2p_logger_debug("null", "Attempted ping of %s failed.\n", peer->id);
peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
}
}
}
return 1;

View file

@ -25,9 +25,20 @@ int ipfs_bitswap_shutdown_handler(void* context) {
return 1;
}
/***
* Handles the message
* @param incoming the incoming data buffer
* @param incoming_size the size of the incoming data buffer
* @param session_context the information about the incoming connection
* @param protocol_context the protocol-dependent context
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/
int ipfs_bitswap_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) {
struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
return ipfs_bitswap_network_handle_message(local_node, session_context, incoming, incoming_size);
int retVal = ipfs_bitswap_network_handle_message(local_node, session_context, incoming, incoming_size);
if (retVal == 0)
return -1;
return retVal;
}
struct Libp2pProtocolHandler* ipfs_bitswap_build_protocol_handler(const struct IpfsNode* local_node) {
@ -69,6 +80,7 @@ struct Exchange* ipfs_bitswap_new(struct IpfsNode* ipfs_node) {
exchange->Close = ipfs_bitswap_close;
exchange->HasBlock = ipfs_bitswap_has_block;
exchange->GetBlock = ipfs_bitswap_get_block;
exchange->GetBlockAsync = ipfs_bitswap_get_block_async;
exchange->GetBlocks = ipfs_bitswap_get_blocks;
// Start the threads for the network
@ -130,6 +142,7 @@ int ipfs_bitswap_has_block(struct Exchange* exchange, struct Block* block) {
// add the block to the blockstore
struct BitswapContext* context = exchange->exchangeContext;
context->ipfsNode->blockstore->Put(context->ipfsNode->blockstore->blockstoreContext, block);
context->ipfsNode->repo->config->datastore->datastore_put(block->cid->hash, block->cid->hash_length, block->data, block->data_length, context->ipfsNode->repo->config->datastore);
// update requests
struct WantListQueueEntry* queueEntry = ipfs_bitswap_wantlist_queue_find(context->localWantlist, block->cid);
if (queueEntry != NULL) {
@ -159,10 +172,10 @@ int ipfs_bitswap_get_block(struct Exchange* exchange, struct Cid* cid, struct Bl
int timeout = 60;
int waitSecs = 1;
int timeTaken = 0;
struct WantListSession wantlist_session;
wantlist_session.type = WANTLIST_SESSION_TYPE_LOCAL;
wantlist_session.context = (void*)bitswapContext->ipfsNode;
struct WantListQueueEntry* want_entry = ipfs_bitswap_want_manager_add(bitswapContext, cid, &wantlist_session);
struct WantListSession *wantlist_session = ipfs_bitswap_wantlist_session_new();
wantlist_session->type = WANTLIST_SESSION_TYPE_LOCAL;
wantlist_session->context = (void*)bitswapContext->ipfsNode;
struct WantListQueueEntry* want_entry = ipfs_bitswap_want_manager_add(bitswapContext, cid, wantlist_session);
if (want_entry != NULL) {
// loop waiting for it to fill
while(1) {
@ -189,6 +202,34 @@ int ipfs_bitswap_get_block(struct Exchange* exchange, struct Cid* cid, struct Bl
return 0;
}
/**
* Implements the Exchange->GetBlock method
* We're asking for this method to get the block from peers. Perhaps this should be
* taking in a pointer to a callback, as this could take a while (or fail).
* @param exchangeContext a BitswapContext
* @param cid the Cid to look for
* @param block a pointer to where to put the result
* @returns true(1) if found, false(0) if not
*/
int ipfs_bitswap_get_block_async(struct Exchange* exchange, struct Cid* cid, struct Block** block) {
struct BitswapContext* bitswapContext = (struct BitswapContext*)exchange->exchangeContext;
if (bitswapContext != NULL) {
// check locally first
struct Block* block;
if (bitswapContext->ipfsNode->blockstore->Get(bitswapContext->ipfsNode->blockstore->blockstoreContext, cid, &block)) {
return 1;
}
// now ask the network
struct WantListSession* wantlist_session = ipfs_bitswap_wantlist_session_new();
wantlist_session->type = WANTLIST_SESSION_TYPE_LOCAL;
wantlist_session->context = (void*)bitswapContext->ipfsNode;
ipfs_bitswap_want_manager_add(bitswapContext, cid, wantlist_session);
// TODO: return something that they can watch
return 1;
}
return 0;
}
/**
* Implements the Exchange->GetBlocks method
*/

View file

@ -16,6 +16,7 @@
* @param message the message to send
*/
int ipfs_bitswap_network_send_message(const struct BitswapContext* context, struct Libp2pPeer* peer, const struct BitswapMessage* message) {
libp2p_logger_debug("bitswap_network", "Sending bitswap message to %s.\n", libp2p_peer_id_to_string(peer));
// get a connection to the peer
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
libp2p_peer_connect(&context->ipfsNode->identity->private_key, peer, context->ipfsNode->peerstore, 10);

View file

@ -78,6 +78,7 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_add(struct WantListQueue*
entry = ipfs_bitswap_wantlist_queue_entry_new();
entry->cid = ipfs_cid_copy(cid);
entry->priority = 1;
libp2p_utils_vector_add(entry->sessionsRequesting, session);
libp2p_utils_vector_add(wantlist->queue, entry);
}
libp2p_utils_vector_add(entry->sessionsRequesting, session);
@ -214,6 +215,20 @@ int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const
}
}
/**
* Create a new WantListSession
* @returns the newly allocated WantListSession
*/
struct WantListSession* ipfs_bitswap_wantlist_session_new() {
struct WantListSession* ret = (struct WantListSession*) malloc(sizeof(struct WantListSession));
if (ret != NULL) {
ret->context = NULL;
ret->type = WANTLIST_SESSION_TYPE_LOCAL;
}
return ret;
}
/**
* determine if any of the sessions are referring to the local node
* @param sessions a vector of WantlistSession

View file

@ -295,7 +295,7 @@ int ipfs_import_file(const char* root_dir, const char* fileName, struct Hashtabl
// notify the network
struct HashtableNode *htn = *parent_node;
local_node->routing->Provide(local_node->routing, htn->hash, htn->hash_size);
// notif the network of the subnodes too
// notify the network of the subnodes too
struct NodeLink *nl = htn->head_link;
while (nl != NULL) {
local_node->routing->Provide(local_node->routing, nl->hash, nl->hash_size);

View file

@ -3,6 +3,7 @@
#include <stdlib.h>
#include "ipfs/importer/resolver.h"
#include "libp2p/utils/logger.h"
#include "libp2p/crypto/encoding/base58.h"
#include "libp2p/conn/session.h"
#include "libp2p/routing/dht_protocol.h"
@ -141,6 +142,11 @@ struct HashtableNode* ipfs_resolver_remote_get(const char* path, struct Hashtabl
message->message_type = MESSAGE_TYPE_GET_VALUE;
message->key = key;
message->key_size = strlen(key);
size_t b58size = 100;
uint8_t *b58key = (uint8_t *) malloc(b58size);
libp2p_crypto_encoding_base58_encode((unsigned char*)message->key, message->key_size, (unsigned char**) &b58key, &b58size);
libp2p_logger_debug("resolver", "Attempting to use kademlia to get key %s.\n", b58key);
free(b58key);
size_t message_protobuf_size = libp2p_message_protobuf_encode_size(message);
unsigned char message_protobuf[message_protobuf_size];
libp2p_message_protobuf_encode(message, message_protobuf, message_protobuf_size, &message_protobuf_size);

View file

@ -105,6 +105,14 @@ int ipfs_cid_decode_hash_from_base58(const unsigned char* incoming, size_t incom
*/
int ipfs_cid_hash_to_base58(const unsigned char* hash, size_t hash_length, unsigned char* buffer, size_t max_buffer_length);
/***
* Turn the hash of this CID into a c string
* @param cid the cid
* @param result a place to allocate and store the string
* @returns a pointer to the string (*result) or NULL if there was a problem
*/
char* ipfs_cid_to_string(const struct Cid* cid, char **result);
/***
* Turn a multibase decoded string of bytes into a Cid struct
* @param incoming the multibase decoded array

View file

@ -10,6 +10,7 @@
#include "ipfs/core/ipfs_node.h"
#include "ipfs/exchange/exchange.h"
#include "ipfs/exchange/bitswap/engine.h"
#include "ipfs/exchange/bitswap/wantlist_queue.h"
struct Libp2pProtocolHandler* ipfs_bitswap_build_protocol_handler(const struct IpfsNode* local_node);
@ -66,6 +67,16 @@ int ipfs_bitswap_has_block(struct Exchange* exchange, struct Block* block);
*/
int ipfs_bitswap_get_block(struct Exchange* exchange, struct Cid* cid, struct Block** block);
/**
* Retrieve a block from the BitswapNetwork
*
* @param exchangeContext a pointer to a BitswapContext
* @param cid the Cid of the block we're looking for
* @param queue a pointer to the queue that will change if the block arrives
* @returns true(1) on success, false(0) otherwise.
*/
int ipfs_bitswap_get_block_async(struct Exchange* exchange, struct Cid* cid, struct Block** block);
/***
* Retrieve a collection of blocks from the BitswapNetwork
* Note: The return of false(0) means that not all blocks were found.

View file

@ -91,6 +91,12 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue
*/
int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const struct WantListSession* b);
/**
* Create a new WantListSession
* @returns the newly allocated WantListSession
*/
struct WantListSession* ipfs_bitswap_wantlist_session_new();
/**
* Called by the Bitswap engine, this processes an item on the WantListQueue
* @param context the context

View file

@ -29,6 +29,16 @@ struct Exchange {
*/
int (*GetBlock)(struct Exchange* exchange, struct Cid* cid, struct Block** block);
/**
* Retrieve a block from peers asynchronously
*
* @param context the context
* @param cid the hash of the block to retrieve
* @param queue_entry the queue entry to watch
* @returns true(1) on success, false(0) otherwise
*/
int (*GetBlockAsync)(struct Exchange* exchange, struct Cid* cid, struct Block** block);
/**
* Retrieve several blocks
* @param context the context

View file

@ -10,6 +10,7 @@ struct ReplicationPeer {
};
struct Replication {
int announce;
int announce_minutes;
struct Libp2pVector* replication_peers;
};

View file

@ -1,6 +1,7 @@
/**
* The journal protocol attempts to keep a journal in sync with other (approved) nodes
*/
#include "libp2p/crypto/encoding/base58.h"
#include "libp2p/os/utils.h"
#include "libp2p/utils/logger.h"
#include "ipfs/journal/journal.h"
@ -168,6 +169,12 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli
return 0;
}
memcpy(entry->hash, rec->hash, entry->hash_size);
// debugging
size_t b58size = 100;
uint8_t *b58key = (uint8_t*) malloc(b58size);
libp2p_crypto_encoding_base58_encode(entry->hash, entry->hash_size, &b58key, &b58size);
free(b58key);
libp2p_logger_debug("journal", "Adding hash %s to JournalMessage.\n", b58key);
libp2p_utils_vector_add(message->journal_entries, entry);
}
// send the message
@ -313,7 +320,13 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s
// go get a file
struct Block* block = NULL;
struct Cid* cid = ipfs_cid_new(0, curr->hash, curr->hash_size, CID_PROTOBUF);
if (local_node->exchange->GetBlock(local_node->exchange, cid, &block)) {
// debugging
char* str = NULL;
libp2p_logger_debug("journal", "Looking for block %s.\n", ipfs_cid_to_string(cid, &str));
if (str != NULL)
free(str);
if (local_node->exchange->GetBlockAsync(local_node->exchange, cid, &block)) {
// set timestamp
}
ipfs_cid_free(cid);

View file

@ -484,6 +484,7 @@ int fs_repo_open_config(struct FSRepo* repo) {
// announce minutes
curr_pos++;
_get_json_int_value(data, tokens, num_tokens, curr_pos, "AnnounceMinutes", &repo->config->replication->announce_minutes);
_get_json_int_value(data, tokens, num_tokens, curr_pos, "Announce", &repo->config->replication->announce);
// nodes list
int nodes_pos = _find_token(data, tokens, num_tokens, curr_pos, "Peers");
if (nodes_pos >= 0) {

View file

@ -12,6 +12,7 @@
#include "lmdb.h"
#include "libp2p/utils/logger.h"
#include "libp2p/crypto/encoding/base58.h"
#include "libp2p/os/utils.h"
#include "libp2p/db/datastore.h"
#include "ipfs/repo/fsrepo/lmdb_datastore.h"
@ -76,7 +77,7 @@ int repo_fsrepo_lmdb_build_record(MDB_val *key, MDB_val *value, struct Datastore
}
memcpy(rec->value, &value->mv_data[varint_size], rec->value_size);
}
return 0;
return 1;
}
/***
@ -97,6 +98,13 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas
if (mdb_env == NULL)
return 0;
// debug
size_t b58size = 100;
uint8_t *b58key = (uint8_t *) malloc(b58size);
libp2p_crypto_encoding_base58_encode(key, key_size, &b58key, &b58size);
libp2p_logger_debug("lmdb_datastore", "Looking for key %s in datastore.\n", b58key);
free(b58key);
// open transaction
if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0)
return 0;
@ -177,8 +185,10 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
} else {
if (retVal == MDB_KEYEXIST) // We tried to add a key that already exists. Skip.
retVal = 1;
else
else {
libp2p_logger_error("lmdb_datastore", "mdb_put returned %d.\n", retVal);
retVal = 0;
}
}
// cleanup

View file

@ -3,6 +3,7 @@
#include "varint.h"
#include "lmdb.h"
#include "libp2p/utils/logger.h"
#include "libp2p/crypto/encoding/base58.h"
#include "ipfs/repo/fsrepo/journalstore.h"
#include "ipfs/repo/fsrepo/lmdb_datastore.h"
@ -40,6 +41,13 @@ int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp
record[0] = 1;
memcpy(&record[1], hash, hash_size);
// debug
size_t b58size = 100;
uint8_t *b58key = (uint8_t *) malloc(b58size);
libp2p_crypto_encoding_base58_encode(hash, hash_size, &b58key, &b58size);
libp2p_logger_debug("lmdb_journalstore", "Adding hash %s to journalstore.\n", b58key);
free(b58key);
// open the journal table
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {

View file

@ -1,6 +1,7 @@
#include <stdlib.h>
#include <math.h>
#include "libp2p/crypto/encoding/base58.h"
#include "ipfs/routing/routing.h"
#include "ipfs/core/null.h"
#include "libp2p/record/message.h"
@ -106,6 +107,11 @@ int ipfs_routing_online_find_remote_providers(struct IpfsRouting* routing, const
message->key_size = key_size;
message->key = malloc(message->key_size);
memcpy(message->key, key, message->key_size);
size_t b58size = 100;
uint8_t *b58key = (uint8_t *) malloc(b58size);
libp2p_crypto_encoding_base58_encode((unsigned char*)message->key, message->key_size, (unsigned char**) &b58key, &b58size);
libp2p_logger_debug("online", "find_remote_providers looking for key %s.\n", b58key);
free(b58key);
// loop through the connected peers, asking for the hash
struct Libp2pLinkedList* current_entry = routing->local_node->peerstore->head_entry;
while (current_entry != NULL) {

View file

@ -5,7 +5,7 @@
},
"Datastore": {
"Type": "lmdb",
"Path": "/Users/JohnJones/.ipfs/datastore",
"Path": "/tmp/ipfs_1/datastore",
"StorageMax": "10GB",
"StorageGCWatermark": 90,
"GCPeriod": "1h",
@ -69,6 +69,7 @@
"AddrFilters": null
},
"Replication": {
"Announce": 1,
"AnnounceMinutes": 60,
"Peers": [
"/ip4/127.0.0.1/tcp/4002/ipfs/QmcDW1t4QQBGAs2HSig8xkUhPxFZzmewAeFdsmp6q6nyY5"

View file

@ -5,7 +5,7 @@
},
"Datastore": {
"Type": "lmdb",
"Path": "/tmp/ipfstest1/datastore",
"Path": "/tmp/ipfs_2/datastore",
"StorageMax": "10GB",
"StorageGCWatermark": 90,
"GCPeriod": "1h",
@ -69,6 +69,7 @@
"AddrFilters": null
},
"Replication": {
"Announce": 1,
"AnnounceMinutes": 60,
"Peers": [
"/ip4/127.0.0.1/tcp/4001/ipfs/QmZVoAZGFfinB7MQQiDzB84kWaDPQ95GLuXdemJFM2r9b4"

View file

@ -86,12 +86,19 @@ int test_journal_server_1() {
libp2p_logger_add_class("daemon");
libp2p_logger_add_class("online");
libp2p_logger_add_class("peer");
//libp2p_logger_add_class("null");
libp2p_logger_add_class("null");
libp2p_logger_add_class("replication");
libp2p_logger_add_class("fs_repo");
libp2p_logger_add_class("lmdb_journalstore");
libp2p_logger_add_class("lmdb_datastore");
libp2p_logger_add_class("secio");
libp2p_logger_add_class("socket");
libp2p_logger_add_class("protocol");
libp2p_logger_add_class("dht_protocol");
libp2p_logger_add_class("resolver");
libp2p_logger_add_class("unixfs");
libp2p_logger_add_class("bitswap_engine");
libp2p_logger_add_class("bitswap_network");
if (!drop_build_open_repo(ipfs_path, &fs_repo, config_file)) {
ipfs_repo_fsrepo_free(fs_repo);
@ -147,11 +154,18 @@ int test_journal_server_2() {
libp2p_logger_add_class("daemon");
libp2p_logger_add_class("online");
libp2p_logger_add_class("peer");
//libp2p_logger_add_class("null");
libp2p_logger_add_class("null");
libp2p_logger_add_class("replication");
libp2p_logger_add_class("fs_repo");
libp2p_logger_add_class("lmdb_journalstore");
libp2p_logger_add_class("lmdb_datastore");
libp2p_logger_add_class("secio");
libp2p_logger_add_class("protocol");
libp2p_logger_add_class("dht_protocol");
libp2p_logger_add_class("resolver");
libp2p_logger_add_class("unixfs");
libp2p_logger_add_class("bitswap_engine");
libp2p_logger_add_class("bitswap_network");
if (!drop_build_open_repo(ipfs_path, &fs_repo, config_file)) {
ipfs_repo_fsrepo_free(fs_repo);

View file

@ -6,7 +6,9 @@
#include <stdlib.h>
#include <string.h>
#include "libp2p/crypto/encoding/base58.h"
#include "libp2p/crypto/sha256.h"
#include "libp2p/utils/logger.h"
#include "ipfs/unixfs/unixfs.h"
#include "protobuf.h"
#include "varint.h"
@ -136,6 +138,13 @@ int ipfs_unixfs_add_data(unsigned char* data, size_t data_length, struct UnixFS*
return 0;
}
// debug: display hash
size_t b58size = 100;
uint8_t *b58key = (uint8_t *) malloc(b58size);
libp2p_crypto_encoding_base58_encode(unix_fs->hash, unix_fs->hash_length, &b58key, &b58size);
libp2p_logger_debug("unixfs", "Saving hash of %s to unixfs object.\n", b58key);
free(b58key);
return 1;
}