diff --git a/core/null.c b/core/null.c index e06298d..8b4e553 100644 --- a/core/null.c +++ b/core/null.c @@ -12,17 +12,19 @@ #include "libp2p/net/p2pnet.h" #include "libp2p/net/protocol.h" #include "libp2p/nodeio/nodeio.h" +#include "libp2p/os/utils.h" #include "libp2p/record/message.h" #include "libp2p/routing/dht_protocol.h" #include "libp2p/secio/secio.h" #include "libp2p/utils/logger.h" #include "ipfs/core/daemon.h" -#include "ipfs/routing/routing.h" #include "ipfs/core/ipfs_node.h" +#include "ipfs/exchange/bitswap/network.h" +#include "ipfs/journal/journal.h" #include "ipfs/merkledag/merkledag.h" #include "ipfs/merkledag/node.h" +#include "ipfs/routing/routing.h" #include "ipfs/util/thread_pool.h" -#include "ipfs/exchange/bitswap/network.h" #define BUF_SIZE 4096 @@ -127,6 +129,38 @@ void ipfs_null_connection (void *ptr) { return; } +int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* peer) { + if (peer == NULL) + return 0; + if (peer->is_local) + return 1; + // Is this peer one of our backup partners? + struct ReplicationPeer* replication_peer = repo_config_get_replication_peer(local_node->repo->config->replication, peer); + long long announce_secs = local_node->repo->config->replication->announce_minutes * 60; + // If so, has there been enough time since the last attempt a backup? + if (replication_peer != NULL) { + announce_secs -= os_utils_gmtime() - repo_config_replication_last_attempt(local_node->repo->config->replication, peer); + } + // should we attempt to connect if we're not already? + if (replication_peer != NULL && announce_secs < 0) { + // try to connect if we aren't already + if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { + if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, 10)) { + return 0; + } + } + // attempt a backup, don't forget to reset timer + ipfs_journal_sync(local_node, replication_peer); + } else { + // try a ping, but only if we're connected + if (peer->connection_type == CONNECTION_TYPE_CONNECTED && !local_node->routing->Ping(local_node->routing, peer)) { + peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED; + } + } + return 1; +} + + /*** * Called by the daemon to listen for connections * @param ptr a pointer to an IpfsNodeListenParams struct @@ -187,12 +221,7 @@ void* ipfs_null_listen (void *ptr) } else { // timeout... do maintenance struct PeerEntry* entry = current_peer_entry->item; - if (current_peer_entry != NULL && !entry->peer->is_local && entry->peer->connection_type == CONNECTION_TYPE_CONNECTED) { - libp2p_logger_debug("null", "Attempting to ping %s.\n", entry->peer->id); - if (!listen_param->local_node->routing->Ping(listen_param->local_node->routing, entry->peer)) { - entry->peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED; - } - } + ipfs_null_do_maintenance(listen_param->local_node, entry->peer); if (current_peer_entry != NULL) current_peer_entry = current_peer_entry->next; if (current_peer_entry == NULL) diff --git a/include/ipfs/journal/journal.h b/include/ipfs/journal/journal.h index 764656c..f08150f 100644 --- a/include/ipfs/journal/journal.h +++ b/include/ipfs/journal/journal.h @@ -44,8 +44,7 @@ struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct I /*** * Send a journal message to a remote peer - * @param local_node the local node - * @param peer the peer to send it to + * @param replication_peer the peer to send it to * @returns true(1) on success, false(0) otherwise. */ -int ipfs_journal_sync(struct IpfsNode* local_node, struct Libp2pPeer* peer); +int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* replication_peer); diff --git a/include/ipfs/repo/config/replication.h b/include/ipfs/repo/config/replication.h index b7f1204..9add2b2 100644 --- a/include/ipfs/repo/config/replication.h +++ b/include/ipfs/repo/config/replication.h @@ -1,10 +1,32 @@ +#pragma once + #include "libp2p/utils/vector.h" +#include "libp2p/peer/peer.h" + +struct ReplicationPeer { + struct Libp2pPeer* peer; + unsigned long long lastConnect; + unsigned long long lastJournalTime; +}; struct Replication { int announce_minutes; - struct Libp2pVector* nodes; + struct Libp2pVector* replication_peers; }; +/** + * Allocate a new ReplicationPeer struct + * @returns the new struct + */ +struct ReplicationPeer* repo_config_replication_peer_new(); + +/*** + * Free the resources of a ReplicationPeer + * @param rp the ReplicationPeer to free + * @returns true(1) + */ +int repo_config_replication_peer_free(struct ReplicationPeer* rp); + /*** * allocate memory and initialize the replication struct * @param replication a pointer to the struct to be allocated @@ -18,3 +40,36 @@ int repo_config_replication_new(struct Replication** replication); * @returns true(1); */ int repo_config_replication_free(struct Replication* replication); + +/*** + * Determines if this peer is one of our approved replication nodes + * @param replication the replication struct + * @param peer the peer to examine + * @returns true(1) if this peer should be kept syncronized, false(0) otherwise + */ +int repo_config_replication_approved_node(struct Replication* replication, struct Libp2pPeer* peer); + +/** + * Determine the last time a replication was successful for this peer (at least we sent it something without error) + * @param replication the replication struct + * @param peer the peer to examine + * @returns the time since the last replication, or the announce time if we have no record + */ +unsigned long long repo_config_replication_last_attempt(struct Replication* replication, struct Libp2pPeer* peer); + +/*** + * Determine the last journal record time that was sent to this peer + * @param replication the replication struct + * @param peer the peer to examine + * @returns the time of the last journal entry successfully sent + */ +unsigned long long repo_config_replication_last_journal_time(struct Replication* replication, struct Libp2pPeer* peer); + +/*** + * Given a peer, return the ReplicationPeer record + * @param replication the context + * @param key the peer to look for + * @returns the ReplicationPeer struct, or NULL if not found + */ +struct ReplicationPeer* repo_config_get_replication_peer(struct Replication* replication, struct Libp2pPeer* key); + diff --git a/journal/journal.c b/journal/journal.c index 3539fd8..20e844e 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -6,6 +6,7 @@ #include "ipfs/journal/journal_message.h" #include "ipfs/journal/journal_entry.h" #include "ipfs/repo/fsrepo/journalstore.h" +#include "ipfs/repo/config/replication.h" /*** * See if we can handle this message @@ -124,14 +125,14 @@ int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, st /*** * Send a journal message to a remote peer - * @param peer the peer to send it to + * @param replication_peer the peer to send it to * @returns true(1) on success, false(0) otherwise. */ -int ipfs_journal_sync(struct IpfsNode* local_node, struct Libp2pPeer* peer) { +int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* replication_peer) { // make sure we're connected securely - if (peer->is_local) + if (replication_peer->peer->is_local) return 0; - if (peer->sessionContext->secure_stream == NULL) + if (replication_peer->peer->sessionContext->secure_stream == NULL) return 0; // grab the last 10? files @@ -164,7 +165,11 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct Libp2pPeer* peer) { } // send the message message->current_epoch = os_utils_gmtime(); - int retVal = ipfs_journal_send_message(local_node, peer, message); + int retVal = ipfs_journal_send_message(local_node, replication_peer->peer, message); + if (retVal) { + replication_peer->lastConnect = message->current_epoch; + replication_peer->lastJournalTime = message->end_epoch; + } // clean up ipfs_journal_message_free(message); ipfs_journal_free_records(journal_records); diff --git a/repo/config/replication.c b/repo/config/replication.c index 1baca83..337937d 100644 --- a/repo/config/replication.c +++ b/repo/config/replication.c @@ -1,7 +1,37 @@ #include + #include "multiaddr/multiaddr.h" +#include "libp2p/utils/linked_list.h" #include "ipfs/repo/config/replication.h" +/** + * Allocate a new ReplicationPeer struct + * @returns the new struct + */ +struct ReplicationPeer* repo_config_replication_peer_new() { + struct ReplicationPeer* rp = (struct ReplicationPeer*) malloc(sizeof(struct ReplicationPeer)); + if (rp != NULL) { + rp->lastConnect = 0; + rp->lastJournalTime = 0; + rp->peer = NULL; + } + return rp; +} + +/*** + * Free the resources of a ReplicationPeer + * @param rp the ReplicationPeer to free + * @returns true(1) + */ +int repo_config_replication_peer_free(struct ReplicationPeer* rp) { + if (rp != NULL) { + // we allocated the peer structure, so we must remove it + libp2p_peer_free(rp->peer); + free(rp); + } + return 1; +} + /*** * allocate memory and initialize the replication struct * @param replication a pointer to the struct to be allocated @@ -13,7 +43,7 @@ int repo_config_replication_new(struct Replication** replication) { return 0; struct Replication* out = *replication; out->announce_minutes = 0; - out->nodes = NULL; + out->replication_peers = NULL; return 1; } @@ -25,15 +55,77 @@ int repo_config_replication_new(struct Replication** replication) { int repo_config_replication_free(struct Replication* replication) { if (replication != NULL) { // free the vector - if (replication->nodes != NULL) { - for(int i = 0; i < replication->nodes->total; i++) { - struct MultiAddress* currAddr = (struct MultiAddress*)libp2p_utils_vector_get(replication->nodes, i); - multiaddress_free(currAddr); + if (replication->replication_peers != NULL) { + for(int i = 0; i < replication->replication_peers->total; i++) { + struct ReplicationPeer* currAddr = (struct ReplicationPeer*)libp2p_utils_vector_get(replication->replication_peers, i); + repo_config_replication_peer_free(currAddr); } - libp2p_utils_vector_free(replication->nodes); + libp2p_utils_vector_free(replication->replication_peers); } // free the struct free(replication); } return 1; } + +/*** + * Given a peer, return the ReplicationPeer record + * @param replication the context + * @param key the peer to look for + * @returns the ReplicationPeer struct, or NULL if not found + */ +struct ReplicationPeer* repo_config_get_replication_peer(struct Replication* replication, struct Libp2pPeer* key) { + if (replication != NULL) { + if (replication->replication_peers != NULL) { + for(int i = 0; i < replication->replication_peers->total; i++) { + struct ReplicationPeer* confAddr = (struct ReplicationPeer*) libp2p_utils_vector_get(replication->replication_peers, i); + if (libp2p_peer_compare(confAddr->peer, key) == 0) + return confAddr; + } + } + } + return NULL; +} + +/*** + * Determines if this peer is one of our approved replication nodes + * @param replication the replication struct + * @param peer the peer to examine + * @returns true(1) if this peer should be kept syncronized, false(0) otherwise + */ +int repo_config_replication_approved_node(struct Replication* replication, struct Libp2pPeer* peer) { + if (replication != NULL) { + if (repo_config_get_replication_peer(replication, peer) != NULL) { + return 1; + } + } + return 0; +} + +/** + * Determine the last time a replication was successful for this peer (at least we sent it something without error) + * @param replication the replication struct + * @param peer the peer to examine + * @returns the time since the last replication, or the announce time if we have no record + */ +unsigned long long repo_config_replication_last_attempt(struct Replication* replication, struct Libp2pPeer* peer) { + struct ReplicationPeer* rp = repo_config_get_replication_peer(replication, peer); + if (rp != NULL) { + return rp->lastConnect; + } + return 0; +} + +/*** + * Determine the last journal record time that was sent to this peer + * @param replication the replication struct + * @param peer the peer to examine + * @returns the time of the last journal entry successfully sent + */ +unsigned long long repo_config_replication_last_journal_time(struct Replication* replication, struct Libp2pPeer* peer) { + struct ReplicationPeer* rp = repo_config_get_replication_peer(replication, peer); + if (rp != NULL) { + return rp->lastJournalTime; + } + return 0; +} diff --git a/repo/fsrepo/fs_repo.c b/repo/fsrepo/fs_repo.c index e89549b..5c7d752 100644 --- a/repo/fsrepo/fs_repo.c +++ b/repo/fsrepo/fs_repo.c @@ -484,11 +484,11 @@ int fs_repo_open_config(struct FSRepo* repo) { curr_pos++; _get_json_int_value(data, tokens, num_tokens, curr_pos, "AnnounceMinutes", &repo->config->replication->announce_minutes); // nodes list - int nodes_pos = _find_token(data, tokens, num_tokens, curr_pos, "Nodes"); + int nodes_pos = _find_token(data, tokens, num_tokens, curr_pos, "Peers"); if (nodes_pos >= 0) { if (tokens[nodes_pos].type == JSMN_ARRAY) { int nodes_size = tokens[nodes_pos].size; - repo->config->replication->nodes = libp2p_utils_vector_new(nodes_size); + repo->config->replication->replication_peers = libp2p_utils_vector_new(nodes_size); nodes_pos++; for(int i = 0; i < nodes_size; i++) { char* val = NULL; @@ -497,7 +497,11 @@ int fs_repo_open_config(struct FSRepo* repo) { struct MultiAddress* cur = multiaddress_new_from_string(val); if (cur == NULL) continue; - libp2p_utils_vector_add(repo->config->replication->nodes, cur); + // make multiaddress a peer + struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(cur); + struct ReplicationPeer* rp = repo_config_replication_peer_new(); + rp->peer = peer; + libp2p_utils_vector_add(repo->config->replication->replication_peers, rp); free(val); } }