diff --git a/include/ipfs/journal/journal.h b/include/ipfs/journal/journal.h index ce8bd20..764656c 100644 --- a/include/ipfs/journal/journal.h +++ b/include/ipfs/journal/journal.h @@ -44,7 +44,8 @@ struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct I /*** * Send a journal message to a remote peer + * @param local_node the local node * @param peer the peer to send it to * @returns true(1) on success, false(0) otherwise. */ -int ipfs_journal_sync(struct Libp2pPeer* peer); +int ipfs_journal_sync(struct IpfsNode* local_node, struct Libp2pPeer* peer); diff --git a/include/ipfs/repo/fsrepo/journalstore.h b/include/ipfs/repo/fsrepo/journalstore.h index 1f04056..ab623c0 100644 --- a/include/ipfs/repo/fsrepo/journalstore.h +++ b/include/ipfs/repo/fsrepo/journalstore.h @@ -29,7 +29,7 @@ int repo_journalstore_cursor_get(struct Datastore* datastore, void* cursor, enum /** * Close the cursor */ -int repo_cournalstore_cursor_close(struct Datastore* datastore, void* cursor); +int repo_journalstore_cursor_close(struct Datastore* datastore, void* cursor); int journal_record_free(struct JournalRecord* rec); diff --git a/journal/Makefile b/journal/Makefile index e0975c5..7a7b1d8 100644 --- a/journal/Makefile +++ b/journal/Makefile @@ -1,5 +1,5 @@ CC = gcc -CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multiaddr/include -I../../c-multihash/include -I../../c-protobuf -Wall -std=c99 +CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multiaddr/include -I../../c-multihash/include -I../../c-protobuf -I../../lmdb/libraries/liblmdb -Wall -std=c99 ifdef DEBUG CFLAGS += -g3 diff --git a/journal/journal.c b/journal/journal.c index d76ecce..3539fd8 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -1,8 +1,11 @@ /** * The journal protocol attempts to keep a journal in sync with other (approved) nodes */ - +#include "libp2p/os/utils.h" #include "ipfs/journal/journal.h" +#include "ipfs/journal/journal_message.h" +#include "ipfs/journal/journal_entry.h" +#include "ipfs/repo/fsrepo/journalstore.h" /*** * See if we can handle this message @@ -58,32 +61,114 @@ struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct I return handler; } +/*** + * Retrieve the last n records from the journalstore + * @param database the reference to the opened db + * @param n the number of records to retrieve + * @returns a vector of struct JournalRecord + */ +struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) { + struct Libp2pVector* vector = libp2p_utils_vector_new(1); + if (vector != NULL) { + void* cursor; + if (!repo_journalstore_cursor_open(database, &cursor)) + return NULL; + struct JournalRecord* rec = NULL; + if (!repo_journalstore_cursor_get(database, cursor, CURSOR_LAST, &rec)) { + libp2p_utils_vector_free(vector); + repo_journalstore_cursor_close(database, cursor); + return NULL; + } + // we've got one, now start the loop + int i = 0; + do { + libp2p_utils_vector_add(vector, rec); + if (!repo_journalstore_cursor_get(database, cursor, CURSOR_PREVIOUS, &rec)) { + break; + } + i++; + } while(i < n); + repo_journalstore_cursor_close(database, cursor); + } + return vector; +} + +int ipfs_journal_free_records(struct Libp2pVector* records) { + if (records != NULL) { + for (int i = 0; i < records->total; i++) { + struct JournalRecord* rec = (struct JournalRecord*)libp2p_utils_vector_get(records, i); + journal_record_free(rec); + } + libp2p_utils_vector_free(records); + } + return 1; +} + +int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, struct JournalMessage* message) { + if (peer->connection_type != CONNECTION_TYPE_CONNECTED) + libp2p_peer_connect(&node->identity->private_key, peer, node->peerstore, 10); + if (peer->connection_type != CONNECTION_TYPE_CONNECTED) + return 0; + // protobuf the message + size_t msg_size = ipfs_journal_message_encode_size(message); + uint8_t msg[msg_size]; + if (!ipfs_journal_message_encode(message, &msg[0], msg_size, &msg_size)) + return 0; + // send the header + char* header = "/ipfs/journalio/1.0.0/n"; + if (!peer->sessionContext->default_stream->write(peer->sessionContext->default_stream, (unsigned char*)header, strlen(header))) + return 0; + // send the message + return peer->sessionContext->default_stream->write(peer->sessionContext->default_stream, msg, msg_size); +} + /*** * Send a journal message to a remote peer * @param peer the peer to send it to * @returns true(1) on success, false(0) otherwise. */ -int ipfs_journal_sync(struct Libp2pPeer* peer) { +int ipfs_journal_sync(struct IpfsNode* local_node, struct Libp2pPeer* peer) { // make sure we're connected securely if (peer->is_local) return 0; if (peer->sessionContext->secure_stream == NULL) return 0; - /* - // grab the last 10 files - struct Libp2pVector* vector = libp2p_utils_vector_new(1); - if (vector == NULL) { - return 0; + + // grab the last 10? files + struct Libp2pVector* journal_records = ipfs_journal_get_last(local_node->repo->config->datastore, 10); + if (journal_records == NULL || journal_records->total == 0) { + // nothing to do + return 1; } - ipfs_journal_get_last(10, &vector); - struct JournalMessage* message = NULL; // build the message - if (!ipfs_journal_build_message(message)) - return 0; - // protobuf the message - // send the protocol header + struct JournalMessage* message = ipfs_journal_message_new(); + for(int i = 0; i < journal_records->total; i++) { + struct JournalRecord* rec = (struct JournalRecord*) libp2p_utils_vector_get(journal_records, i); + if (rec->timestamp > message->end_epoch) + message->end_epoch = rec->timestamp; + if (message->start_epoch == 0 || rec->timestamp < message->start_epoch) + message->start_epoch = rec->timestamp; + struct JournalEntry* entry = ipfs_journal_entry_new(); + entry->timestamp = rec->timestamp; + entry->pin = 1; + entry->hash_size = rec->hash_size; + entry->hash = (uint8_t*) malloc(entry->hash_size); + if (entry->hash == NULL) { + // out of memory + ipfs_journal_message_free(message); + ipfs_journal_free_records(journal_records); + return 0; + } + memcpy(entry->hash, rec->hash, entry->hash_size); + libp2p_utils_vector_add(message->journal_entries, entry); + } // send the message - */ - return 0; + message->current_epoch = os_utils_gmtime(); + int retVal = ipfs_journal_send_message(local_node, peer, message); + // clean up + ipfs_journal_message_free(message); + ipfs_journal_free_records(journal_records); + + return retVal; } diff --git a/repo/fsrepo/lmdb_datastore.c b/repo/fsrepo/lmdb_datastore.c index eaf6b3e..873101c 100644 --- a/repo/fsrepo/lmdb_datastore.c +++ b/repo/fsrepo/lmdb_datastore.c @@ -12,6 +12,7 @@ #include "lmdb.h" #include "libp2p/utils/logger.h" +#include "libp2p/os/utils.h" #include "ipfs/repo/fsrepo/lmdb_datastore.h" #include "ipfs/repo/fsrepo/journalstore.h" #include "varint.h" @@ -122,17 +123,6 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data, return 1; } - -/** - * Get the current time UTC - * @returns number of seconds since epoch in UTC - */ -unsigned long long lmdb_datastore_gmt_time() { - time_t local = time(NULL); - struct tm *gmt = gmtime(&local); - return (unsigned long long)mktime(gmt); -} - /** * Write data to the datastore with the specified key * @param key the key @@ -162,7 +152,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha return 0; // add the timestamp - unsigned long long timestamp = lmdb_datastore_gmt_time(); + unsigned long long timestamp = os_utils_gmtime(); uint8_t *record; size_t record_size; repo_fsrepo_lmdb_build_record(timestamp, data, data_size, &record, &record_size); diff --git a/repo/fsrepo/lmdb_journalstore.c b/repo/fsrepo/lmdb_journalstore.c index 83bbdbf..57363a8 100644 --- a/repo/fsrepo/lmdb_journalstore.c +++ b/repo/fsrepo/lmdb_journalstore.c @@ -100,10 +100,16 @@ int repo_journalstore_cursor_get(struct Datastore* datastore, void* crsr, enum D MDB_val mdb_key; MDB_val mdb_value; MDB_cursor_op co = MDB_FIRST; + if (op == CURSOR_FIRST) co = MDB_FIRST; else if (op == CURSOR_NEXT) co = MDB_NEXT; + else if (op == CURSOR_LAST) + co = MDB_LAST; + else if (op == CURSOR_PREVIOUS) + co = MDB_PREV; + if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) { return 0; } @@ -126,7 +132,7 @@ int repo_journalstore_cursor_get(struct Datastore* datastore, void* crsr, enum D /** * Close the cursor */ -int repo_cournalstore_cursor_close(struct Datastore* datastore, void* crsr) { +int repo_journalstore_cursor_close(struct Datastore* datastore, void* crsr) { if (crsr != NULL) { struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)crsr; if (cursor->cursor != NULL) {