diff --git a/Makefile b/Makefile index 14f545e..1507920 100644 --- a/Makefile +++ b/Makefile @@ -11,6 +11,7 @@ all: cd core; make all; cd exchange; make all; cd importer; make all; + cd journal; make all; cd merkledag; make all; cd multibase; make all; cd pin; make all; @@ -35,6 +36,7 @@ clean: cd core; make clean; cd exchange; make clean; cd importer; make clean; + cd journal; make clean; cd merkledag; make clean; cd multibase; make clean; cd pin; make clean; diff --git a/core/ipfs_node.c b/core/ipfs_node.c index 30acd19..b157adb 100644 --- a/core/ipfs_node.c +++ b/core/ipfs_node.c @@ -5,14 +5,15 @@ #include "libp2p/routing/dht_protocol.h" #include "ipfs/core/ipfs_node.h" #include "ipfs/exchange/bitswap/bitswap.h" +#include "ipfs/journal/journal.h" struct Libp2pVector* ipfs_node_online_build_protocol_handlers(struct IpfsNode* node) { struct Libp2pVector* retVal = libp2p_utils_vector_new(1); if (retVal != NULL) { // secio libp2p_utils_vector_add(retVal, libp2p_secio_build_protocol_handler(&node->identity->private_key, node->peerstore)); - // nodeio - //libp2p_utils_vector_add(retVal, libp2p_nodeio_build_protocol_handler()); + // journal + libp2p_utils_vector_add(retVal, ipfs_journal_build_protocol_handler(node)); // kademlia libp2p_utils_vector_add(retVal, libp2p_routing_dht_build_protocol_handler(node->peerstore, node->providerstore)); // bitswap diff --git a/include/ipfs/journal/journal.h b/include/ipfs/journal/journal.h new file mode 100644 index 0000000..ce8bd20 --- /dev/null +++ b/include/ipfs/journal/journal.h @@ -0,0 +1,50 @@ +#pragma once +#include +#include + +#include "libp2p/conn/session.h" +#include "ipfs/core/ipfs_node.h" +#include "libp2p/net/protocol.h" + +/** + * The journal protocol attempts to keep a journal in sync with other (approved) nodes + */ + +/*** + * See if we can handle this message + * @param incoming the incoming message + * @param incoming_size the size of the incoming message + * @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise. + */ +int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size); + +/** + * Clean up resources used by this handler + * @param context the context to clean up + * @returns true(1) + */ +int ipfs_journal_shutdown_handler(void* context); + +/*** + * Handles a message + * @param incoming the message + * @param incoming_size the size of the message + * @param session_context details of the remote peer + * @param protocol_context in this case, an IpfsNode + * @returns 0 if the caller should not continue looping, <0 on error, >0 on success + */ +int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) ; + +/*** + * Build the protocol handler struct for the Journal protocol + * @param local_node what to stuff in the context + * @returns the protocol handler + */ +struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct IpfsNode* local_node); + +/*** + * Send a journal message to a remote peer + * @param peer the peer to send it to + * @returns true(1) on success, false(0) otherwise. + */ +int ipfs_journal_sync(struct Libp2pPeer* peer); diff --git a/include/ipfs/journal/journal_entry.h b/include/ipfs/journal/journal_entry.h new file mode 100644 index 0000000..f83fcaa --- /dev/null +++ b/include/ipfs/journal/journal_entry.h @@ -0,0 +1,43 @@ +#pragma once + +/** + * A journal entry protobuf + */ + +#include +#include + +struct JournalEntry { + unsigned long long timestamp; + uint8_t pin; + uint8_t *hash; + size_t hash_size; +}; + +struct JournalEntry* ipfs_journal_entry_new(); + +int ipfs_journal_entry_free(struct JournalEntry* entry); + +/** + * Determine the maximum size of a protobuf'd JournalEntry + */ +int ipfs_journal_entry_encode_size(struct JournalEntry* entry); + +/*** + * Protobuf the journal entry + * @param entry the JournalEntry to protobuf + * @param buffer where to place the results + * @param max_buffer_size the amount of memory allocated for the buffer + * @param bytes_used the amount of the buffer used + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_journal_entry_encode(struct JournalEntry* entry, uint8_t *buffer, size_t max_buffer_size, size_t *bytes_used); + +/*** + * Turn a protobuf'd JournalEntry and turn it into a real JournalEntry + * @param incoming the incoming bytes + * @param incoming_size the size of the incoming buffer + * @param results where to put the new JournalEntry + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_journal_entry_decode(uint8_t *incoming, size_t incoming_size, struct JournalEntry **results); diff --git a/include/ipfs/journal/journal_message.h b/include/ipfs/journal/journal_message.h new file mode 100644 index 0000000..3289b7d --- /dev/null +++ b/include/ipfs/journal/journal_message.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include + +#include "libp2p/utils/vector.h" + +struct JournalMessage { + unsigned long long current_epoch; + unsigned long long start_epoch; + unsigned long long end_epoch; + struct Libp2pVector* journal_entries; +}; + +struct JournalMessage* ipfs_journal_message_new(); +int ipfs_journal_message_free(struct JournalMessage* message); + +/** + * Determine the maximum size of a protobuf'd JournalMessage + * @param message the JournalMessage + * @returns the maximum size of this message in bytes if it were protobuf'd + */ +int ipfs_journal_message_encode_size(struct JournalMessage* message); + +/*** + * Protobuf the journal message + * @param message the JournalMessage to protobuf + * @param buffer where to place the results + * @param max_buffer_size the amount of memory allocated for the buffer + * @param bytes_used the amount of the buffer used + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_journal_message_encode(struct JournalMessage* entry, uint8_t *buffer, size_t max_buffer_size, size_t *bytes_used); + +/*** + * Turn a protobuf'd JournalMessage and turn it into a real JournalMessage + * @param incoming the incoming bytes + * @param incoming_size the size of the incoming buffer + * @param results where to put the new JournalMessage + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_journal_message_decode(uint8_t *incoming, size_t incoming_size, struct JournalMessage **results); diff --git a/include/ipfs/repo/fsrepo/journalstore.h b/include/ipfs/repo/fsrepo/journalstore.h index 67d4987..1f04056 100644 --- a/include/ipfs/repo/fsrepo/journalstore.h +++ b/include/ipfs/repo/fsrepo/journalstore.h @@ -6,6 +6,7 @@ #include #include +#include "lmdb.h" #include "libp2p/db/datastore.h" struct JournalRecord { @@ -31,3 +32,5 @@ int repo_journalstore_cursor_get(struct Datastore* datastore, void* cursor, enum int repo_cournalstore_cursor_close(struct Datastore* datastore, void* cursor); int journal_record_free(struct JournalRecord* rec); + +int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp, const uint8_t *hash, size_t hash_size); diff --git a/journal/Makefile b/journal/Makefile new file mode 100644 index 0000000..e0975c5 --- /dev/null +++ b/journal/Makefile @@ -0,0 +1,18 @@ +CC = gcc +CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multiaddr/include -I../../c-multihash/include -I../../c-protobuf -Wall -std=c99 + +ifdef DEBUG +CFLAGS += -g3 +endif + +LFLAGS = +DEPS = +OBJS = journal.o journal_entry.o journal_message.o + +%.o: %.c $(DEPS) + $(CC) -c -o $@ $< $(CFLAGS) + +all: $(OBJS) + +clean: + rm -f *.o diff --git a/journal/journal.c b/journal/journal.c new file mode 100644 index 0000000..d76ecce --- /dev/null +++ b/journal/journal.c @@ -0,0 +1,89 @@ +/** + * The journal protocol attempts to keep a journal in sync with other (approved) nodes + */ + +#include "ipfs/journal/journal.h" + +/*** + * See if we can handle this message + * @param incoming the incoming message + * @param incoming_size the size of the incoming message + * @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise. + */ +int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) { + if (incoming_size < 8) + return 0; + char* result = strstr((char*)incoming, "/ipfs/journal/1.0.0"); + if(result == NULL || result != (char*)incoming) + return 0; + return 1; +} + +/** + * Clean up resources used by this handler + * @param context the context to clean up + * @returns true(1) + */ +int ipfs_journal_shutdown_handler(void* context) { + return 1; +} + +/*** + * Handles a message + * @param incoming the message + * @param incoming_size the size of the message + * @param session_context details of the remote peer + * @param protocol_context in this case, an IpfsNode + * @returns 0 if the caller should not continue looping, <0 on error, >0 on success + */ +int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { + //struct IpfsNode* local_node = (struct IpfsNode*)protocol_context; + //TODO: handle the message + return -1; +} + +/*** + * Build the protocol handler struct for the Journal protocol + * @param local_node what to stuff in the context + * @returns the protocol handler + */ +struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct IpfsNode* local_node) { + struct Libp2pProtocolHandler* handler = (struct Libp2pProtocolHandler*) malloc(sizeof(struct Libp2pProtocolHandler)); + if (handler != NULL) { + handler->context = (void*)local_node; + handler->CanHandle = ipfs_journal_can_handle; + handler->HandleMessage = ipfs_journal_handle_message; + handler->Shutdown = ipfs_journal_shutdown_handler; + } + return handler; +} + +/*** + * Send a journal message to a remote peer + * @param peer the peer to send it to + * @returns true(1) on success, false(0) otherwise. + */ +int ipfs_journal_sync(struct Libp2pPeer* peer) { + // make sure we're connected securely + if (peer->is_local) + return 0; + if (peer->sessionContext->secure_stream == NULL) + return 0; + /* + // grab the last 10 files + struct Libp2pVector* vector = libp2p_utils_vector_new(1); + if (vector == NULL) { + return 0; + } + ipfs_journal_get_last(10, &vector); + struct JournalMessage* message = NULL; + // build the message + if (!ipfs_journal_build_message(message)) + return 0; + // protobuf the message + // send the protocol header + // send the message + */ + return 0; +} + diff --git a/journal/journal_entry.c b/journal/journal_entry.c new file mode 100644 index 0000000..c8f22d7 --- /dev/null +++ b/journal/journal_entry.c @@ -0,0 +1,141 @@ +/** + * A journal entry protobuf + */ +#include "libp2p/utils/logger.h" +#include "ipfs/journal/journal_entry.h" +#include "protobuf.h" + +struct JournalEntry* ipfs_journal_entry_new() { + struct JournalEntry* journal_entry = (struct JournalEntry*) malloc(sizeof(struct JournalEntry)); + if (journal_entry != NULL) { + journal_entry->hash = NULL; + journal_entry->hash_size = 0; + journal_entry->pin = 0; + journal_entry->timestamp = 0; + } + return journal_entry; +} + +int ipfs_journal_entry_free(struct JournalEntry* entry) { + if (entry != NULL) { + if (entry->hash != NULL) { + free(entry->hash); + entry->hash = NULL; + entry->hash_size = 0; + } + free(entry); + } + return 1; +} + +/** + * Determine the maximum size of a protobuf'd JournalEntry + */ +int ipfs_journal_entry_encode_size(struct JournalEntry* entry) { + // hash + int retVal = entry->hash_size; + // hash size + retVal += 11; + // pin + retVal += 1; + // timestamp + retVal += 11; + return retVal; +} + +/*** + * Protobuf the journal entry + * @param entry the JournalEntry to protobuf + * @param buffer where to place the results + * @param max_buffer_size the amount of memory allocated for the buffer + * @param bytes_used the amount of the buffer used + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_journal_entry_encode(struct JournalEntry* entry, uint8_t *buffer, size_t max_buffer_size, size_t *bytes_written) { + /* + message JournalEntry { + int32 timestamp = 1; + string hash = 2; + bool pin = 3; + } + */ + // sanity checks + if (buffer == NULL) + return 0; + if (max_buffer_size <= 0) + return 0; + *bytes_written = 0; + size_t bytes_used; + // timestamp + if (!protobuf_encode_varint(1, WIRETYPE_VARINT, entry->timestamp, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used)) + return 0; + *bytes_written += bytes_used; + // hash + if (!protobuf_encode_length_delimited(2, WIRETYPE_LENGTH_DELIMITED, (char*)entry->hash, entry->hash_size, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used)) + return 0; + *bytes_written += bytes_used; + // pin + if (!protobuf_encode_varint(3, WIRETYPE_VARINT, entry->pin, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used)) + return 0; + *bytes_written += bytes_used; + return 1; +} + +/*** + * Turn a protobuf'd JournalEntry and turn it into a real JournalEntry + * @param incoming the incoming bytes + * @param incoming_size the size of the incoming buffer + * @param results where to put the new JournalEntry + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_journal_entry_decode(uint8_t *incoming, size_t incoming_size, struct JournalEntry **out) { + size_t pos = 0; + int retVal = 0, got_something = 0;; + + if ( (*out = ipfs_journal_entry_new()) == NULL) + goto exit; + + while(pos < incoming_size) { + size_t bytes_read = 0; + int field_no; + enum WireType field_type; + if (protobuf_decode_field_and_type(&incoming[pos], incoming_size, &field_no, &field_type, &bytes_read) == 0) { + goto exit; + } + if (field_no < 1 || field_no > 5) { + libp2p_logger_error("journal_entry", "Invalid character in journal_entry protobuf at position %lu. Value: %02x\n", pos, incoming[pos]); + } + pos += bytes_read; + switch(field_no) { + case (1): // timestamp + if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &(*out)->timestamp, &bytes_read) == 0) + goto exit; + pos += bytes_read; + got_something = 1; + break; + case (2): // hash + if (protobuf_decode_length_delimited(&incoming[pos], incoming_size - pos, (char**)&(*out)->hash, &(*out)->hash_size, &bytes_read) == 0) + goto exit; + pos += bytes_read; + got_something = 1; + break; + case (3): { // pin + unsigned long long temp; + if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &temp, &bytes_read) == 0) + goto exit; + (*out)->pin = (temp == 1); + pos += bytes_read; + got_something = 1; + break; + } + } + } + + retVal = got_something; + +exit: + if (retVal == 0) { + ipfs_journal_entry_free(*out); + } + return retVal; +} diff --git a/journal/journal_message.c b/journal/journal_message.c new file mode 100644 index 0000000..94c6f52 --- /dev/null +++ b/journal/journal_message.c @@ -0,0 +1,171 @@ +#include "ipfs/journal/journal_message.h" +#include "ipfs/journal/journal_entry.h" +#include "libp2p/utils/logger.h" +#include "protobuf.h" + +struct JournalMessage* ipfs_journal_message_new() { + struct JournalMessage *message = (struct JournalMessage*) malloc(sizeof(struct JournalMessage)); + if (message != NULL) { + message->current_epoch = 0; + message->end_epoch = 0; + message->start_epoch = 0; + message->journal_entries = libp2p_utils_vector_new(1); + } + return message; +} + +int ipfs_journal_message_free(struct JournalMessage* message) { + if (message != NULL) { + if (message->journal_entries != NULL) { + for(int i = 0; i < message->journal_entries->total; i++) { + struct JournalEntry* entry = (struct JournalEntry*) libp2p_utils_vector_get(message->journal_entries, i); + ipfs_journal_entry_free(entry); + } + libp2p_utils_vector_free(message->journal_entries); + message->journal_entries = NULL; + } + free(message); + } + return 1; +} + +/** + * Determine the maximum size of a protobuf'd JournalMessage + * @param message the JournalMessage + * @returns the maximum size of this message in bytes if it were protobuf'd + */ +int ipfs_journal_message_encode_size(struct JournalMessage* message) { + // 3 epochs + int sz = 33; + // journal entries + for (int i = 0; i < message->journal_entries->total; i++) { + struct JournalEntry* entry = (struct JournalEntry*) libp2p_utils_vector_get(message->journal_entries, i); + sz += ipfs_journal_entry_encode_size(entry); + } + return sz; +} + +/*** + * Protobuf the journal message + * @param message the JournalMessage to protobuf + * @param buffer where to place the results + * @param max_buffer_size the amount of memory allocated for the buffer + * @param bytes_used the amount of the buffer used + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_journal_message_encode(struct JournalMessage* message, uint8_t *buffer, size_t max_buffer_size, size_t *bytes_written) { + /* + message JournalMessage { + int32 current_epoch = 1; + int32 start_epoch = 2; + int32 end_epoch = 3; + repeated JournalEntry journal_entries = 4; + } + */ + // sanity checks + if (buffer == NULL) + return 0; + if (max_buffer_size <= 0) + return 0; + *bytes_written = 0; + size_t bytes_used; + // current_epoch + if (!protobuf_encode_varint(1, WIRETYPE_VARINT, message->current_epoch, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used)) + return 0; + *bytes_written += bytes_used; + // start_epoch + if (!protobuf_encode_varint(2, WIRETYPE_VARINT, message->start_epoch, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used)) + return 0; + *bytes_written += bytes_used; + // end_epoch + if (!protobuf_encode_varint(3, WIRETYPE_VARINT, message->end_epoch, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used)) + return 0; + *bytes_written += bytes_used; + // journal_entries + for (int i = 0; i < message->journal_entries->total; i++) { + struct JournalEntry* entry = (struct JournalEntry*) libp2p_utils_vector_get(message->journal_entries, i); + // encode the journal entry + size_t temp_size = ipfs_journal_entry_encode_size(entry); + uint8_t temp[temp_size]; + if (!ipfs_journal_entry_encode(entry, &temp[0], temp_size, &temp_size)) + return 0; + if (!protobuf_encode_length_delimited(4, WIRETYPE_LENGTH_DELIMITED, (char*)&temp[0], temp_size, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used)) { + return 0; + } + *bytes_written += bytes_used; + } + return 1; +} + +/*** + * Turn a protobuf'd JournalMessage and turn it into a real JournalMessage + * @param incoming the incoming bytes + * @param incoming_size the size of the incoming buffer + * @param results where to put the new JournalMessage + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_journal_message_decode(uint8_t *incoming, size_t incoming_size, struct JournalMessage **out) { + size_t pos = 0; + int retVal = 0, got_something = 0;; + + if ( (*out = ipfs_journal_message_new()) == NULL) + goto exit; + + while(pos < incoming_size) { + size_t bytes_read = 0; + int field_no; + enum WireType field_type; + if (protobuf_decode_field_and_type(&incoming[pos], incoming_size, &field_no, &field_type, &bytes_read) == 0) { + goto exit; + } + if (field_no < 1 || field_no > 5) { + libp2p_logger_error("journal_message", "Invalid character in journal_message protobuf at position %lu. Value: %02x\n", pos, incoming[pos]); + } + pos += bytes_read; + switch(field_no) { + case (1): // current_epoch + if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &(*out)->current_epoch, &bytes_read) == 0) + goto exit; + pos += bytes_read; + got_something = 1; + break; + case (2): // start_epoch + if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &(*out)->start_epoch, &bytes_read) == 0) + goto exit; + pos += bytes_read; + got_something = 1; + break; + case (3): // end_epoch + if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &(*out)->end_epoch, &bytes_read) == 0) + goto exit; + pos += bytes_read; + got_something = 1; + break; + case (4): { // journal entry + uint8_t *temp; + size_t temp_length; + protobuf_decode_length_delimited(&incoming[pos], incoming_size - pos, (char**)&temp, &temp_length, &bytes_read); + pos += bytes_read; + struct JournalEntry* entry = NULL; + if (ipfs_journal_entry_decode(&temp[0], temp_length, &entry)) { + libp2p_utils_vector_add((*out)->journal_entries, (void*)entry); + free(temp); + } else { + free(temp); + goto exit; + } + got_something = 1; + break; + } + } + } + + retVal = got_something; + +exit: + if (retVal == 0) { + ipfs_journal_message_free(*out); + } + return retVal; + +} diff --git a/journal/journal_message.proto b/journal/journal_message.proto new file mode 100644 index 0000000..2da0c82 --- /dev/null +++ b/journal/journal_message.proto @@ -0,0 +1,12 @@ +message JournalEntry { + int32 timestamp = 1; + string hash = 2; + bool pin = 3; +} + +message JournalMessage { + int32 current_epoch = 1; + int32 start_epoch = 2; + int32 end_epoch = 3; + repeated JournalEntry journal_entries = 4; +} \ No newline at end of file diff --git a/main/Makefile b/main/Makefile index 65403ce..b194bd6 100644 --- a/main/Makefile +++ b/main/Makefile @@ -14,6 +14,7 @@ OBJS = main.o \ ../exchange/bitswap/*.o \ ../flatfs/flatfs.o \ ../importer/importer.o ../importer/exporter.o ../importer/resolver.o \ + ../journal/*.o \ ../path/path.o \ ../merkledag/merkledag.o ../merkledag/node.o \ ../multibase/multibase.o \ diff --git a/repo/fsrepo/lmdb_datastore.c b/repo/fsrepo/lmdb_datastore.c index 0736d51..eaf6b3e 100644 --- a/repo/fsrepo/lmdb_datastore.c +++ b/repo/fsrepo/lmdb_datastore.c @@ -13,6 +13,7 @@ #include "lmdb.h" #include "libp2p/utils/logger.h" #include "ipfs/repo/fsrepo/lmdb_datastore.h" +#include "ipfs/repo/fsrepo/journalstore.h" #include "varint.h" /** @@ -121,52 +122,6 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data, return 1; } -/*** - * Write a journal record - * @param mbd_txn the transaction - * @param timestamp the timestamp - * @param hash the hash - * @param hash_size the size of the hash - * @returns true(1) on success, false(0) otherwise - */ -int repo_fsrepo_lmdb_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp, const uint8_t *hash, size_t hash_size) { - MDB_dbi mdb_dbi; - struct MDB_val db_key; - struct MDB_val db_value; - - libp2p_logger_debug("lmdb_datastore", "journal add timestamp: %llu.\n", timestamp); - - // build the record, which is a timestamp as a key, a byte that is the pin flag, and the hash as the value - uint8_t time_varint[8]; - size_t time_varint_size = 0; - varint_encode(timestamp, &time_varint[0], 8, &time_varint_size); - - libp2p_logger_debug("lmdb_datastore", "journal add varint size: %lu.\n", (unsigned long)time_varint_size); - - size_t record_size = hash_size + 1; - uint8_t record[record_size]; - record[0] = 1; - memcpy(&record[1], hash, hash_size); - - // open the journal table - - if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { - return 0; - } - - // write the record - db_key.mv_size = time_varint_size; - db_key.mv_data = time_varint; - - db_value.mv_size = record_size; - db_value.mv_data = record; - - if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) { - return 0; - } - - return 1; -} /** * Get the current time UTC @@ -223,7 +178,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA | MDB_NOOVERWRITE); if (retVal == 0) { // the normal case - repo_fsrepo_lmdb_journal_add(mdb_txn, timestamp, key, key_size); + lmdb_journalstore_journal_add(mdb_txn, timestamp, key, key_size); retVal = 1; } else { if (retVal == MDB_KEYEXIST) // We tried to add a key that already exists. Skip. diff --git a/repo/fsrepo/lmdb_journalstore.c b/repo/fsrepo/lmdb_journalstore.c index b1ccd94..83bbdbf 100644 --- a/repo/fsrepo/lmdb_journalstore.c +++ b/repo/fsrepo/lmdb_journalstore.c @@ -15,6 +15,49 @@ int journal_record_free(struct JournalRecord* rec) { return 1; } +/*** + * Write a journal record + * @param mbd_txn the transaction + * @param timestamp the timestamp + * @param hash the hash + * @param hash_size the size of the hash + * @returns true(1) on success, false(0) otherwise + */ +int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp, const uint8_t *hash, size_t hash_size) { + MDB_dbi mdb_dbi; + struct MDB_val db_key; + struct MDB_val db_value; + + // build the record, which is a timestamp as a key, a byte that is the pin flag, and the hash as the value + uint8_t time_varint[8]; + size_t time_varint_size = 0; + varint_encode(timestamp, &time_varint[0], 8, &time_varint_size); + + size_t record_size = hash_size + 1; + uint8_t record[record_size]; + record[0] = 1; + memcpy(&record[1], hash, hash_size); + + // open the journal table + + if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { + return 0; + } + + // write the record + db_key.mv_size = time_varint_size; + db_key.mv_data = time_varint; + + db_value.mv_size = record_size; + db_value.mv_data = record; + + if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) { + return 0; + } + + return 1; +} + /** * Open a cursor to the journalstore table * @param datastore the data connection diff --git a/test/Makefile b/test/Makefile index a1195b1..50c5abf 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,5 +1,5 @@ CC = gcc -CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multihash/include -I../../c-multiaddr/include -I../../c-protobuf -g3 -Wall -std=c99 +CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multihash/include -I../../c-multiaddr/include -I../../c-protobuf -I../../lmdb/libraries/liblmdb -g3 -Wall -std=c99 LFLAGS = -L../../c-libp2p -L../../c-multihash -L../../c-multiaddr -lp2p -lm -lmultihash -lmultiaddr -lpthread DEPS = cmd/ipfs/test_init.h repo/test_repo_bootstrap_peers.h repo/test_repo_config.h repo/test_repo_identity.h cid/test_cid.h OBJS = testit.o test_helper.o \ @@ -17,6 +17,7 @@ OBJS = testit.o test_helper.o \ ../exchange/bitswap/*.o \ ../flatfs/flatfs.o \ ../importer/importer.o ../importer/exporter.o ../importer/resolver.o \ + ../journal/*.o \ ../merkledag/merkledag.o ../merkledag/node.o \ ../multibase/multibase.o \ ../repo/init.o \ diff --git a/test/journal/test_journal.h b/test/journal/test_journal.h new file mode 100644 index 0000000..5818543 --- /dev/null +++ b/test/journal/test_journal.h @@ -0,0 +1,69 @@ +#include + +#include "ipfs/journal/journal_entry.h" +#include "ipfs/journal/journal_message.h" + +int test_journal_encode_decode() { + int retVal = 0; + struct JournalEntry* entry = ipfs_journal_entry_new(); + struct JournalMessage* message = ipfs_journal_message_new(); + struct JournalMessage* result_message = NULL; + struct JournalEntry* result_entry = NULL; + uint8_t *buffer; + size_t buffer_size; + + // build entry + entry->hash = malloc(1); + entry->hash[0] = 1; + entry->hash_size = 1; + entry->pin = 1; + entry->timestamp = 1; + // build message + message->current_epoch = 2; + message->start_epoch = 3; + message->end_epoch = 4; + libp2p_utils_vector_add(message->journal_entries, entry); + + // protobuf it + buffer_size = ipfs_journal_message_encode_size(message); + buffer = malloc(buffer_size); + if (!ipfs_journal_message_encode(message, buffer, buffer_size, &buffer_size)) + goto exit; + + // unprotobuf it + if (!ipfs_journal_message_decode(buffer, buffer_size, &result_message)) + goto exit; + + // compare + if (result_message->current_epoch != message->current_epoch) + goto exit; + if (result_message->start_epoch != message->start_epoch) + goto exit; + if (result_message->end_epoch != message->end_epoch) + goto exit; + if (result_message->journal_entries->total != message->journal_entries->total) + goto exit; + result_entry = (struct JournalEntry*) libp2p_utils_vector_get(message->journal_entries, 0); + if (result_entry->timestamp != entry->timestamp) + goto exit; + if (result_entry->pin != entry->pin) + goto exit; + if (result_entry->hash_size != entry->hash_size) + goto exit; + for (int i = 0; i < result_entry->hash_size; i++) { + if (result_entry->hash[i] != entry->hash[i]) + goto exit; + } + + // cleanup + retVal = 1; + exit: + if (buffer != NULL) + free(buffer); + ipfs_journal_message_free(message); + ipfs_journal_message_free(result_message); + // the above lines take care of these + //ipfs_journal_entry_free(entry); + //ipfs_journal_entry_free(result_entry); + return retVal; +} diff --git a/test/testit.c b/test/testit.c index 96ce9fe..c655413 100644 --- a/test/testit.c +++ b/test/testit.c @@ -3,6 +3,7 @@ #include "exchange/test_bitswap.h" #include "exchange/test_bitswap_request_queue.h" #include "flatfs/test_flatfs.h" +#include "journal/test_journal.h" #include "merkledag/test_merkledag.h" #include "node/test_node.h" #include "node/test_importer.h" @@ -46,6 +47,7 @@ const char* names[] = { "test_cid_protobuf_encode_decode", "test_daemon_startup_shutdown", "test_datastore_list_journal", + "test_journal_encode_decode", "test_repo_config_new", "test_repo_config_init", "test_repo_config_write", @@ -103,6 +105,7 @@ int (*funcs[])(void) = { test_cid_protobuf_encode_decode, test_daemon_startup_shutdown, test_datastore_list_journal, + test_journal_encode_decode, test_repo_config_new, test_repo_config_init, test_repo_config_write,