diff --git a/blocks/blockstore.c b/blocks/blockstore.c
index d377555..af5abe5 100644
--- a/blocks/blockstore.c
+++ b/blocks/blockstore.c
@@ -238,9 +238,6 @@ int ipfs_blockstore_put_unixfs(const struct UnixFS* unix_fs, const struct FSRepo
 		return 0;
 	}
 
-	// send to Put with key (this is now done separately)
-	//fs_repo->config->datastore->datastore_put(key, key_length, block->data, block->data_length, fs_repo->config->datastore);
-
 	free(key);
 	free(filename);
 	return 1;
diff --git a/include/ipfs/repo/fsrepo/journalstore.h b/include/ipfs/repo/fsrepo/journalstore.h
new file mode 100644
index 0000000..67d4987
--- /dev/null
+++ b/include/ipfs/repo/fsrepo/journalstore.h
@@ -0,0 +1,53 @@
+#pragma once
+/**
+ * Piggyback on the datastore to access the journal entries
+ */
+
+#include <stdint.h>
+#include <stdlib.h>
+
+#include "libp2p/db/datastore.h"
+
+/**
+ * One journal entry, as built by repo_journalstore_cursor_get
+ */
+struct JournalRecord {
+	unsigned long long timestamp; // decoded from the varint key of the journal entry
+	int pin; // the pin flag (first byte of the journal value)
+	uint8_t *hash; // heap copy of the rest of the value; released by journal_record_free
+	size_t hash_size; // number of bytes in hash
+};
+
+/**
+ * Open a cursor to the journalstore table
+ * @param datastore the data connection
+ * @param cursor a reference to the cursor; it must point to NULL on entry
+ * @returns true(1) on success, false(0) otherwise
+ */
+int repo_journalstore_cursor_open(struct Datastore* datastore, void** cursor);
+
+/**
+ * Read a record from the cursor
+ * @param datastore the data connection
+ * @param cursor the cursor from repo_journalstore_cursor_open
+ * @param op CURSOR_FIRST or CURSOR_NEXT
+ * @param record receives a newly allocated record; release it with journal_record_free
+ * @returns true(1) if a record was read, false(0) otherwise
+ */
+int repo_journalstore_cursor_get(struct Datastore* datastore, void* cursor, enum DatastoreCursorOp op, struct JournalRecord** record);
+
+/**
+ * Close the cursor
+ * NOTE: the name is misspelled ("cournalstore"); it matches the definition in lmdb_journalstore.c
+ * @param datastore the data connection
+ * @param cursor the cursor to close; it is freed by this call
+ * @returns true(1) if an open cursor was closed, false(0) otherwise
+ */
+int repo_cournalstore_cursor_close(struct Datastore* datastore, void* cursor);
+
+/**
+ * Free a journal record and its hash
+ * @param rec the record to free (may be NULL)
+ * @returns true(1)
+ */
+int journal_record_free(struct JournalRecord* rec);
diff --git a/include/ipfs/repo/fsrepo/lmdb_datastore.h b/include/ipfs/repo/fsrepo/lmdb_datastore.h
index c290aeb..66b467f 100644
--- a/include/ipfs/repo/fsrepo/lmdb_datastore.h
+++ b/include/ipfs/repo/fsrepo/lmdb_datastore.h
@@ -1,8 +1,16 @@
-#ifndef __FS_REPO_LMDB_DATASTORE_H__
-#define __FS_REPO_LMDB_DATASTORE_H__
+#pragma once
 
+#include "lmdb.h"
 #include "libp2p/db/datastore.h"
 
+static const char* const DATASTORE_DB = "DATASTORE"; // name of the LMDB database that holds the datastore records
+static const char* const JOURNAL_DB = "JOURNAL"; // name of the LMDB database that holds the journal records
+
+struct lmdb_trans_cursor {
+	MDB_txn* transaction;
+	MDB_cursor* cursor;
+};
+
 /***
  * Places
the LMDB methods into the datastore's function pointers
  * @param datastore the datastore to fill
@@ -33,5 +41,3 @@ int repo_fsrepo_lmdb_close(struct Datastore* datastore);
  * @returns true(1) on success
  */
 int repo_fsrepo_lmdb_create_directory(struct Datastore* datastore);
-
-#endif
diff --git a/main/Makefile b/main/Makefile
index e7d14f8..65403ce 100644
--- a/main/Makefile
+++ b/main/Makefile
@@ -20,7 +20,7 @@ OBJS = main.o \
 	../namesys/*.o \
 	../pin/pin.o \
 	../repo/init.o \
-	../repo/fsrepo/fs_repo.o ../repo/fsrepo/jsmn.o ../repo/fsrepo/lmdb_datastore.o \
+	../repo/fsrepo/*.o \
 	../repo/config/*.o \
 	../routing/*.o \
 	../thirdparty/ipfsaddr/ipfs_addr.o \
diff --git a/main/main.c b/main/main.c
index 8aa6784..2b5a882 100644
--- a/main/main.c
+++ b/main/main.c
@@ -108,6 +108,7 @@ int main(int argc, char** argv) {
 	libp2p_logger_add_class("peerstore");
 	libp2p_logger_add_class("dht_protocol");
 	libp2p_logger_add_class("peer");
+	libp2p_logger_add_class("lmdb_datastore");
 
 	strip_quotes(argc, argv);
 	int retVal = parse_arguments(argc, argv);
diff --git a/repo/fsrepo/Makefile b/repo/fsrepo/Makefile
index 4b9e7d2..988a77e 100644
--- a/repo/fsrepo/Makefile
+++ b/repo/fsrepo/Makefile
@@ -7,7 +7,7 @@ endif
 
 LFLAGS =
 DEPS =
-OBJS = fs_repo.o jsmn.o lmdb_datastore.o
+OBJS = fs_repo.o jsmn.o lmdb_datastore.o lmdb_journalstore.o
 
 %.o: %.c $(DEPS)
 	$(CC) -c -o $@ $< $(CFLAGS)
diff --git a/repo/fsrepo/lmdb_datastore.c b/repo/fsrepo/lmdb_datastore.c
index f41207e..0736d51 100644
--- a/repo/fsrepo/lmdb_datastore.c
+++ b/repo/fsrepo/lmdb_datastore.c
@@ -8,14 +8,62 @@
 #include
 #include
 #include
+#include <time.h>
 
 #include "lmdb.h"
+#include "libp2p/utils/logger.h"
 #include "ipfs/repo/fsrepo/lmdb_datastore.h"
+#include "varint.h"
 
-struct lmdb_trans_cursor {
-	MDB_txn* transaction;
-	MDB_cursor* cursor;
-};
+/**
+ * Build a "value" section for a datastore record
+ * @param timestamp the timestamp
+ * @param data the data (usually a base32 of the cid hash)
+ * @param data_length the length of data
+ * @param result the resultant data object (allocated here; the caller must free it)
+ * @param result_size the size of the result
+ * @returns true(1) on success, otherwise 0
+ */
+int repo_fsrepo_lmdb_build_record(const unsigned long long timestamp, const uint8_t *data, size_t data_length, uint8_t **result, size_t *result_size) {
+	// turn timestamp into varint
+	uint8_t ts_varint[8];
+	size_t num_bytes;
+	if (varint_encode(timestamp, &ts_varint[0], 8, &num_bytes) == NULL) {
+		return 0;
+	}
+	// make new structure
+	*result = (uint8_t *) malloc(num_bytes + data_length);
+	if (*result == NULL) {
+		return 0;
+	}
+	memcpy(*result, ts_varint, num_bytes);
+	memcpy(&(*result)[num_bytes], data, data_length);
+	*result_size = data_length + num_bytes;
+	return 1;
+}
+
+/**
+ * read a "value" section from a datastore record.
+ * @param data what we read from the datastore
+ * @param data_length the length of what we read from the datastore
+ * @param timestamp the timestamp that was read from the datastore
+ * @param record_pos unused: it is passed by value, so the data position cannot be
+ *        returned through it. The data section is the last *record_size bytes of data.
+ * @param record_size the size of the data section of the record
+ * @returns true(1) on success, false(0) otherwise
+ */
+int repo_fsrepo_lmdb_parse_record(uint8_t *data, size_t data_length, unsigned long long *timestamp, uint8_t *record_pos, size_t *record_size) {
+	(void)record_pos;
+	if (data == NULL || data_length == 0 || timestamp == NULL || record_size == NULL)
+		return 0;
+	size_t varint_size = 0;
+	*timestamp = varint_decode(data, data_length, &varint_size);
+	// the timestamp varint must fit inside the record
+	if (varint_size == 0 || varint_size > data_length)
+		return 0;
+	*record_size = data_length - varint_size;
+	return 1;
+}
 
 /***
  * retrieve a record from the database and put in a pre-sized buffer
@@ -28,7 +76,6 @@ struct lmdb_trans_cursor {
  * @returns true(1) on success
  */
 int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data, size_t max_data_size, size_t* data_size, const struct Datastore* datastore) {
-	int retVal;
 	MDB_txn* mdb_txn;
 	MDB_dbi mdb_dbi;
 	struct MDB_val db_key;
@@ -39,11 +86,10 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data,
 		return 0;
 
 	// open transaction
-	retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn);
-	if (retVal != 0)
+	if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0)
 		return 0;
-	retVal = mdb_dbi_open(mdb_txn, NULL, MDB_DUPSORT, &mdb_dbi);
-	if (retVal != 0) {
+
+	if (mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
 		mdb_txn_commit(mdb_txn);
 		return 0;
 	}
@@ -52,33 +98,99 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data,
 	db_key.mv_size = key_size;
 	db_key.mv_data = (char*)key;
 
-	//printf("Looking for data that has a key size of %lu that starts with %02x and ends with %02x\n", db_key.mv_size, ((char*)db_key.mv_data)[0], ((char*)db_key.mv_data)[db_key.mv_size - 1]);
-
-	retVal = mdb_get(mdb_txn, mdb_dbi, &db_key, &db_value);
-	if (retVal != 0) {
-		//mdb_dbi_close(mdb_env, mdb_dbi);
+	if (mdb_get(mdb_txn, mdb_dbi, &db_key, &db_value) != 0) {
 		mdb_txn_commit(mdb_txn);
 		return 0;
 	}
 
-	// now copy the data
-	if (db_value.mv_size > max_data_size) {
-		//mdb_dbi_close(mdb_env, mdb_dbi);
+	// the data from the database includes a timestamp. We'll need to strip it off.
+	unsigned long long timestamp;
+	uint8_t *pos = NULL;
+	size_t size = 0;
+	if (!repo_fsrepo_lmdb_parse_record(db_value.mv_data, db_value.mv_size, &timestamp, pos, &size)) {
+		mdb_txn_commit(mdb_txn);
+		return 0;
+	}
+	// pos is passed by value, so parse_record cannot set it: the data section is
+	// the last `size` bytes of the stored value
+	pos = (uint8_t*)db_value.mv_data + (db_value.mv_size - size);
+
+	// Was it too big to fit in the buffer they sent?
+	if (size > max_data_size) {
 		mdb_txn_commit(mdb_txn);
 		return 0;
 	}
 
 	// set return values
-	memcpy(data, db_value.mv_data, db_value.mv_size);
-	(*data_size) = db_value.mv_size;
+	memcpy(data, pos, size);
+	(*data_size) = size;
 
 	// clean up
-	//mdb_dbi_close(mdb_env, mdb_dbi);
 	mdb_txn_commit(mdb_txn);
 
 	return 1;
 }
 
+/***
+ * Write a journal record
+ * @param mdb_txn the transaction
+ * @param timestamp the timestamp
+ * @param hash the hash
+ * @param hash_size the size of the hash
+ * @returns true(1) on success, false(0) otherwise
+ */
+int repo_fsrepo_lmdb_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp, const uint8_t *hash, size_t hash_size) {
+	MDB_dbi mdb_dbi;
+	struct MDB_val db_key;
+	struct MDB_val db_value;
+
+	libp2p_logger_debug("lmdb_datastore", "journal add timestamp: %llu.\n", timestamp);
+
+	// build the record, which is a timestamp as a key, a byte that is the pin flag, and the hash as the value
+	uint8_t time_varint[8];
+	size_t time_varint_size = 0;
+	if (varint_encode(timestamp, &time_varint[0], 8, &time_varint_size) == NULL) {
+		return 0;
+	}
+
+	libp2p_logger_debug("lmdb_datastore", "journal add varint size: %lu.\n", (unsigned long)time_varint_size);
+
+	size_t record_size = hash_size + 1;
+	uint8_t record[record_size];
+	record[0] = 1;
+	memcpy(&record[1], hash, hash_size);
+
+	// open the journal table
+
+	if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
+		return 0;
+	}
+
+	// write the record
+	db_key.mv_size = time_varint_size;
+	db_key.mv_data = time_varint;
+
+	db_value.mv_size = record_size;
+	db_value.mv_data = record;
+
+	if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) != 0) {
+		return 0;
+	}
+
+	return 1;
+}
+
+/**
+ * Get the current time UTC
+ * @returns number of seconds since epoch in UTC
+ */
+unsigned long long lmdb_datastore_gmt_time() {
+	// time() already reports seconds since the epoch (UTC). The former
+	// mktime(gmtime()) round trip re-read that UTC time as local time,
+	// which skewed the result by the local UTC offset.
+	return (unsigned long long)time(NULL);
+}
+
 /**
  * Write data to the datastore with the specified key
  * @param 
key the key
@@ -103,21 +202,37 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
 	retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn);
 	if (retVal != 0)
 		return 0;
-	retVal = mdb_dbi_open(mdb_txn, NULL, MDB_DUPSORT, &mdb_dbi);
-	if (retVal != 0)
-		return 0;
+	retVal = mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi);
+	if (retVal != 0) {
+		// do not leave the write transaction open
+		mdb_txn_abort(mdb_txn);
+		return 0;
+	}
 
+	// add the timestamp
+	unsigned long long timestamp = lmdb_datastore_gmt_time();
+	uint8_t *record = NULL;
+	size_t record_size = 0;
+	if (!repo_fsrepo_lmdb_build_record(timestamp, data, data_size, &record, &record_size)) {
+		// nothing was written yet, so just drop the transaction
+		mdb_txn_abort(mdb_txn);
+		return 0;
+	}
+
 	// prepare data
 	db_key.mv_size = key_size;
 	db_key.mv_data = (char*)key;
 
 	// write
-	db_value.mv_size = data_size;
-	db_value.mv_data = data;
+	db_value.mv_size = record_size;
+	db_value.mv_data = record;
+
 	retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA | MDB_NOOVERWRITE);
-	if (retVal == 0) // the normal case
+	if (retVal == 0) {
+		// the normal case
+		repo_fsrepo_lmdb_journal_add(mdb_txn, timestamp, key, key_size);
 		retVal = 1;
-	else {
+	} else {
 		if (retVal == MDB_KEYEXIST) // We tried to add a key that already exists. Skip.
 			retVal = 1;
 		else
@@ -125,7 +240,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
 	}
 
 	// cleanup
-	//mdb_dbi_close(mdb_env, mdb_dbi);
+	free(record);
 	mdb_txn_commit(mdb_txn);
 	return retVal;
 }
@@ -139,15 +254,20 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
 int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) {
 	// create environment
 	struct MDB_env* mdb_env;
-	int retVal = mdb_env_create(&mdb_env);
-	if (retVal < 0) {
+	if (mdb_env_create(&mdb_env) != 0) {
+		// no environment was created, so there is nothing to close
+		return 0;
+	}
+
+	// at most, 2 databases will be opened. The datastore and the journal.
+	MDB_dbi dbs = 2;
+	if (mdb_env_set_maxdbs(mdb_env, dbs) != 0) {
 		mdb_env_close(mdb_env);
 		return 0;
 	}
 
 	// open the environment
-	retVal = mdb_env_open(mdb_env, datastore->path, 0, S_IRWXU);
-	if (retVal < 0) {
+	if (mdb_env_open(mdb_env, datastore->path, 0, S_IRWXU) != 0) {
 		mdb_env_close(mdb_env);
 		return 0;
 	}
@@ -169,6 +289,11 @@ int repo_fsrepo_lmdb_close(struct Datastore* datastore) {
 	return 1;
 }
 
+/***
+ * Create a new cursor on the datastore database
+ * @param datastore the place to store the cursor
+ * @returns true(1) on success, false(0) otherwise
+ */
 int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) {
 	if (datastore->handle != NULL) {
 		MDB_env* mdb_env = (MDB_env*)datastore->handle;
@@ -180,7 +305,7 @@ int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) {
 			if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0)
 				return 0;
 			MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction;
-			if (mdb_dbi_open(mdb_txn, NULL, MDB_DUPSORT, &mdb_dbi) != 0) {
+			if (mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
 				mdb_txn_commit(mdb_txn);
 				return 0;
 			}
@@ -195,6 +320,16 @@ int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) {
 	return 0;
 }
 
+/***
+ * Get a record using a cursor
+ * @param key the key from the record
+ * @param key_length the length of the key
+ * @param value the value of the record
+ * @param value_length the length of the value
+ * @param op CURSOR_FIRST or CURSOR_NEXT
+ * @param datastore holds the reference to the opened cursor
+ * @returns true(1) on success, false(0) otherwise
+ */
 int repo_fsrepo_lmdb_cursor_get(unsigned char** key, int* key_length,
 		unsigned char** value, int* value_length,
 		enum DatastoreCursorOp op, struct Datastore* datastore)
diff --git a/repo/fsrepo/lmdb_journalstore.c b/repo/fsrepo/lmdb_journalstore.c
new file mode 100644
index 0000000..b1ccd94
--- /dev/null
+++ b/repo/fsrepo/lmdb_journalstore.c
@@ -0,0 +1,126 @@
+#include "ipfs/repo/fsrepo/journalstore.h"
+#include "ipfs/repo/fsrepo/lmdb_datastore.h"
+
+#include "lmdb.h"
+#include "varint.h"
+#include <stdlib.h>
+#include <string.h>
+
+/**
+ * Free a journal record together with the hash buffer it owns
+ * @param rec the record to free (may be NULL)
+ * @returns true(1)
+ */
+int journal_record_free(struct JournalRecord* rec) {
+	if (rec != NULL) {
+		free(rec->hash);
+		rec->hash = NULL;
+		free(rec);
+	}
+	return 1;
+}
+
+/**
+ * Open a cursor to the journalstore table
+ * @param datastore the data connection
+ * @param crsr a reference to the cursor. In this implementation, it is an lmdb_trans_cursor struct.
+ *        *crsr must be NULL on entry and is only set once the cursor has been opened.
+ * @returns true(1) on success, false(0) otherwise
+ */
+int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) {
+	if (datastore == NULL || datastore->handle == NULL || crsr == NULL)
+		return 0;
+	// a non-NULL *crsr is treated as a cursor that is already in use
+	if (*crsr != NULL)
+		return 0;
+	MDB_env* mdb_env = (MDB_env*)datastore->handle;
+	MDB_dbi mdb_dbi;
+	struct lmdb_trans_cursor* cursor = malloc(sizeof(struct lmdb_trans_cursor));
+	if (cursor == NULL)
+		return 0;
+	cursor->transaction = NULL;
+	cursor->cursor = NULL;
+	// open transaction
+	if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) {
+		free(cursor);
+		return 0;
+	}
+	if (mdb_dbi_open(cursor->transaction, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
+		mdb_txn_abort(cursor->transaction);
+		free(cursor);
+		return 0;
+	}
+	// open cursor
+	if (mdb_cursor_open(cursor->transaction, mdb_dbi, &cursor->cursor) != 0) {
+		mdb_txn_abort(cursor->transaction);
+		free(cursor);
+		return 0;
+	}
+	*crsr = cursor;
+	return 1;
+}
+
+/**
+ * Read a record from the cursor
+ * @param datastore the data connection (not used by this implementation)
+ * @param crsr the cursor from repo_journalstore_cursor_open
+ * @param op CURSOR_NEXT to advance; any other value reads the first record
+ * @param record receives a newly allocated record; release it with journal_record_free
+ * @returns true(1) if a record was read, false(0) otherwise
+ */
+int repo_journalstore_cursor_get(struct Datastore* datastore, void* crsr, enum DatastoreCursorOp op, struct JournalRecord** record) {
+	if (crsr == NULL || record == NULL)
+		return 0;
+	struct lmdb_trans_cursor* tc = (struct lmdb_trans_cursor*)crsr;
+	MDB_val mdb_key;
+	MDB_val mdb_value;
+	// anything other than CURSOR_NEXT starts from the first record
+	MDB_cursor_op co = (op == CURSOR_NEXT) ? MDB_NEXT : MDB_FIRST;
+	if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) {
+		return 0;
+	}
+	// the value is the pin flag byte followed by the hash
+	if (mdb_value.mv_size < 1)
+		return 0;
+	const uint8_t* value = (const uint8_t*)mdb_value.mv_data;
+	// build the JournalRecord object
+	struct JournalRecord *rec = malloc(sizeof(struct JournalRecord));
+	if (rec == NULL)
+		return 0;
+	// timestamp
+	size_t varint_size = 0;
+	rec->timestamp = varint_decode(mdb_key.mv_data, mdb_key.mv_size, &varint_size);
+	// pin flag
+	rec->pin = value[0];
+	rec->hash_size = mdb_value.mv_size - 1;
+	rec->hash = NULL;
+	if (rec->hash_size > 0) {
+		rec->hash = malloc(rec->hash_size);
+		if (rec->hash == NULL) {
+			free(rec);
+			return 0;
+		}
+		memcpy(rec->hash, &value[1], rec->hash_size);
+	}
+	*record = rec;
+	return 1;
+}
+
+/**
+ * Close the cursor
+ * @param datastore the data connection (not used by this implementation)
+ * @param crsr the cursor from repo_journalstore_cursor_open; it is freed here
+ * @returns true(1) if the cursor was closed and its transaction committed, false(0) otherwise
+ */
+int repo_cournalstore_cursor_close(struct Datastore* datastore, void* crsr) {
+	if (crsr == NULL)
+		return 0;
+	struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)crsr;
+	int retVal = 0;
+	if (cursor->cursor != NULL) {
+		mdb_cursor_close(cursor->cursor);
+		retVal = (mdb_txn_commit(cursor->transaction) == 0);
+	}
+	free(cursor);
+	return retVal;
+}
diff --git a/test/Makefile b/test/Makefile
index 6a6dfae..a1195b1 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -20,7 +20,7 @@ OBJS = testit.o test_helper.o \
 	../merkledag/merkledag.o ../merkledag/node.o \
 	../multibase/multibase.o \
 	../repo/init.o \
-	../repo/fsrepo/fs_repo.o ../repo/fsrepo/jsmn.o ../repo/fsrepo/lmdb_datastore.o \
+	../repo/fsrepo/*.o \
 	../repo/config/*.o \
 	../routing/offline.o \
 	../routing/online.o \
diff --git a/test/storage/test_datastore.h b/test/storage/test_datastore.h
index 6c64ce0..85fe25f 100644
--- a/test/storage/test_datastore.h
+++ b/test/storage/test_datastore.h
@@ -3,6 +3,7 @@
 #include "ipfs/blocks/block.h"
 #include "ipfs/repo/config/config.h"
 #include "ipfs/repo/fsrepo/fs_repo.h"
+#include "ipfs/repo/fsrepo/journalstore.h"
 
 #include "../test_helper.h"
 
@@ -61,13 +62,48 @@ int test_ipfs_datastore_put() {
 		return 0;
 	}
 
-	// save the block
-
-	// check the results
-
 	// clean up
 	ipfs_repo_fsrepo_free(fs_repo);
 	ipfs_block_free(block);
 
 	return 1;
 }
+
+/**
+ * List what is in the journal
+ * @returns true(1) once the journal has been walked, false(0) if the repo or the cursor could not be opened
+ */
+int test_datastore_list_journal() {
+	libp2p_logger_add_class("test_datastore");
+	libp2p_logger_add_class("lmdb_datastore");
+	// open database
+	struct FSRepo* fs_repo;
+	if (ipfs_repo_fsrepo_new(NULL, NULL, &fs_repo) == 0) {
+		return 0;
+	}
+	if (ipfs_repo_fsrepo_open(fs_repo) == 0) {
+		return 0;
+	}
+	// open cursor (the cursor pointer must start out NULL)
+	void* crsr = NULL;
+	if (!repo_journalstore_cursor_open(fs_repo->config->datastore, &crsr)) {
+		ipfs_repo_fsrepo_free(fs_repo);
+		return 0;
+	}
+	// grab records until the cursor has no more to give
+	struct JournalRecord* record = NULL;
+	enum DatastoreCursorOp op = CURSOR_FIRST;
+	while (repo_journalstore_cursor_get(fs_repo->config->datastore, crsr, op, &record)) {
+		// display record
+		libp2p_logger_debug("test_datastore", "Timestamp: %llu.\n", record->timestamp);
+		libp2p_logger_debug("test_datastore", "Pin: %s.\n", record->pin == 1 ? "Y" : "N");
+		// free record
+		journal_record_free(record);
+		record = NULL;
+		op = CURSOR_NEXT;
+	}
+	// clean up
+	repo_cournalstore_cursor_close(fs_repo->config->datastore, crsr);
+	ipfs_repo_fsrepo_free(fs_repo);
+	return 1;
+}
diff --git a/test/testit.c b/test/testit.c
index 87c9524..96ce9fe 100644
--- a/test/testit.c
+++ b/test/testit.c
@@ -45,6 +45,7 @@ const char* names[] = {
 		"test_cid_cast_non_multihash",
 		"test_cid_protobuf_encode_decode",
 		"test_daemon_startup_shutdown",
+		"test_datastore_list_journal",
 		"test_repo_config_new",
 		"test_repo_config_init",
 		"test_repo_config_write",
@@ -101,6 +102,7 @@ int (*funcs[])(void) = {
 		test_cid_cast_non_multihash,
 		test_cid_protobuf_encode_decode,
 		test_daemon_startup_shutdown,
+		test_datastore_list_journal,
 		test_repo_config_new,
 		test_repo_config_init,
 		test_repo_config_write,