Implemented client side of journaling protocol
This commit is contained in:
parent
60c6085469
commit
1b69cdf1e8
6 changed files with 210 additions and 26 deletions
45
core/null.c
45
core/null.c
|
@ -12,17 +12,19 @@
|
||||||
#include "libp2p/net/p2pnet.h"
|
#include "libp2p/net/p2pnet.h"
|
||||||
#include "libp2p/net/protocol.h"
|
#include "libp2p/net/protocol.h"
|
||||||
#include "libp2p/nodeio/nodeio.h"
|
#include "libp2p/nodeio/nodeio.h"
|
||||||
|
#include "libp2p/os/utils.h"
|
||||||
#include "libp2p/record/message.h"
|
#include "libp2p/record/message.h"
|
||||||
#include "libp2p/routing/dht_protocol.h"
|
#include "libp2p/routing/dht_protocol.h"
|
||||||
#include "libp2p/secio/secio.h"
|
#include "libp2p/secio/secio.h"
|
||||||
#include "libp2p/utils/logger.h"
|
#include "libp2p/utils/logger.h"
|
||||||
#include "ipfs/core/daemon.h"
|
#include "ipfs/core/daemon.h"
|
||||||
#include "ipfs/routing/routing.h"
|
|
||||||
#include "ipfs/core/ipfs_node.h"
|
#include "ipfs/core/ipfs_node.h"
|
||||||
|
#include "ipfs/exchange/bitswap/network.h"
|
||||||
|
#include "ipfs/journal/journal.h"
|
||||||
#include "ipfs/merkledag/merkledag.h"
|
#include "ipfs/merkledag/merkledag.h"
|
||||||
#include "ipfs/merkledag/node.h"
|
#include "ipfs/merkledag/node.h"
|
||||||
|
#include "ipfs/routing/routing.h"
|
||||||
#include "ipfs/util/thread_pool.h"
|
#include "ipfs/util/thread_pool.h"
|
||||||
#include "ipfs/exchange/bitswap/network.h"
|
|
||||||
|
|
||||||
#define BUF_SIZE 4096
|
#define BUF_SIZE 4096
|
||||||
|
|
||||||
|
@ -127,6 +129,38 @@ void ipfs_null_connection (void *ptr) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* peer) {
|
||||||
|
if (peer == NULL)
|
||||||
|
return 0;
|
||||||
|
if (peer->is_local)
|
||||||
|
return 1;
|
||||||
|
// Is this peer one of our backup partners?
|
||||||
|
struct ReplicationPeer* replication_peer = repo_config_get_replication_peer(local_node->repo->config->replication, peer);
|
||||||
|
long long announce_secs = local_node->repo->config->replication->announce_minutes * 60;
|
||||||
|
// If so, has there been enough time since the last attempt a backup?
|
||||||
|
if (replication_peer != NULL) {
|
||||||
|
announce_secs -= os_utils_gmtime() - repo_config_replication_last_attempt(local_node->repo->config->replication, peer);
|
||||||
|
}
|
||||||
|
// should we attempt to connect if we're not already?
|
||||||
|
if (replication_peer != NULL && announce_secs < 0) {
|
||||||
|
// try to connect if we aren't already
|
||||||
|
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
|
||||||
|
if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, 10)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// attempt a backup, don't forget to reset timer
|
||||||
|
ipfs_journal_sync(local_node, replication_peer);
|
||||||
|
} else {
|
||||||
|
// try a ping, but only if we're connected
|
||||||
|
if (peer->connection_type == CONNECTION_TYPE_CONNECTED && !local_node->routing->Ping(local_node->routing, peer)) {
|
||||||
|
peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* Called by the daemon to listen for connections
|
* Called by the daemon to listen for connections
|
||||||
* @param ptr a pointer to an IpfsNodeListenParams struct
|
* @param ptr a pointer to an IpfsNodeListenParams struct
|
||||||
|
@ -187,12 +221,7 @@ void* ipfs_null_listen (void *ptr)
|
||||||
} else {
|
} else {
|
||||||
// timeout... do maintenance
|
// timeout... do maintenance
|
||||||
struct PeerEntry* entry = current_peer_entry->item;
|
struct PeerEntry* entry = current_peer_entry->item;
|
||||||
if (current_peer_entry != NULL && !entry->peer->is_local && entry->peer->connection_type == CONNECTION_TYPE_CONNECTED) {
|
ipfs_null_do_maintenance(listen_param->local_node, entry->peer);
|
||||||
libp2p_logger_debug("null", "Attempting to ping %s.\n", entry->peer->id);
|
|
||||||
if (!listen_param->local_node->routing->Ping(listen_param->local_node->routing, entry->peer)) {
|
|
||||||
entry->peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (current_peer_entry != NULL)
|
if (current_peer_entry != NULL)
|
||||||
current_peer_entry = current_peer_entry->next;
|
current_peer_entry = current_peer_entry->next;
|
||||||
if (current_peer_entry == NULL)
|
if (current_peer_entry == NULL)
|
||||||
|
|
|
@ -44,8 +44,7 @@ struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct I
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* Send a journal message to a remote peer
|
* Send a journal message to a remote peer
|
||||||
* @param local_node the local node
|
* @param replication_peer the peer to send it to
|
||||||
* @param peer the peer to send it to
|
|
||||||
* @returns true(1) on success, false(0) otherwise.
|
* @returns true(1) on success, false(0) otherwise.
|
||||||
*/
|
*/
|
||||||
int ipfs_journal_sync(struct IpfsNode* local_node, struct Libp2pPeer* peer);
|
int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* replication_peer);
|
||||||
|
|
|
@ -1,10 +1,32 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
#include "libp2p/utils/vector.h"
|
#include "libp2p/utils/vector.h"
|
||||||
|
#include "libp2p/peer/peer.h"
|
||||||
|
|
||||||
|
struct ReplicationPeer {
|
||||||
|
struct Libp2pPeer* peer;
|
||||||
|
unsigned long long lastConnect;
|
||||||
|
unsigned long long lastJournalTime;
|
||||||
|
};
|
||||||
|
|
||||||
struct Replication {
|
struct Replication {
|
||||||
int announce_minutes;
|
int announce_minutes;
|
||||||
struct Libp2pVector* nodes;
|
struct Libp2pVector* replication_peers;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a new ReplicationPeer struct
|
||||||
|
* @returns the new struct
|
||||||
|
*/
|
||||||
|
struct ReplicationPeer* repo_config_replication_peer_new();
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Free the resources of a ReplicationPeer
|
||||||
|
* @param rp the ReplicationPeer to free
|
||||||
|
* @returns true(1)
|
||||||
|
*/
|
||||||
|
int repo_config_replication_peer_free(struct ReplicationPeer* rp);
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* allocate memory and initialize the replication struct
|
* allocate memory and initialize the replication struct
|
||||||
* @param replication a pointer to the struct to be allocated
|
* @param replication a pointer to the struct to be allocated
|
||||||
|
@ -18,3 +40,36 @@ int repo_config_replication_new(struct Replication** replication);
|
||||||
* @returns true(1);
|
* @returns true(1);
|
||||||
*/
|
*/
|
||||||
int repo_config_replication_free(struct Replication* replication);
|
int repo_config_replication_free(struct Replication* replication);
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Determines if this peer is one of our approved replication nodes
|
||||||
|
* @param replication the replication struct
|
||||||
|
* @param peer the peer to examine
|
||||||
|
* @returns true(1) if this peer should be kept syncronized, false(0) otherwise
|
||||||
|
*/
|
||||||
|
int repo_config_replication_approved_node(struct Replication* replication, struct Libp2pPeer* peer);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine the last time a replication was successful for this peer (at least we sent it something without error)
|
||||||
|
* @param replication the replication struct
|
||||||
|
* @param peer the peer to examine
|
||||||
|
* @returns the time since the last replication, or the announce time if we have no record
|
||||||
|
*/
|
||||||
|
unsigned long long repo_config_replication_last_attempt(struct Replication* replication, struct Libp2pPeer* peer);
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Determine the last journal record time that was sent to this peer
|
||||||
|
* @param replication the replication struct
|
||||||
|
* @param peer the peer to examine
|
||||||
|
* @returns the time of the last journal entry successfully sent
|
||||||
|
*/
|
||||||
|
unsigned long long repo_config_replication_last_journal_time(struct Replication* replication, struct Libp2pPeer* peer);
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Given a peer, return the ReplicationPeer record
|
||||||
|
* @param replication the context
|
||||||
|
* @param key the peer to look for
|
||||||
|
* @returns the ReplicationPeer struct, or NULL if not found
|
||||||
|
*/
|
||||||
|
struct ReplicationPeer* repo_config_get_replication_peer(struct Replication* replication, struct Libp2pPeer* key);
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
#include "ipfs/journal/journal_message.h"
|
#include "ipfs/journal/journal_message.h"
|
||||||
#include "ipfs/journal/journal_entry.h"
|
#include "ipfs/journal/journal_entry.h"
|
||||||
#include "ipfs/repo/fsrepo/journalstore.h"
|
#include "ipfs/repo/fsrepo/journalstore.h"
|
||||||
|
#include "ipfs/repo/config/replication.h"
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* See if we can handle this message
|
* See if we can handle this message
|
||||||
|
@ -124,14 +125,14 @@ int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, st
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* Send a journal message to a remote peer
|
* Send a journal message to a remote peer
|
||||||
* @param peer the peer to send it to
|
* @param replication_peer the peer to send it to
|
||||||
* @returns true(1) on success, false(0) otherwise.
|
* @returns true(1) on success, false(0) otherwise.
|
||||||
*/
|
*/
|
||||||
int ipfs_journal_sync(struct IpfsNode* local_node, struct Libp2pPeer* peer) {
|
int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* replication_peer) {
|
||||||
// make sure we're connected securely
|
// make sure we're connected securely
|
||||||
if (peer->is_local)
|
if (replication_peer->peer->is_local)
|
||||||
return 0;
|
return 0;
|
||||||
if (peer->sessionContext->secure_stream == NULL)
|
if (replication_peer->peer->sessionContext->secure_stream == NULL)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
// grab the last 10? files
|
// grab the last 10? files
|
||||||
|
@ -164,7 +165,11 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct Libp2pPeer* peer) {
|
||||||
}
|
}
|
||||||
// send the message
|
// send the message
|
||||||
message->current_epoch = os_utils_gmtime();
|
message->current_epoch = os_utils_gmtime();
|
||||||
int retVal = ipfs_journal_send_message(local_node, peer, message);
|
int retVal = ipfs_journal_send_message(local_node, replication_peer->peer, message);
|
||||||
|
if (retVal) {
|
||||||
|
replication_peer->lastConnect = message->current_epoch;
|
||||||
|
replication_peer->lastJournalTime = message->end_epoch;
|
||||||
|
}
|
||||||
// clean up
|
// clean up
|
||||||
ipfs_journal_message_free(message);
|
ipfs_journal_message_free(message);
|
||||||
ipfs_journal_free_records(journal_records);
|
ipfs_journal_free_records(journal_records);
|
||||||
|
|
|
@ -1,7 +1,37 @@
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
|
||||||
#include "multiaddr/multiaddr.h"
|
#include "multiaddr/multiaddr.h"
|
||||||
|
#include "libp2p/utils/linked_list.h"
|
||||||
#include "ipfs/repo/config/replication.h"
|
#include "ipfs/repo/config/replication.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate a new ReplicationPeer struct
|
||||||
|
* @returns the new struct
|
||||||
|
*/
|
||||||
|
struct ReplicationPeer* repo_config_replication_peer_new() {
|
||||||
|
struct ReplicationPeer* rp = (struct ReplicationPeer*) malloc(sizeof(struct ReplicationPeer));
|
||||||
|
if (rp != NULL) {
|
||||||
|
rp->lastConnect = 0;
|
||||||
|
rp->lastJournalTime = 0;
|
||||||
|
rp->peer = NULL;
|
||||||
|
}
|
||||||
|
return rp;
|
||||||
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Free the resources of a ReplicationPeer
|
||||||
|
* @param rp the ReplicationPeer to free
|
||||||
|
* @returns true(1)
|
||||||
|
*/
|
||||||
|
int repo_config_replication_peer_free(struct ReplicationPeer* rp) {
|
||||||
|
if (rp != NULL) {
|
||||||
|
// we allocated the peer structure, so we must remove it
|
||||||
|
libp2p_peer_free(rp->peer);
|
||||||
|
free(rp);
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* allocate memory and initialize the replication struct
|
* allocate memory and initialize the replication struct
|
||||||
* @param replication a pointer to the struct to be allocated
|
* @param replication a pointer to the struct to be allocated
|
||||||
|
@ -13,7 +43,7 @@ int repo_config_replication_new(struct Replication** replication) {
|
||||||
return 0;
|
return 0;
|
||||||
struct Replication* out = *replication;
|
struct Replication* out = *replication;
|
||||||
out->announce_minutes = 0;
|
out->announce_minutes = 0;
|
||||||
out->nodes = NULL;
|
out->replication_peers = NULL;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,15 +55,77 @@ int repo_config_replication_new(struct Replication** replication) {
|
||||||
int repo_config_replication_free(struct Replication* replication) {
|
int repo_config_replication_free(struct Replication* replication) {
|
||||||
if (replication != NULL) {
|
if (replication != NULL) {
|
||||||
// free the vector
|
// free the vector
|
||||||
if (replication->nodes != NULL) {
|
if (replication->replication_peers != NULL) {
|
||||||
for(int i = 0; i < replication->nodes->total; i++) {
|
for(int i = 0; i < replication->replication_peers->total; i++) {
|
||||||
struct MultiAddress* currAddr = (struct MultiAddress*)libp2p_utils_vector_get(replication->nodes, i);
|
struct ReplicationPeer* currAddr = (struct ReplicationPeer*)libp2p_utils_vector_get(replication->replication_peers, i);
|
||||||
multiaddress_free(currAddr);
|
repo_config_replication_peer_free(currAddr);
|
||||||
}
|
}
|
||||||
libp2p_utils_vector_free(replication->nodes);
|
libp2p_utils_vector_free(replication->replication_peers);
|
||||||
}
|
}
|
||||||
// free the struct
|
// free the struct
|
||||||
free(replication);
|
free(replication);
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Given a peer, return the ReplicationPeer record
|
||||||
|
* @param replication the context
|
||||||
|
* @param key the peer to look for
|
||||||
|
* @returns the ReplicationPeer struct, or NULL if not found
|
||||||
|
*/
|
||||||
|
struct ReplicationPeer* repo_config_get_replication_peer(struct Replication* replication, struct Libp2pPeer* key) {
|
||||||
|
if (replication != NULL) {
|
||||||
|
if (replication->replication_peers != NULL) {
|
||||||
|
for(int i = 0; i < replication->replication_peers->total; i++) {
|
||||||
|
struct ReplicationPeer* confAddr = (struct ReplicationPeer*) libp2p_utils_vector_get(replication->replication_peers, i);
|
||||||
|
if (libp2p_peer_compare(confAddr->peer, key) == 0)
|
||||||
|
return confAddr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Determines if this peer is one of our approved replication nodes
|
||||||
|
* @param replication the replication struct
|
||||||
|
* @param peer the peer to examine
|
||||||
|
* @returns true(1) if this peer should be kept syncronized, false(0) otherwise
|
||||||
|
*/
|
||||||
|
int repo_config_replication_approved_node(struct Replication* replication, struct Libp2pPeer* peer) {
|
||||||
|
if (replication != NULL) {
|
||||||
|
if (repo_config_get_replication_peer(replication, peer) != NULL) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine the last time a replication was successful for this peer (at least we sent it something without error)
|
||||||
|
* @param replication the replication struct
|
||||||
|
* @param peer the peer to examine
|
||||||
|
* @returns the time since the last replication, or the announce time if we have no record
|
||||||
|
*/
|
||||||
|
unsigned long long repo_config_replication_last_attempt(struct Replication* replication, struct Libp2pPeer* peer) {
|
||||||
|
struct ReplicationPeer* rp = repo_config_get_replication_peer(replication, peer);
|
||||||
|
if (rp != NULL) {
|
||||||
|
return rp->lastConnect;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Determine the last journal record time that was sent to this peer
|
||||||
|
* @param replication the replication struct
|
||||||
|
* @param peer the peer to examine
|
||||||
|
* @returns the time of the last journal entry successfully sent
|
||||||
|
*/
|
||||||
|
unsigned long long repo_config_replication_last_journal_time(struct Replication* replication, struct Libp2pPeer* peer) {
|
||||||
|
struct ReplicationPeer* rp = repo_config_get_replication_peer(replication, peer);
|
||||||
|
if (rp != NULL) {
|
||||||
|
return rp->lastJournalTime;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -484,11 +484,11 @@ int fs_repo_open_config(struct FSRepo* repo) {
|
||||||
curr_pos++;
|
curr_pos++;
|
||||||
_get_json_int_value(data, tokens, num_tokens, curr_pos, "AnnounceMinutes", &repo->config->replication->announce_minutes);
|
_get_json_int_value(data, tokens, num_tokens, curr_pos, "AnnounceMinutes", &repo->config->replication->announce_minutes);
|
||||||
// nodes list
|
// nodes list
|
||||||
int nodes_pos = _find_token(data, tokens, num_tokens, curr_pos, "Nodes");
|
int nodes_pos = _find_token(data, tokens, num_tokens, curr_pos, "Peers");
|
||||||
if (nodes_pos >= 0) {
|
if (nodes_pos >= 0) {
|
||||||
if (tokens[nodes_pos].type == JSMN_ARRAY) {
|
if (tokens[nodes_pos].type == JSMN_ARRAY) {
|
||||||
int nodes_size = tokens[nodes_pos].size;
|
int nodes_size = tokens[nodes_pos].size;
|
||||||
repo->config->replication->nodes = libp2p_utils_vector_new(nodes_size);
|
repo->config->replication->replication_peers = libp2p_utils_vector_new(nodes_size);
|
||||||
nodes_pos++;
|
nodes_pos++;
|
||||||
for(int i = 0; i < nodes_size; i++) {
|
for(int i = 0; i < nodes_size; i++) {
|
||||||
char* val = NULL;
|
char* val = NULL;
|
||||||
|
@ -497,7 +497,11 @@ int fs_repo_open_config(struct FSRepo* repo) {
|
||||||
struct MultiAddress* cur = multiaddress_new_from_string(val);
|
struct MultiAddress* cur = multiaddress_new_from_string(val);
|
||||||
if (cur == NULL)
|
if (cur == NULL)
|
||||||
continue;
|
continue;
|
||||||
libp2p_utils_vector_add(repo->config->replication->nodes, cur);
|
// make multiaddress a peer
|
||||||
|
struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(cur);
|
||||||
|
struct ReplicationPeer* rp = repo_config_replication_peer_new();
|
||||||
|
rp->peer = peer;
|
||||||
|
libp2p_utils_vector_add(repo->config->replication->replication_peers, rp);
|
||||||
free(val);
|
free(val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue