2017-08-30 16:10:14 +00:00
|
|
|
#include <string.h>
|
|
|
|
|
|
|
|
#include "varint.h"
|
|
|
|
#include "lmdb.h"
|
|
|
|
#include "libp2p/utils/logger.h"
|
2017-08-31 11:41:54 +00:00
|
|
|
#include "libp2p/crypto/encoding/base58.h"
|
2017-08-21 19:49:21 +00:00
|
|
|
#include "ipfs/repo/fsrepo/journalstore.h"
|
|
|
|
#include "ipfs/repo/fsrepo/lmdb_datastore.h"
|
|
|
|
|
2017-09-04 18:33:56 +00:00
|
|
|
struct JournalRecord* lmdb_journal_record_new() {
|
|
|
|
struct JournalRecord* rec = (struct JournalRecord*) malloc(sizeof(struct JournalRecord));
|
|
|
|
if (rec != NULL) {
|
|
|
|
rec->hash = NULL;
|
|
|
|
rec->hash_size = 0;
|
|
|
|
rec->pending = 0;
|
|
|
|
rec->pin = 0;
|
|
|
|
rec->timestamp = 0;
|
|
|
|
}
|
|
|
|
return rec;
|
|
|
|
}
|
2017-08-21 19:49:21 +00:00
|
|
|
|
2017-09-04 18:33:56 +00:00
|
|
|
int lmdb_journal_record_free(struct JournalRecord* rec) {
|
2017-08-21 19:49:21 +00:00
|
|
|
if (rec != NULL) {
|
|
|
|
if (rec->hash != NULL)
|
|
|
|
free(rec->hash);
|
|
|
|
rec->hash = NULL;
|
|
|
|
free(rec);
|
|
|
|
}
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
2017-08-24 15:08:27 +00:00
|
|
|
/***
|
|
|
|
* Write a journal record
|
|
|
|
* @param mbd_txn the transaction
|
|
|
|
* @param timestamp the timestamp
|
|
|
|
* @param hash the hash
|
|
|
|
* @param hash_size the size of the hash
|
|
|
|
* @returns true(1) on success, false(0) otherwise
|
|
|
|
*/
|
2017-09-04 18:33:56 +00:00
|
|
|
int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* journal_record) {
|
2017-08-24 15:08:27 +00:00
|
|
|
MDB_dbi mdb_dbi;
|
|
|
|
struct MDB_val db_key;
|
|
|
|
struct MDB_val db_value;
|
|
|
|
|
2017-09-04 18:33:56 +00:00
|
|
|
// build the record, which is a timestamp as a key
|
2017-08-24 15:08:27 +00:00
|
|
|
uint8_t time_varint[8];
|
|
|
|
size_t time_varint_size = 0;
|
2017-09-04 18:33:56 +00:00
|
|
|
varint_encode(journal_record->timestamp, &time_varint[0], 8, &time_varint_size);
|
2017-08-24 15:08:27 +00:00
|
|
|
|
2017-09-04 18:33:56 +00:00
|
|
|
size_t record_size = journal_record->hash_size + 2;
|
2017-08-24 15:08:27 +00:00
|
|
|
uint8_t record[record_size];
|
2017-09-04 18:33:56 +00:00
|
|
|
// Field 1: pin flag
|
|
|
|
record[0] = journal_record->pin;
|
|
|
|
// Field 2: pending flag
|
|
|
|
record[1] = journal_record->pending;
|
|
|
|
// field 3: hash
|
|
|
|
memcpy(&record[2], journal_record->hash, journal_record->hash_size);
|
2017-08-31 11:41:54 +00:00
|
|
|
|
2017-08-24 15:08:27 +00:00
|
|
|
// open the journal table
|
2017-08-30 16:10:14 +00:00
|
|
|
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
2017-08-24 15:08:27 +00:00
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
// write the record
|
|
|
|
db_key.mv_size = time_varint_size;
|
|
|
|
db_key.mv_data = time_varint;
|
|
|
|
|
|
|
|
db_value.mv_size = record_size;
|
|
|
|
db_value.mv_data = record;
|
|
|
|
|
|
|
|
if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) {
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
|
2017-08-21 19:49:21 +00:00
|
|
|
/**
|
|
|
|
* Open a cursor to the journalstore table
|
|
|
|
* @param datastore the data connection
|
|
|
|
* @param crsr a reference to the cursor. In this implementation, it is an lmdb_trans_cursor struct
|
|
|
|
* @returns true(1) on success, false(0) otherwises
|
|
|
|
*/
|
|
|
|
int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) {
|
|
|
|
if (datastore->handle != NULL) {
|
|
|
|
MDB_env* mdb_env = (MDB_env*)datastore->handle;
|
|
|
|
MDB_dbi mdb_dbi;
|
|
|
|
if (*crsr == NULL ) {
|
|
|
|
*crsr = malloc(sizeof(struct lmdb_trans_cursor));
|
|
|
|
struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)*crsr;
|
|
|
|
// open transaction
|
2017-08-30 16:10:14 +00:00
|
|
|
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) {
|
|
|
|
libp2p_logger_error("lmdb_journalstore", "Unable to start a transaction.\n");
|
2017-08-21 19:49:21 +00:00
|
|
|
return 0;
|
2017-08-30 16:10:14 +00:00
|
|
|
}
|
2017-08-21 19:49:21 +00:00
|
|
|
MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction;
|
2017-08-30 16:10:14 +00:00
|
|
|
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
|
|
|
libp2p_logger_error("lmdb_journalstore", "Unable to open the dbi to the journalstore");
|
2017-08-21 19:49:21 +00:00
|
|
|
mdb_txn_commit(mdb_txn);
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
// open cursor
|
|
|
|
if (mdb_cursor_open(mdb_txn, mdb_dbi, &cursor->cursor) != 0) {
|
|
|
|
mdb_txn_commit(mdb_txn);
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
return 1;
|
2017-08-30 16:10:14 +00:00
|
|
|
} else {
|
|
|
|
libp2p_logger_error("lmdb_journalstore", "Attempted to open a new cursor but there is something already there.\n");
|
2017-08-21 19:49:21 +00:00
|
|
|
}
|
2017-08-30 16:10:14 +00:00
|
|
|
} else {
|
|
|
|
libp2p_logger_error("lmdb_journalstore", "Unable to open cursor on null db handle.\n");
|
2017-08-21 19:49:21 +00:00
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Read a record from the cursor
|
|
|
|
*/
|
|
|
|
int repo_journalstore_cursor_get(struct Datastore* datastore, void* crsr, enum DatastoreCursorOp op, struct JournalRecord** record) {
|
|
|
|
if (crsr != NULL) {
|
|
|
|
struct lmdb_trans_cursor* tc = (struct lmdb_trans_cursor*)crsr;
|
|
|
|
MDB_val mdb_key;
|
|
|
|
MDB_val mdb_value;
|
|
|
|
MDB_cursor_op co = MDB_FIRST;
|
2017-08-24 18:30:44 +00:00
|
|
|
|
2017-08-21 19:49:21 +00:00
|
|
|
if (op == CURSOR_FIRST)
|
|
|
|
co = MDB_FIRST;
|
|
|
|
else if (op == CURSOR_NEXT)
|
|
|
|
co = MDB_NEXT;
|
2017-08-24 18:30:44 +00:00
|
|
|
else if (op == CURSOR_LAST)
|
|
|
|
co = MDB_LAST;
|
|
|
|
else if (op == CURSOR_PREVIOUS)
|
|
|
|
co = MDB_PREV;
|
|
|
|
|
2017-08-21 19:49:21 +00:00
|
|
|
if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) {
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
// build the JournalRecord object
|
|
|
|
*record = (struct JournalRecord*) malloc(sizeof(struct JournalRecord));
|
|
|
|
struct JournalRecord *rec = *record;
|
|
|
|
// timestamp
|
|
|
|
size_t varint_size = 0;
|
|
|
|
rec->timestamp = varint_decode(mdb_key.mv_data, mdb_key.mv_size, &varint_size);
|
|
|
|
// pin flag
|
|
|
|
rec->pin = ((uint8_t*)mdb_value.mv_data)[0];
|
2017-09-04 18:33:56 +00:00
|
|
|
// pending flag
|
|
|
|
rec->pending = ((uint8_t*)mdb_value.mv_data)[1];
|
|
|
|
// hash
|
|
|
|
rec->hash_size = mdb_value.mv_size - 2;
|
2017-08-21 19:49:21 +00:00
|
|
|
rec->hash = malloc(rec->hash_size);
|
2017-08-31 21:41:10 +00:00
|
|
|
uint8_t *val = (uint8_t*)mdb_value.mv_data;
|
2017-09-04 18:33:56 +00:00
|
|
|
memcpy(rec->hash, &val[2], rec->hash_size);
|
2017-08-21 19:49:21 +00:00
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Close the cursor
|
|
|
|
*/
|
2017-08-24 18:30:44 +00:00
|
|
|
int repo_journalstore_cursor_close(struct Datastore* datastore, void* crsr) {
|
2017-08-21 19:49:21 +00:00
|
|
|
if (crsr != NULL) {
|
|
|
|
struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)crsr;
|
|
|
|
if (cursor->cursor != NULL) {
|
|
|
|
mdb_cursor_close(cursor->cursor);
|
|
|
|
mdb_txn_commit(cursor->transaction);
|
|
|
|
free(cursor);
|
|
|
|
return 1;
|
|
|
|
}
|
|
|
|
free(cursor);
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
}
|