Several memory leak fixes for journal code
This commit is contained in:
parent
78904ff1b6
commit
a9481631df
9 changed files with 163 additions and 135 deletions
|
@ -3,6 +3,7 @@
|
|||
*/
|
||||
#include <stdlib.h>
|
||||
#include <unistd.h> // for sleep()
|
||||
#include "libp2p/os/utils.h"
|
||||
#include "libp2p/utils/logger.h"
|
||||
#include "ipfs/core/ipfs_node.h"
|
||||
#include "ipfs/exchange/exchange.h"
|
||||
|
@ -13,9 +14,7 @@
|
|||
#include "ipfs/exchange/bitswap/want_manager.h"
|
||||
|
||||
int ipfs_bitswap_can_handle(const uint8_t* incoming, size_t incoming_size) {
|
||||
if (incoming_size < 8)
|
||||
return 0;
|
||||
char* result = strstr((char*)incoming, "/ipfs/bitswap");
|
||||
char* result = strnstr((char*)incoming, "/ipfs/bitswap", incoming_size);
|
||||
if(result == NULL || result != (char*)incoming)
|
||||
return 0;
|
||||
return 1;
|
||||
|
|
|
@ -28,7 +28,7 @@ int lmdb_journal_record_free(struct JournalRecord* rec);
|
|||
* @param cursor where to place the results
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int lmdb_journalstore_cursor_open(void* db_handle, struct lmdb_trans_cursor **cursor);
|
||||
int lmdb_journalstore_cursor_open(void* db_handle, struct lmdb_trans_cursor **cursor, struct MDB_txn *trans_to_use);
|
||||
|
||||
/**
|
||||
* Read a record from the cursor
|
||||
|
@ -46,7 +46,7 @@ int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalR
|
|||
/**
|
||||
* Close the cursor
|
||||
*/
|
||||
int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor);
|
||||
int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor, int commitTransaction);
|
||||
|
||||
int journal_record_free(struct JournalRecord* rec);
|
||||
|
||||
|
|
|
@ -2,6 +2,13 @@
|
|||
|
||||
#include "lmdb.h"
|
||||
|
||||
struct lmdb_context {
|
||||
MDB_env *db_environment;
|
||||
MDB_txn *current_transaction;
|
||||
MDB_dbi *datastore_db;
|
||||
MDB_dbi *journal_db;
|
||||
};
|
||||
|
||||
struct lmdb_trans_cursor {
|
||||
MDB_env* environment;
|
||||
MDB_txn* parent_transaction;
|
||||
|
|
|
@ -17,9 +17,10 @@
|
|||
* @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise.
|
||||
*/
|
||||
int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) {
|
||||
if (incoming_size < 8)
|
||||
const char* protocol = "/ipfs/journalio/1.0.0";
|
||||
if (incoming_size < 21)
|
||||
return 0;
|
||||
char* result = strstr((char*)incoming, "/ipfs/journalio/1.0.0");
|
||||
char* result = strnstr((char*)incoming, protocol, incoming_size);
|
||||
if(result == NULL || result != (char*)incoming)
|
||||
return 0;
|
||||
libp2p_logger_debug("journal", "Handling incoming message.\n");
|
||||
|
@ -61,7 +62,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
|
|||
struct Libp2pVector* vector = libp2p_utils_vector_new(1);
|
||||
if (vector != NULL) {
|
||||
struct lmdb_trans_cursor *cursor = NULL;
|
||||
if (!lmdb_journalstore_cursor_open(database->datastore_handle, &cursor)) {
|
||||
if (!lmdb_journalstore_cursor_open(database->datastore_context, &cursor, NULL)) {
|
||||
libp2p_logger_error("journal", "Unable to open a cursor for the journalstore.\n");
|
||||
return NULL;
|
||||
}
|
||||
|
@ -69,7 +70,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
|
|||
if (!lmdb_journalstore_cursor_get(cursor, CURSOR_LAST, &rec)) {
|
||||
libp2p_logger_error("journal", "Unable to find last record from the journalstore.\n");
|
||||
libp2p_utils_vector_free(vector);
|
||||
lmdb_journalstore_cursor_close(cursor);
|
||||
lmdb_journalstore_cursor_close(cursor, 1);
|
||||
return NULL;
|
||||
}
|
||||
// we've got one, now start the loop
|
||||
|
@ -83,7 +84,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
|
|||
i++;
|
||||
} while(i < n);
|
||||
libp2p_logger_debug("journal", "Closing journalstore cursor.\n");
|
||||
lmdb_journalstore_cursor_close(cursor);
|
||||
lmdb_journalstore_cursor_close(cursor, 1);
|
||||
} else {
|
||||
libp2p_logger_error("journal", "Unable to allocate vector for ipfs_journal_get_last.\n");
|
||||
}
|
||||
|
@ -147,6 +148,7 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli
|
|||
if (journal_records == NULL || journal_records->total == 0) {
|
||||
// nothing to do
|
||||
libp2p_logger_debug("journal", "There are no journal records to process.\n");
|
||||
replication_peer->lastConnect = os_utils_gmtime();
|
||||
return 1;
|
||||
}
|
||||
// build the message
|
||||
|
@ -169,17 +171,11 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli
|
|||
return 0;
|
||||
}
|
||||
memcpy(entry->hash, rec->hash, entry->hash_size);
|
||||
// debugging
|
||||
size_t b58size = 100;
|
||||
uint8_t *b58key = (uint8_t*) malloc(b58size);
|
||||
libp2p_crypto_encoding_base58_encode(entry->hash, entry->hash_size, &b58key, &b58size);
|
||||
free(b58key);
|
||||
libp2p_logger_debug("journal", "Adding hash %s to JournalMessage.\n", b58key);
|
||||
libp2p_utils_vector_add(message->journal_entries, entry);
|
||||
}
|
||||
// send the message
|
||||
message->current_epoch = os_utils_gmtime();
|
||||
libp2p_logger_debug("journal", "Sending message to %s.\n", peer->id);
|
||||
libp2p_logger_debug("journal", "Sending message to %s.\n", libp2p_peer_id_to_string(peer));
|
||||
int retVal = ipfs_journal_send_message(local_node, peer, message);
|
||||
if (retVal) {
|
||||
replication_peer->lastConnect = message->current_epoch;
|
||||
|
@ -308,6 +304,7 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s
|
|||
if (second_read) {
|
||||
free(incoming_pos);
|
||||
}
|
||||
ipfs_journal_message_free(message);
|
||||
return -1;
|
||||
}
|
||||
struct Libp2pVector* todo_vector = NULL;
|
||||
|
@ -345,6 +342,8 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s
|
|||
}
|
||||
//TODO: set new values in their ReplicationPeer struct
|
||||
|
||||
ipfs_journal_message_free(message);
|
||||
|
||||
if (second_read)
|
||||
free(incoming_pos);
|
||||
return 1;
|
||||
|
|
|
@ -129,22 +129,22 @@ int repo_fsrepo_lmdb_get_with_transaction(const unsigned char* key, size_t key_s
|
|||
*/
|
||||
int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct DatastoreRecord **record, const struct Datastore* datastore) {
|
||||
MDB_txn* mdb_txn;
|
||||
MDB_dbi mdb_dbi;
|
||||
|
||||
MDB_env* mdb_env = (MDB_env*)datastore->datastore_handle;
|
||||
if (mdb_env == NULL)
|
||||
if (datastore == NULL || datastore->datastore_context == NULL) {
|
||||
libp2p_logger_error("lmdb_datastore", "get: datastore not initialized.\n");
|
||||
return 0;
|
||||
|
||||
// open transaction
|
||||
if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0)
|
||||
return 0;
|
||||
|
||||
if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
||||
mdb_txn_commit(mdb_txn);
|
||||
}
|
||||
struct lmdb_context *db_context = (struct lmdb_context*) datastore->datastore_context;
|
||||
if (db_context->db_environment == NULL) {
|
||||
libp2p_logger_error("lmdb_datastore", "get: datastore environment not initialized.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int retVal = repo_fsrepo_lmdb_get_with_transaction(key, key_size, record, mdb_txn, &mdb_dbi);
|
||||
// open transaction
|
||||
if (mdb_txn_begin(db_context->db_environment, db_context->current_transaction, 0, &mdb_txn) != 0)
|
||||
return 0;
|
||||
|
||||
int retVal = repo_fsrepo_lmdb_get_with_transaction(key, key_size, record, mdb_txn, db_context->datastore_db);
|
||||
|
||||
mdb_txn_commit(mdb_txn);
|
||||
|
||||
|
@ -158,33 +158,13 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas
|
|||
* @param mdb_txn the transaction to be created
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int lmdb_datastore_create_transaction(MDB_env *mdb_env, MDB_txn **mdb_txn) {
|
||||
int lmdb_datastore_create_transaction(struct lmdb_context *db_context, MDB_txn **mdb_txn) {
|
||||
// open transaction
|
||||
if (mdb_txn_begin(mdb_env, NULL, 0, mdb_txn) != 0)
|
||||
if (mdb_txn_begin(db_context->db_environment, db_context->current_transaction, 0, mdb_txn) != 0)
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
int lmdb_datastore_open_databases(MDB_env *mdb_env, MDB_txn *mdb_txn, MDB_dbi *datastore_table, MDB_dbi* journalstore_table) {
|
||||
if (mdb_env == NULL) {
|
||||
libp2p_logger_error("lmdb_datastore", "open_databases: environment not set.\n");
|
||||
return 0;
|
||||
}
|
||||
if (mdb_txn == NULL) {
|
||||
libp2p_logger_error("lmdb_datastore", "open_database: transaction does not exist.\n");
|
||||
return 0;
|
||||
}
|
||||
if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, datastore_table) != 0) {
|
||||
libp2p_logger_error("lmdb_datastore", "open_database: Unable to open datastore.\n");
|
||||
return 0;
|
||||
}
|
||||
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_table) != 0) {
|
||||
libp2p_logger_error("lmdb_datastore", "open_database: Unable to open journalstore.\n");
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write (or update) data in the datastore with the specified key
|
||||
* @param key the key
|
||||
|
@ -196,44 +176,38 @@ int lmdb_datastore_open_databases(MDB_env *mdb_env, MDB_txn *mdb_txn, MDB_dbi *d
|
|||
*/
|
||||
int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned char* data, size_t data_size, const struct Datastore* datastore) {
|
||||
int retVal;
|
||||
MDB_txn *datastore_txn;
|
||||
MDB_dbi datastore_table;
|
||||
MDB_dbi journalstore_table;
|
||||
struct MDB_txn *child_transaction;
|
||||
struct MDB_val datastore_key;
|
||||
struct MDB_val datastore_value;
|
||||
struct DatastoreRecord *datastore_record = NULL;
|
||||
struct JournalRecord *journalstore_record = NULL;
|
||||
struct lmdb_trans_cursor *journalstore_cursor = NULL;
|
||||
|
||||
MDB_env* mdb_env = (MDB_env*)datastore->datastore_handle;
|
||||
if (mdb_env == NULL) {
|
||||
if (datastore == NULL || datastore->datastore_context == NULL)
|
||||
return 0;
|
||||
|
||||
struct lmdb_context *db_context = (struct lmdb_context*)datastore->datastore_context;
|
||||
|
||||
if (db_context->db_environment == NULL) {
|
||||
libp2p_logger_error("lmdb_datastore", "put: invalid datastore handle.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
// open a transaction to the databases
|
||||
if (!lmdb_datastore_create_transaction(mdb_env, &datastore_txn)) {
|
||||
if (!lmdb_datastore_create_transaction(db_context, &child_transaction)) {
|
||||
libp2p_logger_error("lmdb_datastore", "put: Unable to create db transaction.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!lmdb_datastore_open_databases(mdb_env, datastore_txn, &datastore_table, &journalstore_table)) {
|
||||
libp2p_logger_error("lmdb_datastore", "put: Unable to open database tables.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
// build the journalstore connectivity stuff
|
||||
journalstore_cursor = lmdb_trans_cursor_new();
|
||||
lmdb_journalstore_cursor_open(datastore->datastore_context, &journalstore_cursor, child_transaction);
|
||||
if (journalstore_cursor == NULL) {
|
||||
libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for journalstore cursor.\n");
|
||||
return 0;
|
||||
}
|
||||
journalstore_cursor->environment = mdb_env;
|
||||
journalstore_cursor->parent_transaction = datastore_txn;
|
||||
journalstore_cursor->database = &journalstore_table;
|
||||
|
||||
// see if what we want is already in the datastore
|
||||
repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, datastore_txn, &datastore_table);
|
||||
repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, child_transaction, db_context->datastore_db);
|
||||
if (datastore_record != NULL) {
|
||||
// build the journalstore_record with the search criteria
|
||||
journalstore_record = lmdb_journal_record_new();
|
||||
|
@ -242,14 +216,13 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
|||
memcpy(journalstore_record->hash, key, key_size);
|
||||
journalstore_record->timestamp = datastore_record->timestamp;
|
||||
// look up the corresponding journalstore record for possible updating
|
||||
lmdb_journalstore_get_record(datastore->datastore_handle, journalstore_cursor, &journalstore_record);
|
||||
lmdb_journalstore_cursor_close(journalstore_cursor);
|
||||
lmdb_journalstore_get_record(db_context, journalstore_cursor, &journalstore_record);
|
||||
} else { // it wasn't previously in the database
|
||||
datastore_record = libp2p_datastore_record_new();
|
||||
if (datastore_record == NULL) {
|
||||
libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for DatastoreRecord.\n");
|
||||
lmdb_trans_cursor_free(journalstore_cursor);
|
||||
mdb_txn_commit(datastore_txn);
|
||||
mdb_txn_commit(child_transaction);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -283,7 +256,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
|||
datastore_value.mv_size = record_size;
|
||||
datastore_value.mv_data = record;
|
||||
|
||||
retVal = mdb_put(datastore_txn, datastore_table, &datastore_key, &datastore_value, MDB_NODUPDATA);
|
||||
retVal = mdb_put(child_transaction, *db_context->datastore_db, &datastore_key, &datastore_value, MDB_NODUPDATA);
|
||||
|
||||
if (retVal == 0) {
|
||||
// Successfully added the datastore record. Now work with the journalstore.
|
||||
|
@ -292,7 +265,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
|||
// we need to update
|
||||
journalstore_record->timestamp = datastore_record->timestamp;
|
||||
lmdb_journalstore_cursor_put(journalstore_cursor, journalstore_record);
|
||||
lmdb_journalstore_cursor_close(journalstore_cursor);
|
||||
lmdb_journalstore_cursor_close(journalstore_cursor, 0);
|
||||
lmdb_journal_record_free(journalstore_record);
|
||||
}
|
||||
} else {
|
||||
|
@ -307,7 +280,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
|||
if (!lmdb_journalstore_journal_add(journalstore_cursor, journalstore_record)) {
|
||||
libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. Continuing.\n");
|
||||
}
|
||||
//lmdb_journalstore_cursor_close(journalstore_cursor);
|
||||
lmdb_journalstore_cursor_close(journalstore_cursor, 0);
|
||||
lmdb_journal_record_free(journalstore_record);
|
||||
retVal = 1;
|
||||
}
|
||||
|
@ -318,9 +291,10 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
|||
}
|
||||
|
||||
// cleanup
|
||||
mdb_txn_commit(datastore_txn);
|
||||
if (mdb_txn_commit(child_transaction) != 0) {
|
||||
libp2p_logger_error("lmdb_datastore", "lmdb_put: transaction commit failed.\n");
|
||||
}
|
||||
free(record);
|
||||
lmdb_trans_cursor_free(journalstore_cursor);
|
||||
libp2p_datastore_record_free(datastore_record);
|
||||
return retVal;
|
||||
}
|
||||
|
@ -330,6 +304,8 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
|||
* Note: for now, the parameters are not used
|
||||
* @param argc number of parameters in the following array
|
||||
* @param argv an array of parameters
|
||||
* @param datastore the datastore struct
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) {
|
||||
// create environment
|
||||
|
@ -352,20 +328,55 @@ int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
datastore->datastore_handle = (void*)mdb_env;
|
||||
struct lmdb_context *db_context = (struct lmdb_context *) malloc(sizeof(struct lmdb_context));
|
||||
datastore->datastore_context = (void*) db_context;
|
||||
db_context->db_environment = (void*)mdb_env;
|
||||
db_context->datastore_db = (MDB_dbi*) malloc(sizeof(MDB_dbi));
|
||||
db_context->journal_db = (MDB_dbi*) malloc(sizeof(MDB_dbi));
|
||||
|
||||
// open the 2 databases
|
||||
if (mdb_txn_begin(mdb_env, NULL, 0, &db_context->current_transaction) != 0) {
|
||||
mdb_env_close(mdb_env);
|
||||
db_context->db_environment = NULL;
|
||||
return 0;
|
||||
}
|
||||
if (mdb_dbi_open(db_context->current_transaction, "DATASTORE", MDB_DUPSORT | MDB_CREATE, db_context->datastore_db ) != 0) {
|
||||
mdb_txn_abort(db_context->current_transaction);
|
||||
mdb_env_close(mdb_env);
|
||||
db_context->db_environment = NULL;
|
||||
return 0;
|
||||
}
|
||||
if (mdb_dbi_open(db_context->current_transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, db_context->journal_db) != 0) {
|
||||
mdb_txn_abort(db_context->current_transaction);
|
||||
mdb_env_close(mdb_env);
|
||||
db_context->db_environment = NULL;
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/***
|
||||
* Close an LMDB database
|
||||
* NOTE: for now, argc and argv are not used
|
||||
* @param argc number of parameters in the argv array
|
||||
* @param argv parameters to be passed in
|
||||
* @param datastore the datastore struct that contains information about the opened database
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int repo_fsrepo_lmdb_close(struct Datastore* datastore) {
|
||||
struct MDB_env* mdb_env = (struct MDB_env*)datastore->datastore_handle;
|
||||
mdb_env_close(mdb_env);
|
||||
// check parameters
|
||||
if (datastore == NULL || datastore->datastore_context == NULL)
|
||||
return 0;
|
||||
|
||||
// close the db environment
|
||||
struct lmdb_context *db_context = (struct lmdb_context*) datastore->datastore_context;
|
||||
if (db_context->current_transaction != NULL) {
|
||||
mdb_txn_commit(db_context->current_transaction);
|
||||
}
|
||||
mdb_env_close(db_context->db_environment);
|
||||
|
||||
free(db_context->datastore_db);
|
||||
free(db_context->journal_db);
|
||||
|
||||
free(db_context);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
#include <string.h>
|
||||
#include <errno.h>
|
||||
|
||||
#include "varint.h"
|
||||
#include "lmdb.h"
|
||||
|
@ -131,18 +132,6 @@ int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor,
|
|||
createdTransaction = 1;
|
||||
}
|
||||
|
||||
if (journalstore_cursor->database == NULL) {
|
||||
// open the journal table
|
||||
journalstore_cursor->database = (MDB_dbi*) malloc(sizeof(MDB_dbi));
|
||||
if (mdb_dbi_open(journalstore_cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_cursor->database) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// for debuggin
|
||||
MDB_txn *tx = journalstore_cursor->transaction;
|
||||
MDB_dbi dbi = *journalstore_cursor->database;
|
||||
if (mdb_put(journalstore_cursor->transaction, *journalstore_cursor->database, &journalstore_key, &journalstore_value, 0) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n");
|
||||
return 0;
|
||||
|
@ -172,31 +161,18 @@ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journal
|
|||
libp2p_logger_error("lmdb_journalstore", "get_record: database environment not set up.\n");
|
||||
return 0;
|
||||
}
|
||||
struct MDB_env *mdb_env = (struct MDB_env*)handle;
|
||||
struct lmdb_context *db_context = (struct lmdb_context*)handle;
|
||||
|
||||
// create a new transaction if necessary
|
||||
if (journalstore_cursor->transaction == NULL) {
|
||||
if (mdb_txn_begin(mdb_env, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction) != 0) {
|
||||
if (mdb_txn_begin(db_context->db_environment, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction) != 0) {
|
||||
libp2p_logger_error("lmdb_journanstore", "get_record: Attempt to begin transaction failed.\n");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (journalstore_cursor->database == NULL) {
|
||||
// open the journal table
|
||||
journalstore_cursor->database = (MDB_dbi*) malloc(sizeof(MDB_dbi));
|
||||
if (journalstore_cursor->database == NULL) {
|
||||
libp2p_logger_error("lmdb_journalstore", "get_record: Unable to allocate memory for journalstore database.\n");
|
||||
return 0;
|
||||
}
|
||||
if (mdb_dbi_open(journalstore_cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_cursor->database) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (journalstore_cursor->cursor == NULL) {
|
||||
if (!lmdb_journalstore_cursor_open(handle, &journalstore_cursor)) {
|
||||
if (!lmdb_journalstore_cursor_open(handle, &journalstore_cursor, NULL)) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to open cursor in get_record.\n");
|
||||
return 0;
|
||||
}
|
||||
|
@ -216,9 +192,9 @@ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journal
|
|||
* @param cursor where to place the results
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr) {
|
||||
int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr, struct MDB_txn* trans_to_use) {
|
||||
if (handle != NULL) {
|
||||
MDB_env* mdb_env = (MDB_env*)handle;
|
||||
struct lmdb_context *db_context = (struct lmdb_context*)handle;
|
||||
struct lmdb_trans_cursor *cursor = *crsr;
|
||||
if (cursor == NULL ) {
|
||||
cursor = lmdb_trans_cursor_new();
|
||||
|
@ -226,19 +202,20 @@ int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr)
|
|||
return 0;
|
||||
*crsr = cursor;
|
||||
}
|
||||
cursor->database = db_context->journal_db;
|
||||
cursor->environment = db_context->db_environment;
|
||||
cursor->parent_transaction = db_context->current_transaction;
|
||||
|
||||
if (cursor->transaction == NULL) {
|
||||
if (trans_to_use != NULL)
|
||||
cursor->transaction = trans_to_use;
|
||||
else {
|
||||
// open transaction
|
||||
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) {
|
||||
if (mdb_txn_begin(db_context->db_environment, db_context->current_transaction, 0, &cursor->transaction) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to begin a transaction.\n");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
if (cursor->database == NULL) {
|
||||
if (mdb_dbi_open(cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, cursor->database) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to open the dbi to the journalstore");
|
||||
mdb_txn_commit(cursor->transaction);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
if (cursor->cursor == NULL) {
|
||||
// open cursor
|
||||
|
@ -310,10 +287,23 @@ int lmdb_journalstore_cursor_get(struct lmdb_trans_cursor *tc, enum DatastoreCur
|
|||
lmdb_journalstore_generate_key(*record, &mdb_key);
|
||||
}
|
||||
|
||||
if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) {
|
||||
int retVal = mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co);
|
||||
if (retVal != 0) {
|
||||
if (retVal == MDB_NOTFOUND) {
|
||||
libp2p_logger_debug("lmdb_journalstore", "cursor_get: No records found in db.\n");
|
||||
} else if (retVal == EINVAL) {
|
||||
libp2p_logger_debug("lmdb_journalstore", "cursor_get: Invalid parameter specified.\n");
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (*record == NULL) {
|
||||
// make a new record and pass it back
|
||||
if (!lmdb_journalstore_build_record(&mdb_key, &mdb_value, record))
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
// see if the passed in record has a specific record in mind (take care of duplicate keys)
|
||||
if ( (*record)->hash_size > 0) {
|
||||
struct JournalRecord* curr_record = NULL;
|
||||
|
@ -376,21 +366,21 @@ int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalR
|
|||
}
|
||||
|
||||
/**
|
||||
* Close the cursor, but does not free the struct. It simply closes the cursor
|
||||
* and commits the transaction.
|
||||
* Close the cursor and commits the transaction.
|
||||
* @param crsr a lmdb_trans_cursor pointer
|
||||
* @returns true(1)
|
||||
*/
|
||||
int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor) {
|
||||
int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor, int commitTransaction) {
|
||||
if (cursor != NULL) {
|
||||
if (cursor->cursor != NULL) {
|
||||
mdb_cursor_close(cursor->cursor);
|
||||
//mdb_cursor_close(cursor->cursor);
|
||||
}
|
||||
if (cursor->transaction != NULL) {
|
||||
if (cursor->transaction != NULL && commitTransaction) {
|
||||
mdb_txn_commit(cursor->transaction);
|
||||
}
|
||||
cursor->cursor = NULL;
|
||||
cursor->transaction = NULL;
|
||||
lmdb_trans_cursor_free(cursor);
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
#include "ipfs/journal/journal_entry.h"
|
||||
#include "ipfs/journal/journal_message.h"
|
||||
#include "ipfs/repo/fsrepo/journalstore.h"
|
||||
|
||||
int test_journal_encode_decode() {
|
||||
int retVal = 0;
|
||||
|
@ -129,6 +130,8 @@ int test_journal_server_1() {
|
|||
|
||||
sleep(45);
|
||||
|
||||
libp2p_logger_error("test_journal", "Sleep is over. Shutting down.\n");
|
||||
|
||||
retVal = 1;
|
||||
exit:
|
||||
ipfs_daemon_stop();
|
||||
|
@ -183,7 +186,9 @@ int test_journal_server_2() {
|
|||
pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path);
|
||||
thread_started = 1;
|
||||
|
||||
sleep(120);
|
||||
sleep(45);
|
||||
|
||||
libp2p_logger_error("test_journal", "Sleep is over. Shutting down.\n");
|
||||
|
||||
retVal = 1;
|
||||
exit:
|
||||
|
@ -272,6 +277,7 @@ int test_journal_db() {
|
|||
// close everything up
|
||||
if (mdb_txn_commit(mdb_txn) != 0)
|
||||
return 0;
|
||||
|
||||
mdb_env_close(mdb_env);
|
||||
|
||||
return 1;
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include "mh/multihash.h"
|
||||
#include "libp2p/crypto/encoding/base58.h"
|
||||
#include "ipfs/core/ipfs_node.h"
|
||||
#include "ipfs/repo/fsrepo/lmdb_cursor.h"
|
||||
|
||||
int test_import_large_file() {
|
||||
size_t bytes_size = 1000000; //1mb
|
||||
|
@ -166,11 +167,6 @@ int test_import_small_file() {
|
|||
// cid should be the same each time
|
||||
unsigned char cid_test[10] = { 0x1e, 0xcf, 0x04, 0xce, 0x6a, 0xe8, 0xbf, 0xc0, 0xeb, 0xe4 };
|
||||
|
||||
/*
|
||||
for (int i = 0; i < 10; i++) {
|
||||
printf("%02x\n", write_node->hash[i]);
|
||||
}
|
||||
*/
|
||||
|
||||
for(int i = 0; i < 10; i++) {
|
||||
if (write_node->hash[i] != cid_test[i]) {
|
||||
|
@ -208,6 +204,19 @@ int test_import_small_file() {
|
|||
}
|
||||
}
|
||||
|
||||
// attempt to look in the journal for the entry
|
||||
struct lmdb_context *context = (struct lmdb_context*)local_node->repo->config->datastore->datastore_context;
|
||||
struct JournalRecord* record = NULL;
|
||||
struct lmdb_trans_cursor *cursor = lmdb_trans_cursor_new();
|
||||
cursor->environment = context->db_environment;
|
||||
cursor->database = context->journal_db;
|
||||
cursor->parent_transaction = context->current_transaction;
|
||||
if (mdb_cursor_open(context->current_transaction, *cursor->database, &cursor->cursor) != 0) {
|
||||
fprintf(stderr, "Unable to open cursor.\n");
|
||||
} else if (!lmdb_journalstore_cursor_get(cursor, CURSOR_FIRST, &record)) {
|
||||
fprintf(stderr, "Unable to find any records in the database.\n");
|
||||
}
|
||||
|
||||
ipfs_node_free(local_node);
|
||||
ipfs_hashtable_node_free(write_node);
|
||||
ipfs_hashtable_node_free(read_node);
|
||||
|
|
|
@ -73,11 +73,15 @@ int test_ipfs_datastore_put() {
|
|||
* List what is in the journal
|
||||
*/
|
||||
int test_datastore_list_journal() {
|
||||
int recCount = 0;
|
||||
libp2p_logger_add_class("test_datastore");
|
||||
libp2p_logger_add_class("lmdb_datastore");
|
||||
|
||||
// need to run test_import_small_file first
|
||||
|
||||
// open database
|
||||
struct FSRepo* fs_repo;
|
||||
if (ipfs_repo_fsrepo_new(NULL, NULL, &fs_repo) == 0) {
|
||||
if (ipfs_repo_fsrepo_new("/tmp/.ipfs", NULL, &fs_repo) == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (ipfs_repo_fsrepo_open(fs_repo) == 0) {
|
||||
|
@ -85,7 +89,7 @@ int test_datastore_list_journal() {
|
|||
}
|
||||
// open cursor
|
||||
struct lmdb_trans_cursor *crsr = NULL;
|
||||
if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->datastore_handle, &crsr)) {
|
||||
if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->datastore_context, &crsr, NULL)) {
|
||||
ipfs_repo_fsrepo_free(fs_repo);
|
||||
return 0;
|
||||
}
|
||||
|
@ -96,7 +100,9 @@ int test_datastore_list_journal() {
|
|||
if (lmdb_journalstore_cursor_get(crsr, op, &record) == 0) {
|
||||
lmdb_journal_record_free(record);
|
||||
record = NULL;
|
||||
break;
|
||||
}
|
||||
recCount++;
|
||||
// display record
|
||||
libp2p_logger_debug("test_datastore", "Timestamp: %llu.\n", record->timestamp);
|
||||
libp2p_logger_debug("test_datastore", "Pin: %s.\n", record->pin == 1 ? "Y" : "N");
|
||||
|
@ -105,5 +111,6 @@ int test_datastore_list_journal() {
|
|||
record = NULL;
|
||||
op = CURSOR_NEXT;
|
||||
} while (record != NULL);
|
||||
libp2p_logger_error("test_datastore", "Found %d records.\n", recCount);
|
||||
return 1;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue