refactored datastore/journalstore for readability
This commit is contained in:
parent
cb1ea3ceff
commit
78904ff1b6
6 changed files with 132 additions and 11 deletions
|
@ -502,6 +502,7 @@ int fs_repo_open_config(struct FSRepo* repo) {
|
||||||
continue;
|
continue;
|
||||||
// make multiaddress a peer
|
// make multiaddress a peer
|
||||||
struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(cur);
|
struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(cur);
|
||||||
|
multiaddress_free(cur);
|
||||||
struct ReplicationPeer* rp = repo_config_replication_peer_new();
|
struct ReplicationPeer* rp = repo_config_replication_peer_new();
|
||||||
rp->peer = peer;
|
rp->peer = peer;
|
||||||
libp2p_logger_debug("fs_repo", "Adding %s to replication_peers.\n", libp2p_peer_id_to_string(rp->peer));
|
libp2p_logger_debug("fs_repo", "Adding %s to replication_peers.\n", libp2p_peer_id_to_string(rp->peer));
|
||||||
|
|
|
@ -25,9 +25,6 @@ struct lmdb_trans_cursor* lmdb_trans_cursor_new() {
|
||||||
*/
|
*/
|
||||||
int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in) {
|
int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in) {
|
||||||
if (in != NULL) {
|
if (in != NULL) {
|
||||||
if (in->database != NULL) {
|
|
||||||
free(in->database);
|
|
||||||
}
|
|
||||||
free(in);
|
free(in);
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
|
|
|
@ -158,12 +158,28 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas
|
||||||
* @param mdb_txn the transaction to be created
|
* @param mdb_txn the transaction to be created
|
||||||
* @returns true(1) on success, false(0) otherwise
|
* @returns true(1) on success, false(0) otherwise
|
||||||
*/
|
*/
|
||||||
int lmdb_datastore_create_transaction(MDB_env *mdb_env, MDB_dbi *mdb_dbi, MDB_txn **mdb_txn) {
|
int lmdb_datastore_create_transaction(MDB_env *mdb_env, MDB_txn **mdb_txn) {
|
||||||
// open transaction
|
// open transaction
|
||||||
if (mdb_txn_begin(mdb_env, NULL, 0, mdb_txn) != 0)
|
if (mdb_txn_begin(mdb_env, NULL, 0, mdb_txn) != 0)
|
||||||
return 0;
|
return 0;
|
||||||
if (mdb_dbi_open(*mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, mdb_dbi) != 0) {
|
return 1;
|
||||||
mdb_txn_commit(*mdb_txn);
|
}
|
||||||
|
|
||||||
|
int lmdb_datastore_open_databases(MDB_env *mdb_env, MDB_txn *mdb_txn, MDB_dbi *datastore_table, MDB_dbi* journalstore_table) {
|
||||||
|
if (mdb_env == NULL) {
|
||||||
|
libp2p_logger_error("lmdb_datastore", "open_databases: environment not set.\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (mdb_txn == NULL) {
|
||||||
|
libp2p_logger_error("lmdb_datastore", "open_database: transaction does not exist.\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, datastore_table) != 0) {
|
||||||
|
libp2p_logger_error("lmdb_datastore", "open_database: Unable to open datastore.\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_table) != 0) {
|
||||||
|
libp2p_logger_error("lmdb_datastore", "open_database: Unable to open journalstore.\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -182,6 +198,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
||||||
int retVal;
|
int retVal;
|
||||||
MDB_txn *datastore_txn;
|
MDB_txn *datastore_txn;
|
||||||
MDB_dbi datastore_table;
|
MDB_dbi datastore_table;
|
||||||
|
MDB_dbi journalstore_table;
|
||||||
struct MDB_val datastore_key;
|
struct MDB_val datastore_key;
|
||||||
struct MDB_val datastore_value;
|
struct MDB_val datastore_value;
|
||||||
struct DatastoreRecord *datastore_record = NULL;
|
struct DatastoreRecord *datastore_record = NULL;
|
||||||
|
@ -195,7 +212,13 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
||||||
}
|
}
|
||||||
|
|
||||||
// open a transaction to the databases
|
// open a transaction to the databases
|
||||||
if (!lmdb_datastore_create_transaction(mdb_env, &datastore_table, &datastore_txn)) {
|
if (!lmdb_datastore_create_transaction(mdb_env, &datastore_txn)) {
|
||||||
|
libp2p_logger_error("lmdb_datastore", "put: Unable to create db transaction.\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!lmdb_datastore_open_databases(mdb_env, datastore_txn, &datastore_table, &journalstore_table)) {
|
||||||
|
libp2p_logger_error("lmdb_datastore", "put: Unable to open database tables.\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -207,6 +230,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
||||||
}
|
}
|
||||||
journalstore_cursor->environment = mdb_env;
|
journalstore_cursor->environment = mdb_env;
|
||||||
journalstore_cursor->parent_transaction = datastore_txn;
|
journalstore_cursor->parent_transaction = datastore_txn;
|
||||||
|
journalstore_cursor->database = &journalstore_table;
|
||||||
|
|
||||||
// see if what we want is already in the datastore
|
// see if what we want is already in the datastore
|
||||||
repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, datastore_txn, &datastore_table);
|
repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, datastore_txn, &datastore_table);
|
||||||
|
@ -283,7 +307,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
||||||
if (!lmdb_journalstore_journal_add(journalstore_cursor, journalstore_record)) {
|
if (!lmdb_journalstore_journal_add(journalstore_cursor, journalstore_record)) {
|
||||||
libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. Continuing.\n");
|
libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. Continuing.\n");
|
||||||
}
|
}
|
||||||
lmdb_journalstore_cursor_close(journalstore_cursor);
|
//lmdb_journalstore_cursor_close(journalstore_cursor);
|
||||||
lmdb_journal_record_free(journalstore_record);
|
lmdb_journal_record_free(journalstore_record);
|
||||||
retVal = 1;
|
retVal = 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,6 +118,7 @@ int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor,
|
||||||
|
|
||||||
MDB_val journalstore_key;
|
MDB_val journalstore_key;
|
||||||
MDB_val journalstore_value;
|
MDB_val journalstore_value;
|
||||||
|
int createdTransaction = 0;
|
||||||
|
|
||||||
if (!lmdb_journalstore_build_key_value_pair(journalstore_record, &journalstore_key, &journalstore_value)) {
|
if (!lmdb_journalstore_build_key_value_pair(journalstore_record, &journalstore_key, &journalstore_value)) {
|
||||||
libp2p_logger_error("lmdbd_journalstore", "add: Unable to convert journalstore record to key/value.\n");
|
libp2p_logger_error("lmdbd_journalstore", "add: Unable to convert journalstore record to key/value.\n");
|
||||||
|
@ -127,6 +128,7 @@ int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor,
|
||||||
// create transaction if necessary
|
// create transaction if necessary
|
||||||
if (journalstore_cursor->transaction == NULL) {
|
if (journalstore_cursor->transaction == NULL) {
|
||||||
mdb_txn_begin(journalstore_cursor->environment, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction);
|
mdb_txn_begin(journalstore_cursor->environment, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction);
|
||||||
|
createdTransaction = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (journalstore_cursor->database == NULL) {
|
if (journalstore_cursor->database == NULL) {
|
||||||
|
@ -138,11 +140,21 @@ int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for debuggin
|
||||||
|
MDB_txn *tx = journalstore_cursor->transaction;
|
||||||
|
MDB_dbi dbi = *journalstore_cursor->database;
|
||||||
if (mdb_put(journalstore_cursor->transaction, *journalstore_cursor->database, &journalstore_key, &journalstore_value, 0) != 0) {
|
if (mdb_put(journalstore_cursor->transaction, *journalstore_cursor->database, &journalstore_key, &journalstore_value, 0) != 0) {
|
||||||
libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n");
|
libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (createdTransaction) {
|
||||||
|
if (mdb_txn_commit(journalstore_cursor->transaction) != 0) {
|
||||||
|
libp2p_logger_error("lmdb_journalstore", "Unable to commit JOURNALSTORE transaction.\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,7 +176,7 @@ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journal
|
||||||
|
|
||||||
// create a new transaction if necessary
|
// create a new transaction if necessary
|
||||||
if (journalstore_cursor->transaction == NULL) {
|
if (journalstore_cursor->transaction == NULL) {
|
||||||
if (mdb_txn_begin(mdb_env, NULL, 0, &journalstore_cursor->transaction) != 0) {
|
if (mdb_txn_begin(mdb_env, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction) != 0) {
|
||||||
libp2p_logger_error("lmdb_journanstore", "get_record: Attempt to begin transaction failed.\n");
|
libp2p_logger_error("lmdb_journanstore", "get_record: Attempt to begin transaction failed.\n");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -373,12 +385,12 @@ int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor) {
|
||||||
if (cursor != NULL) {
|
if (cursor != NULL) {
|
||||||
if (cursor->cursor != NULL) {
|
if (cursor->cursor != NULL) {
|
||||||
mdb_cursor_close(cursor->cursor);
|
mdb_cursor_close(cursor->cursor);
|
||||||
cursor->cursor = NULL;
|
|
||||||
}
|
}
|
||||||
if (cursor->transaction != NULL) {
|
if (cursor->transaction != NULL) {
|
||||||
mdb_txn_commit(cursor->transaction);
|
mdb_txn_commit(cursor->transaction);
|
||||||
cursor->transaction = NULL;
|
|
||||||
}
|
}
|
||||||
|
cursor->cursor = NULL;
|
||||||
|
cursor->transaction = NULL;
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,6 +120,7 @@ int test_journal_server_1() {
|
||||||
ipfs_node_offline_new(ipfs_path, &local_node);
|
ipfs_node_offline_new(ipfs_path, &local_node);
|
||||||
ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0);
|
ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0);
|
||||||
ipfs_node_free(local_node);
|
ipfs_node_free(local_node);
|
||||||
|
ipfs_hashtable_node_free(node);
|
||||||
|
|
||||||
libp2p_logger_debug("test_journal", "*** Firing up daemon for server 1 ***\n");
|
libp2p_logger_debug("test_journal", "*** Firing up daemon for server 1 ***\n");
|
||||||
|
|
||||||
|
@ -191,3 +192,87 @@ int test_journal_server_2() {
|
||||||
pthread_join(daemon_thread, NULL);
|
pthread_join(daemon_thread, NULL);
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#include "lmdb.h"
|
||||||
|
|
||||||
|
// test the lightning db process
|
||||||
|
int test_journal_db() {
|
||||||
|
|
||||||
|
MDB_env *mdb_env = NULL;
|
||||||
|
MDB_txn *mdb_txn = NULL;
|
||||||
|
MDB_dbi datastore_db;
|
||||||
|
MDB_dbi journalstore_db;
|
||||||
|
MDB_val datastore_key;
|
||||||
|
MDB_val datastore_value;
|
||||||
|
MDB_val *journalstore_key;
|
||||||
|
MDB_val *journalstore_value;
|
||||||
|
MDB_val returned_value;
|
||||||
|
|
||||||
|
// set up records
|
||||||
|
char* key = "ABC123";
|
||||||
|
char* value = "Hello, world!";
|
||||||
|
datastore_key.mv_size = strlen(key);
|
||||||
|
datastore_key.mv_data = (void*)key;
|
||||||
|
datastore_value.mv_size = strlen(value);
|
||||||
|
datastore_value.mv_data = (void*)value;
|
||||||
|
journalstore_key = (MDB_val*) malloc(sizeof(MDB_val));
|
||||||
|
journalstore_key->mv_size = strlen(key);
|
||||||
|
journalstore_key->mv_data = (void*)key;
|
||||||
|
journalstore_value = (MDB_val*) malloc(sizeof(MDB_val));
|
||||||
|
journalstore_value->mv_size = strlen(value);
|
||||||
|
journalstore_value->mv_data = (void*)value;
|
||||||
|
|
||||||
|
// clean up the old stuff
|
||||||
|
unlink ("/tmp/lock.mdb");
|
||||||
|
unlink ("/tmp/data.mdb");
|
||||||
|
|
||||||
|
// create environment
|
||||||
|
if (mdb_env_create(&mdb_env) != 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
if (mdb_env_set_maxdbs(mdb_env, (MDB_dbi)2) != 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
if (mdb_env_open(mdb_env, "/tmp", 0, S_IRWXU) != 0) {
|
||||||
|
fprintf(stderr, "Unable to open environment.\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// create a transaction
|
||||||
|
if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0) {
|
||||||
|
fprintf(stderr, "Unable to open transaction.\n");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// open databases
|
||||||
|
if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &datastore_db) != 0)
|
||||||
|
return 0;
|
||||||
|
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &journalstore_db) != 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
// search for a record in the datastore
|
||||||
|
if (mdb_get(mdb_txn, datastore_db, &datastore_key, &returned_value) != MDB_NOTFOUND) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// add record to datastore
|
||||||
|
if (mdb_put(mdb_txn, datastore_db, &datastore_key, &datastore_value, 0) != 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
// add record to journalstore
|
||||||
|
if (mdb_put(mdb_txn, journalstore_db, journalstore_key, journalstore_value, 0) != 0)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
// get rid of MDB_val values from journalstore to see if commit still works
|
||||||
|
free(journalstore_key);
|
||||||
|
free(journalstore_value);
|
||||||
|
journalstore_key = NULL;
|
||||||
|
journalstore_value = NULL;
|
||||||
|
|
||||||
|
// close everything up
|
||||||
|
if (mdb_txn_commit(mdb_txn) != 0)
|
||||||
|
return 0;
|
||||||
|
mdb_env_close(mdb_env);
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
|
@ -47,6 +47,7 @@ const char* names[] = {
|
||||||
"test_cid_protobuf_encode_decode",
|
"test_cid_protobuf_encode_decode",
|
||||||
"test_daemon_startup_shutdown",
|
"test_daemon_startup_shutdown",
|
||||||
"test_datastore_list_journal",
|
"test_datastore_list_journal",
|
||||||
|
"test_journal_db",
|
||||||
"test_journal_encode_decode",
|
"test_journal_encode_decode",
|
||||||
"test_journal_server_1",
|
"test_journal_server_1",
|
||||||
"test_journal_server_2",
|
"test_journal_server_2",
|
||||||
|
@ -107,6 +108,7 @@ int (*funcs[])(void) = {
|
||||||
test_cid_protobuf_encode_decode,
|
test_cid_protobuf_encode_decode,
|
||||||
test_daemon_startup_shutdown,
|
test_daemon_startup_shutdown,
|
||||||
test_datastore_list_journal,
|
test_datastore_list_journal,
|
||||||
|
test_journal_db,
|
||||||
test_journal_encode_decode,
|
test_journal_encode_decode,
|
||||||
test_journal_server_1,
|
test_journal_server_1,
|
||||||
test_journal_server_2,
|
test_journal_server_2,
|
||||||
|
|
Loading…
Reference in a new issue