Implementation of timestamp in both datastore and journalstore
This commit is contained in:
parent
bf87d93136
commit
bf7ba9049c
9 changed files with 393 additions and 108 deletions
|
@ -8,6 +8,7 @@
|
|||
|
||||
#include "lmdb.h"
|
||||
#include "libp2p/db/datastore.h"
|
||||
#include "ipfs/repo/fsrepo/lmdb_cursor.h"
|
||||
|
||||
struct JournalRecord {
|
||||
unsigned long long timestamp; // the timestamp of the file
|
||||
|
@ -23,19 +24,40 @@ int lmdb_journal_record_free(struct JournalRecord* rec);
|
|||
|
||||
/**
|
||||
* Open a cursor to the journalstore table
|
||||
* @param db_handle a handle to the database (an MDB_env pointer)
|
||||
* @param cursor where to place the results
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int repo_journalstore_cursor_open(struct Datastore* datastore, void** cursor);
|
||||
int lmdb_journalstore_cursor_open(void* db_handle, struct lmdb_trans_cursor **cursor);
|
||||
|
||||
/**
|
||||
* Read a record from the cursor
|
||||
*/
|
||||
int repo_journalstore_cursor_get(struct Datastore* datastore, void* cursor, enum DatastoreCursorOp op, struct JournalRecord** record);
|
||||
int lmdb_journalstore_cursor_get(struct lmdb_trans_cursor *cursor, enum DatastoreCursorOp op, struct JournalRecord** record);
|
||||
|
||||
/***
|
||||
* Write the record at the cursor
|
||||
* @param crsr the cursor
|
||||
* @param journal_record the record to write
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalRecord* journal_record);
|
||||
|
||||
/**
|
||||
* Close the cursor
|
||||
*/
|
||||
int repo_journalstore_cursor_close(struct Datastore* datastore, void* cursor);
|
||||
int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor);
|
||||
|
||||
int journal_record_free(struct JournalRecord* rec);
|
||||
|
||||
int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* record);
|
||||
|
||||
/***
|
||||
* Attempt to get a specific record identified by its timestamp and bytes
|
||||
* @param handle a handle to the database engine
|
||||
* @param journalstore_cursor the cursor (will be returned as a cursor that points to the record found
|
||||
* @param journalstore_record where to put the results (can pass null). If data is within the struct, will use it as search criteria
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record);
|
||||
|
||||
|
|
22
include/ipfs/repo/fsrepo/lmdb_cursor.h
Normal file
22
include/ipfs/repo/fsrepo/lmdb_cursor.h
Normal file
|
@ -0,0 +1,22 @@
|
|||
#pragma once
|
||||
|
||||
#include "lmdb.h"
|
||||
|
||||
struct lmdb_trans_cursor {
|
||||
MDB_txn* transaction;
|
||||
MDB_dbi* database;
|
||||
MDB_cursor* cursor;
|
||||
};
|
||||
|
||||
/**
|
||||
* Create a new lmdb_trans_cursor struct
|
||||
* @returns a newly allocated trans_cursor struct
|
||||
*/
|
||||
struct lmdb_trans_cursor* lmdb_trans_cursor_new();
|
||||
|
||||
/***
|
||||
* Clean up resources from a lmdb_trans_cursor struct
|
||||
* @param in the cursor
|
||||
* @returns true(1)
|
||||
*/
|
||||
int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in);
|
|
@ -3,11 +3,6 @@
|
|||
#include "lmdb.h"
|
||||
#include "libp2p/db/datastore.h"
|
||||
|
||||
struct lmdb_trans_cursor {
|
||||
MDB_txn* transaction;
|
||||
MDB_cursor* cursor;
|
||||
};
|
||||
|
||||
/***
|
||||
* Places the LMDB methods into the datastore's function pointers
|
||||
* @param datastore the datastore to fill
|
||||
|
|
|
@ -60,16 +60,16 @@ struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct I
|
|||
struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
|
||||
struct Libp2pVector* vector = libp2p_utils_vector_new(1);
|
||||
if (vector != NULL) {
|
||||
void* cursor = NULL;
|
||||
if (!repo_journalstore_cursor_open(database, &cursor)) {
|
||||
struct lmdb_trans_cursor *cursor = NULL;
|
||||
if (!lmdb_journalstore_cursor_open(database->handle, &cursor)) {
|
||||
libp2p_logger_error("journal", "Unable to open a cursor for the journalstore.\n");
|
||||
return NULL;
|
||||
}
|
||||
struct JournalRecord* rec = NULL;
|
||||
if (!repo_journalstore_cursor_get(database, cursor, CURSOR_LAST, &rec)) {
|
||||
if (!lmdb_journalstore_cursor_get(cursor, CURSOR_LAST, &rec)) {
|
||||
libp2p_logger_error("journal", "Unable to find last record from the journalstore.\n");
|
||||
libp2p_utils_vector_free(vector);
|
||||
repo_journalstore_cursor_close(database, cursor);
|
||||
lmdb_journalstore_cursor_close(cursor);
|
||||
return NULL;
|
||||
}
|
||||
// we've got one, now start the loop
|
||||
|
@ -77,13 +77,13 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
|
|||
do {
|
||||
libp2p_logger_debug("journal", "Adding record to the vector.\n");
|
||||
libp2p_utils_vector_add(vector, rec);
|
||||
if (!repo_journalstore_cursor_get(database, cursor, CURSOR_PREVIOUS, &rec)) {
|
||||
if (!lmdb_journalstore_cursor_get(cursor, CURSOR_PREVIOUS, &rec)) {
|
||||
break;
|
||||
}
|
||||
i++;
|
||||
} while(i < n);
|
||||
libp2p_logger_debug("journal", "Closing journalstore cursor.\n");
|
||||
repo_journalstore_cursor_close(database, cursor);
|
||||
lmdb_journalstore_cursor_close(cursor);
|
||||
} else {
|
||||
libp2p_logger_error("journal", "Unable to allocate vector for ipfs_journal_get_last.\n");
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ endif
|
|||
|
||||
LFLAGS =
|
||||
DEPS =
|
||||
OBJS = fs_repo.o jsmn.o lmdb_datastore.o lmdb_journalstore.o
|
||||
OBJS = fs_repo.o jsmn.o lmdb_datastore.o lmdb_journalstore.o lmdb_cursor.o
|
||||
|
||||
%.o: %.c $(DEPS)
|
||||
$(CC) -c -o $@ $< $(CFLAGS)
|
||||
|
|
27
repo/fsrepo/lmdb_cursor.c
Normal file
27
repo/fsrepo/lmdb_cursor.c
Normal file
|
@ -0,0 +1,27 @@
|
|||
#include <stdlib.h>
|
||||
|
||||
#include "ipfs/repo/fsrepo/lmdb_cursor.h"
|
||||
|
||||
/**
|
||||
* Create a new lmdb_trans_cursor struct
|
||||
* @returns a newly allocated trans_cursor struct
|
||||
*/
|
||||
struct lmdb_trans_cursor* lmdb_trans_cursor_new() {
|
||||
struct lmdb_trans_cursor* out = (struct lmdb_trans_cursor*) malloc(sizeof(struct lmdb_trans_cursor));
|
||||
if (out != NULL) {
|
||||
out->cursor = NULL;
|
||||
out->transaction = NULL;
|
||||
out->database = NULL;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/***
|
||||
* Clean up resources from a lmdb_trans_cursor struct
|
||||
* @param in the cursor
|
||||
* @returns true(1)
|
||||
*/
|
||||
int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in) {
|
||||
free(in);
|
||||
return 1;
|
||||
}
|
|
@ -150,6 +150,10 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
|||
MDB_dbi mdb_dbi;
|
||||
struct MDB_val db_key;
|
||||
struct MDB_val db_value;
|
||||
unsigned long long old_timestamp = 0;
|
||||
struct DatastoreRecord *datastore_record = NULL;
|
||||
struct JournalRecord *journalstore_record = NULL;
|
||||
struct lmdb_trans_cursor *journalstore_cursor = NULL;
|
||||
|
||||
MDB_env* mdb_env = (MDB_env*)datastore->handle;
|
||||
if (mdb_env == NULL)
|
||||
|
@ -163,9 +167,42 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
|||
if (retVal != 0)
|
||||
return 0;
|
||||
|
||||
// add the timestamp
|
||||
unsigned long long timestamp = os_utils_gmtime();
|
||||
struct DatastoreRecord *datastore_record = libp2p_datastore_record_new();
|
||||
// check the datastore to see if it is already there. If it is there, use its timestamp if it is older.
|
||||
repo_fsrepo_lmdb_get(key, key_size, &datastore_record, datastore);
|
||||
if (datastore_record != NULL) {
|
||||
// build the journalstore_record with the search criteria
|
||||
journalstore_record = lmdb_journal_record_new();
|
||||
journalstore_record->hash_size = key_size;
|
||||
journalstore_record->hash = malloc(key_size);
|
||||
memcpy(journalstore_record->hash, key, key_size);
|
||||
journalstore_record->timestamp = datastore_record->timestamp;
|
||||
// look up the corresponding journalstore record for possible updating
|
||||
journalstore_cursor = lmdb_trans_cursor_new();
|
||||
journalstore_cursor->transaction = mdb_txn;
|
||||
lmdb_journalstore_get_record((void*)mdb_env, journalstore_cursor, &journalstore_record);
|
||||
} else { // it wasn't previously in the database
|
||||
datastore_record = libp2p_datastore_record_new();
|
||||
if (datastore_record == NULL) {
|
||||
libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for DatastoreRecord.\n");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// Put in the timestamp if it isn't there already (or is newer)
|
||||
unsigned long long now = os_utils_gmtime();
|
||||
if (datastore_record->timestamp == 0 || datastore_record->timestamp > now) {
|
||||
//we need to update the timestamp. Be sure to update the journal too. (done further down)
|
||||
old_timestamp = datastore_record->timestamp;
|
||||
datastore_record->timestamp = now;
|
||||
}
|
||||
// fill in the other fields
|
||||
datastore_record->key_size = key_size;
|
||||
datastore_record->key = (uint8_t*)key;
|
||||
datastore_record->value_size = data_size;
|
||||
datastore_record->value = data;
|
||||
|
||||
// convert it into a byte array
|
||||
|
||||
size_t record_size = 0;
|
||||
uint8_t *record;
|
||||
repo_fsrepo_lmdb_encode_record(datastore_record, &record, &record_size);
|
||||
|
@ -178,26 +215,35 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
|||
db_value.mv_size = record_size;
|
||||
db_value.mv_data = record;
|
||||
|
||||
retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA | MDB_NOOVERWRITE);
|
||||
retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA);
|
||||
|
||||
if (retVal == 0) {
|
||||
// the normal case
|
||||
// add it to the journalstore
|
||||
struct JournalRecord* rec = lmdb_journal_record_new();
|
||||
rec->hash = (uint8_t*)key;
|
||||
rec->hash_size = key_size;
|
||||
rec->timestamp = timestamp;
|
||||
rec->pending = 1;
|
||||
rec->pin = 1;
|
||||
lmdb_journalstore_journal_add(mdb_txn, rec);
|
||||
lmdb_journal_record_free(rec);
|
||||
retVal = 1;
|
||||
} else {
|
||||
if (retVal == MDB_KEYEXIST) // We tried to add a key that already exists. Skip.
|
||||
// added the datastore record, now work with the journalstore
|
||||
if (journalstore_record != NULL) {
|
||||
if (journalstore_record->timestamp != datastore_record->timestamp) {
|
||||
// we need to update
|
||||
journalstore_record->timestamp = datastore_record->timestamp;
|
||||
lmdb_journalstore_cursor_put(journalstore_cursor, journalstore_record);
|
||||
lmdb_journal_record_free(journalstore_record);
|
||||
}
|
||||
} else {
|
||||
// add it to the journalstore
|
||||
journalstore_record = lmdb_journal_record_new();
|
||||
journalstore_record->hash = (uint8_t*)key;
|
||||
journalstore_record->hash_size = key_size;
|
||||
journalstore_record->timestamp = datastore_record->timestamp;
|
||||
journalstore_record->pending = 1; // TODO: Calculate this correctly
|
||||
journalstore_record->pin = 1;
|
||||
if (!lmdb_journalstore_journal_add(mdb_txn, journalstore_record)) {
|
||||
libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. Continuing.\n");
|
||||
}
|
||||
lmdb_journal_record_free(journalstore_record);
|
||||
retVal = 1;
|
||||
else {
|
||||
libp2p_logger_error("lmdb_datastore", "mdb_put returned %d.\n", retVal);
|
||||
retVal = 0;
|
||||
}
|
||||
} else {
|
||||
// datastore record was unable to be added.
|
||||
libp2p_logger_error("lmdb_datastore", "mdb_put returned %d.\n", retVal);
|
||||
retVal = 0;
|
||||
}
|
||||
|
||||
// cleanup
|
||||
|
|
|
@ -29,6 +29,82 @@ int lmdb_journal_record_free(struct JournalRecord* rec) {
|
|||
return 1;
|
||||
}
|
||||
|
||||
int lmdb_journalstore_generate_key(const struct JournalRecord* journal_record, struct MDB_val *db_key) {
|
||||
// build the key
|
||||
uint8_t time_varint[8];
|
||||
size_t time_varint_size = 0;
|
||||
varint_encode(journal_record->timestamp, &time_varint[0], 8, &time_varint_size);
|
||||
|
||||
db_key->mv_size = time_varint_size;
|
||||
db_key->mv_data = time_varint;
|
||||
return 1;
|
||||
}
|
||||
/***
|
||||
* Convert the JournalRec struct into a lmdb key and lmdb value
|
||||
* @param journal_record the record to convert
|
||||
* @param db_key where to store the key information
|
||||
* @param db_value where to store the value information
|
||||
*/
|
||||
int lmdb_journalstore_build_key_value_pair(const struct JournalRecord* journal_record, struct MDB_val* db_key, struct MDB_val *db_value) {
|
||||
// build the record, which is a timestamp as a key
|
||||
|
||||
// build the key
|
||||
lmdb_journalstore_generate_key(journal_record, db_key);
|
||||
|
||||
// build the value
|
||||
size_t record_size = journal_record->hash_size + 2;
|
||||
uint8_t record[record_size];
|
||||
// Field 1: pin flag
|
||||
record[0] = journal_record->pin;
|
||||
// Field 2: pending flag
|
||||
record[1] = journal_record->pending;
|
||||
// field 3: hash
|
||||
memcpy(&record[2], journal_record->hash, journal_record->hash_size);
|
||||
|
||||
db_value->mv_size = record_size;
|
||||
db_value->mv_data = record;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/***
|
||||
* Build a JournalRecord from a key/value pair from the db
|
||||
* @param db_key the key
|
||||
* @param db_value the value
|
||||
* @param journal_record where to store the results
|
||||
* @reutrns true(1) on success, false(0) on error
|
||||
*/
|
||||
int lmdb_journalstore_build_record(const struct MDB_val* db_key, const struct MDB_val *db_value, struct JournalRecord **journal_record) {
|
||||
if (*journal_record == NULL) {
|
||||
*journal_record = lmdb_journal_record_new();
|
||||
if (*journal_record == NULL) {
|
||||
libp2p_logger_error("lmdb_journalstore", "build_record: Unable to allocate memory for new journal record.\n");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
struct JournalRecord *rec = *journal_record;
|
||||
// timestamp
|
||||
size_t varint_size = 0;
|
||||
rec->timestamp = varint_decode(db_key->mv_data, db_key->mv_size, &varint_size);
|
||||
// pin flag
|
||||
rec->pin = ((uint8_t*)db_value->mv_data)[0];
|
||||
// pending flag
|
||||
rec->pending = ((uint8_t*)db_value->mv_data)[1];
|
||||
// hash
|
||||
if (rec->hash != NULL) {
|
||||
free(rec->hash);
|
||||
rec->hash = NULL;
|
||||
rec->hash_size = 0;
|
||||
}
|
||||
rec->hash_size = db_value->mv_size - 2;
|
||||
rec->hash = malloc(rec->hash_size);
|
||||
uint8_t *val = (uint8_t*)db_value->mv_data;
|
||||
memcpy(rec->hash, &val[2], rec->hash_size);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/***
|
||||
* Write a journal record
|
||||
* @param mbd_txn the transaction
|
||||
|
@ -42,71 +118,92 @@ int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* journa
|
|||
struct MDB_val db_key;
|
||||
struct MDB_val db_value;
|
||||
|
||||
// build the record, which is a timestamp as a key
|
||||
uint8_t time_varint[8];
|
||||
size_t time_varint_size = 0;
|
||||
varint_encode(journal_record->timestamp, &time_varint[0], 8, &time_varint_size);
|
||||
|
||||
size_t record_size = journal_record->hash_size + 2;
|
||||
uint8_t record[record_size];
|
||||
// Field 1: pin flag
|
||||
record[0] = journal_record->pin;
|
||||
// Field 2: pending flag
|
||||
record[1] = journal_record->pending;
|
||||
// field 3: hash
|
||||
memcpy(&record[2], journal_record->hash, journal_record->hash_size);
|
||||
|
||||
// open the journal table
|
||||
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
||||
if (!lmdb_journalstore_build_key_value_pair(journal_record, &db_key, &db_value)) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to generate key value pair for journal_add.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
// write the record
|
||||
db_key.mv_size = time_varint_size;
|
||||
db_key.mv_data = time_varint;
|
||||
|
||||
db_value.mv_size = record_size;
|
||||
db_value.mv_data = record;
|
||||
// open the journal table
|
||||
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n");
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/***
|
||||
* Attempt to get a specific record identified by its timestamp and bytes
|
||||
* @param handle a handle to the database engine
|
||||
* @param journalstore_cursor the cursor (will be returned as a cursor that points to the record found)
|
||||
* @param journalstore_record where to put the results (can pass null). If data is within the struct, will use it as search criteria
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record)
|
||||
{
|
||||
if (journalstore_cursor == NULL || journalstore_cursor->transaction == NULL) {
|
||||
libp2p_logger_error("lmdb_journalstore", "get_record: journalstore cursor not initialized properly.\n");
|
||||
return 0;
|
||||
}
|
||||
if (journalstore_cursor->cursor == NULL) {
|
||||
if (!lmdb_journalstore_cursor_open(handle, &journalstore_cursor)) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to open cursor in get_record.\n");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
// search for the timestamp
|
||||
if (!lmdb_journalstore_cursor_get(journalstore_cursor, CURSOR_FIRST, journalstore_record)) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to find any records in table.\n");
|
||||
return 0;
|
||||
}
|
||||
// now look for the hash key
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a cursor to the journalstore table
|
||||
* @param datastore the data connection
|
||||
* @param crsr a reference to the cursor. In this implementation, it is an lmdb_trans_cursor struct
|
||||
* @returns true(1) on success, false(0) otherwises
|
||||
* @param db_handle a handle to the database (an MDB_env pointer)
|
||||
* @param cursor where to place the results
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) {
|
||||
if (datastore->handle != NULL) {
|
||||
MDB_env* mdb_env = (MDB_env*)datastore->handle;
|
||||
MDB_dbi mdb_dbi;
|
||||
if (*crsr == NULL ) {
|
||||
*crsr = malloc(sizeof(struct lmdb_trans_cursor));
|
||||
struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)*crsr;
|
||||
int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr) {
|
||||
if (handle != NULL) {
|
||||
MDB_env* mdb_env = (MDB_env*)handle;
|
||||
struct lmdb_trans_cursor *cursor = *crsr;
|
||||
if (cursor == NULL ) {
|
||||
cursor = lmdb_trans_cursor_new();
|
||||
if (cursor == NULL)
|
||||
return 0;
|
||||
*crsr = cursor;
|
||||
}
|
||||
if (cursor->transaction == NULL) {
|
||||
// open transaction
|
||||
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to start a transaction.\n");
|
||||
libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to begin a transaction.\n");
|
||||
return 0;
|
||||
}
|
||||
MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction;
|
||||
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to open the dbi to the journalstore");
|
||||
mdb_txn_commit(mdb_txn);
|
||||
}
|
||||
if (cursor->database == NULL) {
|
||||
if (mdb_dbi_open(cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, cursor->database) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to open the dbi to the journalstore");
|
||||
mdb_txn_commit(cursor->transaction);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
if (cursor->cursor == NULL) {
|
||||
// open cursor
|
||||
if (mdb_cursor_open(mdb_txn, mdb_dbi, &cursor->cursor) != 0) {
|
||||
mdb_txn_commit(mdb_txn);
|
||||
if (mdb_cursor_open(cursor->transaction, *cursor->database, &cursor->cursor) != 0) {
|
||||
libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to open cursor.\n");
|
||||
mdb_txn_commit(cursor->transaction);
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
} else {
|
||||
libp2p_logger_error("lmdb_journalstore", "Attempted to open a new cursor but there is something already there.\n");
|
||||
}
|
||||
} else {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to open cursor on null db handle.\n");
|
||||
|
@ -115,12 +212,43 @@ int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) {
|
|||
|
||||
}
|
||||
|
||||
int lmdb_journalstore_composite_key_compare(struct JournalRecord *a, struct JournalRecord *b) {
|
||||
if (a == NULL && b == NULL)
|
||||
return 0;
|
||||
if (a == NULL && b != NULL)
|
||||
return 1;
|
||||
if (a != NULL && b == NULL)
|
||||
return -1;
|
||||
if (a->timestamp != b->timestamp) {
|
||||
if (a->timestamp > b->timestamp)
|
||||
return -1;
|
||||
else
|
||||
return 1;
|
||||
}
|
||||
if (a->hash_size != b->hash_size) {
|
||||
if (a->hash_size > b->hash_size)
|
||||
return -1;
|
||||
else
|
||||
return 1;
|
||||
}
|
||||
for(int i = 0; i < a->hash_size; i++) {
|
||||
if (a->hash[i] != b->hash[i]) {
|
||||
return b->hash[i] - a->hash[i];
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a record from the cursor
|
||||
* Read a record from the cursor. If (record) contains a key, it will look for the exact record.
|
||||
* If not, it will return the first matching record.
|
||||
* @param crsr the lmdb_trans_cursor
|
||||
* @param op the cursor operation (i.e. CURSOR_FIRST, CURSOR_NEXT, CURSOR_LAST, CURSOR_PREVIOUS)
|
||||
* @param record the record (will allocate a new one if *record is NULL)
|
||||
* @returns true(1) if something was found, false(0) otherwise)
|
||||
*/
|
||||
int repo_journalstore_cursor_get(struct Datastore* datastore, void* crsr, enum DatastoreCursorOp op, struct JournalRecord** record) {
|
||||
if (crsr != NULL) {
|
||||
struct lmdb_trans_cursor* tc = (struct lmdb_trans_cursor*)crsr;
|
||||
int lmdb_journalstore_cursor_get(struct lmdb_trans_cursor *tc, enum DatastoreCursorOp op, struct JournalRecord** record) {
|
||||
if (tc != NULL) {
|
||||
MDB_val mdb_key;
|
||||
MDB_val mdb_value;
|
||||
MDB_cursor_op co = MDB_FIRST;
|
||||
|
@ -134,41 +262,86 @@ int repo_journalstore_cursor_get(struct Datastore* datastore, void* crsr, enum D
|
|||
else if (op == CURSOR_PREVIOUS)
|
||||
co = MDB_PREV;
|
||||
|
||||
if (*record != NULL) {
|
||||
lmdb_journalstore_generate_key(*record, &mdb_key);
|
||||
}
|
||||
|
||||
if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) {
|
||||
return 0;
|
||||
}
|
||||
// build the JournalRecord object
|
||||
*record = (struct JournalRecord*) malloc(sizeof(struct JournalRecord));
|
||||
struct JournalRecord *rec = *record;
|
||||
// timestamp
|
||||
size_t varint_size = 0;
|
||||
rec->timestamp = varint_decode(mdb_key.mv_data, mdb_key.mv_size, &varint_size);
|
||||
// pin flag
|
||||
rec->pin = ((uint8_t*)mdb_value.mv_data)[0];
|
||||
// pending flag
|
||||
rec->pending = ((uint8_t*)mdb_value.mv_data)[1];
|
||||
// hash
|
||||
rec->hash_size = mdb_value.mv_size - 2;
|
||||
rec->hash = malloc(rec->hash_size);
|
||||
uint8_t *val = (uint8_t*)mdb_value.mv_data;
|
||||
memcpy(rec->hash, &val[2], rec->hash_size);
|
||||
return 1;
|
||||
|
||||
// see if the passed in record has a specific record in mind (take care of duplicate keys)
|
||||
if ( (*record)->hash_size > 0) {
|
||||
struct JournalRecord* curr_record = NULL;
|
||||
if (!lmdb_journalstore_build_record(&mdb_key, &mdb_value, &curr_record)) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to convert journalstore record into a JournalRecord struct.\n");
|
||||
return 0;
|
||||
}
|
||||
// we are looking for a specific record. Flip through the records looking for the exact record
|
||||
while (lmdb_journalstore_composite_key_compare(*record, curr_record) != 0) {
|
||||
if ( (*record)->timestamp != curr_record->timestamp) {
|
||||
//we've exhausted all records for this timestamp. Exit.
|
||||
lmdb_journal_record_free(curr_record);
|
||||
curr_record = NULL;
|
||||
break;
|
||||
}
|
||||
// we did not find the exact record. Skip to the next one
|
||||
lmdb_journal_record_free(curr_record);
|
||||
curr_record = NULL;
|
||||
mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, MDB_NEXT);
|
||||
if (!lmdb_journalstore_build_record(&mdb_key, &mdb_value, &curr_record)) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to convert journalstore record into a JournalRecord struct.\n");
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
if (curr_record != NULL) {
|
||||
// we found the exact record. merge it into the *record
|
||||
(*record)->pending = curr_record->pending;
|
||||
(*record)->pin = curr_record->pin;
|
||||
lmdb_journal_record_free(curr_record);
|
||||
return 1;
|
||||
}
|
||||
} else {
|
||||
// we're not looking for any particular record. Return the first one found.
|
||||
return lmdb_journalstore_build_record(&mdb_key, &mdb_value, record);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/***
|
||||
* Write the record at the cursor
|
||||
* @param crsr the cursor
|
||||
* @param journal_record the record to write
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalRecord* journal_record) {
|
||||
struct MDB_cursor* cursor = crsr->cursor;
|
||||
struct MDB_val db_key;
|
||||
struct MDB_val db_value;
|
||||
|
||||
if (!lmdb_journalstore_build_key_value_pair(journal_record, &db_key, &db_value)) {
|
||||
libp2p_logger_error("lmdb_journalstore", "Unable to create journalstore record.\n");
|
||||
return 0;
|
||||
}
|
||||
if (mdb_cursor_put(cursor, &db_key, &db_value, 0) == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the cursor
|
||||
* @param crsr a lmdb_trans_cursor pointer
|
||||
*/
|
||||
int repo_journalstore_cursor_close(struct Datastore* datastore, void* crsr) {
|
||||
if (crsr != NULL) {
|
||||
struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)crsr;
|
||||
if (cursor->cursor != NULL) {
|
||||
mdb_cursor_close(cursor->cursor);
|
||||
mdb_txn_commit(cursor->transaction);
|
||||
free(cursor);
|
||||
return 1;
|
||||
}
|
||||
int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor) {
|
||||
if (cursor->cursor != NULL) {
|
||||
mdb_cursor_close(cursor->cursor);
|
||||
mdb_txn_commit(cursor->transaction);
|
||||
free(cursor);
|
||||
return 1;
|
||||
} else {
|
||||
free(cursor);
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -84,8 +84,8 @@ int test_datastore_list_journal() {
|
|||
return 0;
|
||||
}
|
||||
// open cursor
|
||||
void* crsr;
|
||||
if (!repo_journalstore_cursor_open(fs_repo->config->datastore, &crsr)) {
|
||||
struct lmdb_trans_cursor *crsr = NULL;
|
||||
if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->handle, &crsr)) {
|
||||
ipfs_repo_fsrepo_free(fs_repo);
|
||||
return 0;
|
||||
}
|
||||
|
@ -93,7 +93,7 @@ int test_datastore_list_journal() {
|
|||
struct JournalRecord* record = NULL;
|
||||
enum DatastoreCursorOp op = CURSOR_FIRST;
|
||||
do {
|
||||
if (repo_journalstore_cursor_get(fs_repo->config->datastore, crsr, op, &record) == 0) {
|
||||
if (lmdb_journalstore_cursor_get(crsr, op, &record) == 0) {
|
||||
lmdb_journal_record_free(record);
|
||||
record = NULL;
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue