Added journaling when a file is saved - beginning of backup scheme
This commit is contained in:
parent
5b242a2d08
commit
d13e4b4318
11 changed files with 346 additions and 45 deletions
|
@ -238,9 +238,6 @@ int ipfs_blockstore_put_unixfs(const struct UnixFS* unix_fs, const struct FSRepo
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// send to Put with key (this is now done separately)
|
|
||||||
//fs_repo->config->datastore->datastore_put(key, key_length, block->data, block->data_length, fs_repo->config->datastore);
|
|
||||||
|
|
||||||
free(key);
|
free(key);
|
||||||
free(filename);
|
free(filename);
|
||||||
return 1;
|
return 1;
|
||||||
|
|
33
include/ipfs/repo/fsrepo/journalstore.h
Normal file
33
include/ipfs/repo/fsrepo/journalstore.h
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
#pragma once
|
||||||
|
/**
|
||||||
|
* Piggyback on the datastore to access the journal entries
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
|
||||||
|
#include "libp2p/db/datastore.h"
|
||||||
|
|
||||||
|
struct JournalRecord {
|
||||||
|
unsigned long long timestamp;
|
||||||
|
int pin;
|
||||||
|
uint8_t *hash;
|
||||||
|
size_t hash_size;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open a cursor to the journalstore table
|
||||||
|
*/
|
||||||
|
int repo_journalstore_cursor_open(struct Datastore* datastore, void** cursor);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read a record from the cursor
|
||||||
|
*/
|
||||||
|
int repo_journalstore_cursor_get(struct Datastore* datastore, void* cursor, enum DatastoreCursorOp op, struct JournalRecord** record);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the cursor
|
||||||
|
*/
|
||||||
|
int repo_cournalstore_cursor_close(struct Datastore* datastore, void* cursor);
|
||||||
|
|
||||||
|
int journal_record_free(struct JournalRecord* rec);
|
|
@ -1,8 +1,16 @@
|
||||||
#ifndef __FS_REPO_LMDB_DATASTORE_H__
|
#pragma once
|
||||||
#define __FS_REPO_LMDB_DATASTORE_H__
|
|
||||||
|
|
||||||
|
#include "lmdb.h"
|
||||||
#include "libp2p/db/datastore.h"
|
#include "libp2p/db/datastore.h"
|
||||||
|
|
||||||
|
static const char* DATASTORE_DB = "DATASTORE";
|
||||||
|
static const char* JOURNAL_DB = "JOURNAL";
|
||||||
|
|
||||||
|
struct lmdb_trans_cursor {
|
||||||
|
MDB_txn* transaction;
|
||||||
|
MDB_cursor* cursor;
|
||||||
|
};
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* Places the LMDB methods into the datastore's function pointers
|
* Places the LMDB methods into the datastore's function pointers
|
||||||
* @param datastore the datastore to fill
|
* @param datastore the datastore to fill
|
||||||
|
@ -33,5 +41,3 @@ int repo_fsrepo_lmdb_close(struct Datastore* datastore);
|
||||||
* @returns true(1) on success
|
* @returns true(1) on success
|
||||||
*/
|
*/
|
||||||
int repo_fsrepo_lmdb_create_directory(struct Datastore* datastore);
|
int repo_fsrepo_lmdb_create_directory(struct Datastore* datastore);
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ OBJS = main.o \
|
||||||
../namesys/*.o \
|
../namesys/*.o \
|
||||||
../pin/pin.o \
|
../pin/pin.o \
|
||||||
../repo/init.o \
|
../repo/init.o \
|
||||||
../repo/fsrepo/fs_repo.o ../repo/fsrepo/jsmn.o ../repo/fsrepo/lmdb_datastore.o \
|
../repo/fsrepo/*.o \
|
||||||
../repo/config/*.o \
|
../repo/config/*.o \
|
||||||
../routing/*.o \
|
../routing/*.o \
|
||||||
../thirdparty/ipfsaddr/ipfs_addr.o \
|
../thirdparty/ipfsaddr/ipfs_addr.o \
|
||||||
|
|
|
@ -108,6 +108,7 @@ int main(int argc, char** argv) {
|
||||||
libp2p_logger_add_class("peerstore");
|
libp2p_logger_add_class("peerstore");
|
||||||
libp2p_logger_add_class("dht_protocol");
|
libp2p_logger_add_class("dht_protocol");
|
||||||
libp2p_logger_add_class("peer");
|
libp2p_logger_add_class("peer");
|
||||||
|
libp2p_logger_add_class("lmdb_datastore");
|
||||||
|
|
||||||
strip_quotes(argc, argv);
|
strip_quotes(argc, argv);
|
||||||
int retVal = parse_arguments(argc, argv);
|
int retVal = parse_arguments(argc, argv);
|
||||||
|
|
|
@ -7,7 +7,7 @@ endif
|
||||||
|
|
||||||
LFLAGS =
|
LFLAGS =
|
||||||
DEPS =
|
DEPS =
|
||||||
OBJS = fs_repo.o jsmn.o lmdb_datastore.o
|
OBJS = fs_repo.o jsmn.o lmdb_datastore.o lmdb_journalstore.o
|
||||||
|
|
||||||
%.o: %.c $(DEPS)
|
%.o: %.c $(DEPS)
|
||||||
$(CC) -c -o $@ $< $(CFLAGS)
|
$(CC) -c -o $@ $< $(CFLAGS)
|
||||||
|
|
|
@ -8,14 +8,55 @@
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <time.h>
|
||||||
|
|
||||||
#include "lmdb.h"
|
#include "lmdb.h"
|
||||||
|
#include "libp2p/utils/logger.h"
|
||||||
#include "ipfs/repo/fsrepo/lmdb_datastore.h"
|
#include "ipfs/repo/fsrepo/lmdb_datastore.h"
|
||||||
|
#include "varint.h"
|
||||||
|
|
||||||
struct lmdb_trans_cursor {
|
/**
|
||||||
MDB_txn* transaction;
|
* Build a "value" section for a datastore record
|
||||||
MDB_cursor* cursor;
|
* @param timestamp the timestamp
|
||||||
};
|
* @param data the data (usually a base32 of the cid hash)
|
||||||
|
* @param data_length the length of data
|
||||||
|
* @param result the resultant data object
|
||||||
|
* @param result_size the size of the result
|
||||||
|
* @returns true(1) on success, otherwise 0
|
||||||
|
*/
|
||||||
|
int repo_fsrepo_lmdb_build_record(const unsigned long long timestamp, const uint8_t *data, size_t data_length, uint8_t **result, size_t *result_size) {
|
||||||
|
// turn timestamp into varint
|
||||||
|
uint8_t ts_varint[8];
|
||||||
|
size_t num_bytes;
|
||||||
|
if (varint_encode(timestamp, &ts_varint[0], 8, &num_bytes) == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
// make new structure
|
||||||
|
*result = (uint8_t *) malloc(num_bytes + data_length);
|
||||||
|
if (*result == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
memcpy(*result, ts_varint, num_bytes);
|
||||||
|
memcpy(&(*result)[num_bytes], data, data_length);
|
||||||
|
*result_size = data_length + num_bytes;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* read a "value" section from a datastore record.
|
||||||
|
* @param data what we read from the datastore
|
||||||
|
* @param data_length the length of what we read from the datastore
|
||||||
|
* @param timestamp the timestamp that was read from the datastore
|
||||||
|
* @param record_pos where the data starts (without the timestamp)
|
||||||
|
* @param record_size the size of the data section of the record
|
||||||
|
* @returns true(1) on success, false(0) otherwise
|
||||||
|
*/
|
||||||
|
int repo_fsrepo_lmdb_parse_record(uint8_t *data, size_t data_length, unsigned long long *timestamp, uint8_t *record_pos, size_t *record_size) {
|
||||||
|
size_t varint_size = 0;
|
||||||
|
*timestamp = varint_decode(data, data_length, &varint_size);
|
||||||
|
record_pos = &data[varint_size];
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* retrieve a record from the database and put in a pre-sized buffer
|
* retrieve a record from the database and put in a pre-sized buffer
|
||||||
|
@ -28,7 +69,6 @@ struct lmdb_trans_cursor {
|
||||||
* @returns true(1) on success
|
* @returns true(1) on success
|
||||||
*/
|
*/
|
||||||
int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data, size_t max_data_size, size_t* data_size, const struct Datastore* datastore) {
|
int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data, size_t max_data_size, size_t* data_size, const struct Datastore* datastore) {
|
||||||
int retVal;
|
|
||||||
MDB_txn* mdb_txn;
|
MDB_txn* mdb_txn;
|
||||||
MDB_dbi mdb_dbi;
|
MDB_dbi mdb_dbi;
|
||||||
struct MDB_val db_key;
|
struct MDB_val db_key;
|
||||||
|
@ -39,11 +79,10 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data,
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
// open transaction
|
// open transaction
|
||||||
retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn);
|
if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0)
|
||||||
if (retVal != 0)
|
|
||||||
return 0;
|
return 0;
|
||||||
retVal = mdb_dbi_open(mdb_txn, NULL, MDB_DUPSORT, &mdb_dbi);
|
|
||||||
if (retVal != 0) {
|
if (mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
||||||
mdb_txn_commit(mdb_txn);
|
mdb_txn_commit(mdb_txn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -52,33 +91,93 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data,
|
||||||
db_key.mv_size = key_size;
|
db_key.mv_size = key_size;
|
||||||
db_key.mv_data = (char*)key;
|
db_key.mv_data = (char*)key;
|
||||||
|
|
||||||
//printf("Looking for data that has a key size of %lu that starts with %02x and ends with %02x\n", db_key.mv_size, ((char*)db_key.mv_data)[0], ((char*)db_key.mv_data)[db_key.mv_size - 1]);
|
if (mdb_get(mdb_txn, mdb_dbi, &db_key, &db_value) != 0) {
|
||||||
|
|
||||||
retVal = mdb_get(mdb_txn, mdb_dbi, &db_key, &db_value);
|
|
||||||
if (retVal != 0) {
|
|
||||||
//mdb_dbi_close(mdb_env, mdb_dbi);
|
|
||||||
mdb_txn_commit(mdb_txn);
|
mdb_txn_commit(mdb_txn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// now copy the data
|
// the data from the database includes a timestamp. We'll need to strip it off.
|
||||||
if (db_value.mv_size > max_data_size) {
|
unsigned long long timestamp;
|
||||||
//mdb_dbi_close(mdb_env, mdb_dbi);
|
uint8_t *pos = NULL;
|
||||||
|
size_t size = 0;
|
||||||
|
if (!repo_fsrepo_lmdb_parse_record(db_key.mv_data, db_key.mv_size, ×tamp, pos, &size)) {
|
||||||
|
mdb_txn_commit(mdb_txn);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Was it too big to fit in the buffer they sent?
|
||||||
|
if (size > max_data_size) {
|
||||||
mdb_txn_commit(mdb_txn);
|
mdb_txn_commit(mdb_txn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set return values
|
// set return values
|
||||||
memcpy(data, db_value.mv_data, db_value.mv_size);
|
memcpy(data, pos, size);
|
||||||
(*data_size) = db_value.mv_size;
|
(*data_size) = size;
|
||||||
|
|
||||||
// clean up
|
// clean up
|
||||||
//mdb_dbi_close(mdb_env, mdb_dbi);
|
|
||||||
mdb_txn_commit(mdb_txn);
|
mdb_txn_commit(mdb_txn);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Write a journal record
|
||||||
|
* @param mbd_txn the transaction
|
||||||
|
* @param timestamp the timestamp
|
||||||
|
* @param hash the hash
|
||||||
|
* @param hash_size the size of the hash
|
||||||
|
* @returns true(1) on success, false(0) otherwise
|
||||||
|
*/
|
||||||
|
int repo_fsrepo_lmdb_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp, const uint8_t *hash, size_t hash_size) {
|
||||||
|
MDB_dbi mdb_dbi;
|
||||||
|
struct MDB_val db_key;
|
||||||
|
struct MDB_val db_value;
|
||||||
|
|
||||||
|
libp2p_logger_debug("lmdb_datastore", "journal add timestamp: %llu.\n", timestamp);
|
||||||
|
|
||||||
|
// build the record, which is a timestamp as a key, a byte that is the pin flag, and the hash as the value
|
||||||
|
uint8_t time_varint[8];
|
||||||
|
size_t time_varint_size = 0;
|
||||||
|
varint_encode(timestamp, &time_varint[0], 8, &time_varint_size);
|
||||||
|
|
||||||
|
libp2p_logger_debug("lmdb_datastore", "journal add varint size: %lu.\n", (unsigned long)time_varint_size);
|
||||||
|
|
||||||
|
size_t record_size = hash_size + 1;
|
||||||
|
uint8_t record[record_size];
|
||||||
|
record[0] = 1;
|
||||||
|
memcpy(&record[1], hash, hash_size);
|
||||||
|
|
||||||
|
// open the journal table
|
||||||
|
|
||||||
|
if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// write the record
|
||||||
|
db_key.mv_size = time_varint_size;
|
||||||
|
db_key.mv_data = time_varint;
|
||||||
|
|
||||||
|
db_value.mv_size = record_size;
|
||||||
|
db_value.mv_data = record;
|
||||||
|
|
||||||
|
if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the current time UTC
|
||||||
|
* @returns number of seconds since epoch in UTC
|
||||||
|
*/
|
||||||
|
unsigned long long lmdb_datastore_gmt_time() {
|
||||||
|
time_t local = time(NULL);
|
||||||
|
struct tm *gmt = gmtime(&local);
|
||||||
|
return (unsigned long long)mktime(gmt);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Write data to the datastore with the specified key
|
* Write data to the datastore with the specified key
|
||||||
* @param key the key
|
* @param key the key
|
||||||
|
@ -103,21 +202,30 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
||||||
retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn);
|
retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn);
|
||||||
if (retVal != 0)
|
if (retVal != 0)
|
||||||
return 0;
|
return 0;
|
||||||
retVal = mdb_dbi_open(mdb_txn, NULL, MDB_DUPSORT, &mdb_dbi);
|
retVal = mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi);
|
||||||
if (retVal != 0)
|
if (retVal != 0)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
// add the timestamp
|
||||||
|
unsigned long long timestamp = lmdb_datastore_gmt_time();
|
||||||
|
uint8_t *record;
|
||||||
|
size_t record_size;
|
||||||
|
repo_fsrepo_lmdb_build_record(timestamp, data, data_size, &record, &record_size);
|
||||||
|
|
||||||
// prepare data
|
// prepare data
|
||||||
db_key.mv_size = key_size;
|
db_key.mv_size = key_size;
|
||||||
db_key.mv_data = (char*)key;
|
db_key.mv_data = (char*)key;
|
||||||
|
|
||||||
// write
|
// write
|
||||||
db_value.mv_size = data_size;
|
db_value.mv_size = record_size;
|
||||||
db_value.mv_data = data;
|
db_value.mv_data = record;
|
||||||
|
|
||||||
retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA | MDB_NOOVERWRITE);
|
retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA | MDB_NOOVERWRITE);
|
||||||
if (retVal == 0) // the normal case
|
if (retVal == 0) {
|
||||||
|
// the normal case
|
||||||
|
repo_fsrepo_lmdb_journal_add(mdb_txn, timestamp, key, key_size);
|
||||||
retVal = 1;
|
retVal = 1;
|
||||||
else {
|
} else {
|
||||||
if (retVal == MDB_KEYEXIST) // We tried to add a key that already exists. Skip.
|
if (retVal == MDB_KEYEXIST) // We tried to add a key that already exists. Skip.
|
||||||
retVal = 1;
|
retVal = 1;
|
||||||
else
|
else
|
||||||
|
@ -125,7 +233,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
||||||
}
|
}
|
||||||
|
|
||||||
// cleanup
|
// cleanup
|
||||||
//mdb_dbi_close(mdb_env, mdb_dbi);
|
free(record);
|
||||||
mdb_txn_commit(mdb_txn);
|
mdb_txn_commit(mdb_txn);
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
@ -139,15 +247,20 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
|
||||||
int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) {
|
int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) {
|
||||||
// create environment
|
// create environment
|
||||||
struct MDB_env* mdb_env;
|
struct MDB_env* mdb_env;
|
||||||
int retVal = mdb_env_create(&mdb_env);
|
if (mdb_env_create(&mdb_env) < 0) {
|
||||||
if (retVal < 0) {
|
mdb_env_close(mdb_env);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// at most, 2 databases will be opened. The datastore and the journal.
|
||||||
|
MDB_dbi dbs = 2;
|
||||||
|
if (mdb_env_set_maxdbs(mdb_env, dbs) != 0) {
|
||||||
mdb_env_close(mdb_env);
|
mdb_env_close(mdb_env);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// open the environment
|
// open the environment
|
||||||
retVal = mdb_env_open(mdb_env, datastore->path, 0, S_IRWXU);
|
if (mdb_env_open(mdb_env, datastore->path, 0, S_IRWXU) < 0) {
|
||||||
if (retVal < 0) {
|
|
||||||
mdb_env_close(mdb_env);
|
mdb_env_close(mdb_env);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -169,6 +282,11 @@ int repo_fsrepo_lmdb_close(struct Datastore* datastore) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Create a new cursor on the datastore database
|
||||||
|
* @param datastore the place to store the cursor
|
||||||
|
* @returns true(1) on success, false(0) otherwise
|
||||||
|
*/
|
||||||
int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) {
|
int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) {
|
||||||
if (datastore->handle != NULL) {
|
if (datastore->handle != NULL) {
|
||||||
MDB_env* mdb_env = (MDB_env*)datastore->handle;
|
MDB_env* mdb_env = (MDB_env*)datastore->handle;
|
||||||
|
@ -180,7 +298,7 @@ int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) {
|
||||||
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0)
|
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0)
|
||||||
return 0;
|
return 0;
|
||||||
MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction;
|
MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction;
|
||||||
if (mdb_dbi_open(mdb_txn, NULL, MDB_DUPSORT, &mdb_dbi) != 0) {
|
if (mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
||||||
mdb_txn_commit(mdb_txn);
|
mdb_txn_commit(mdb_txn);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -195,6 +313,16 @@ int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Get a record using a cursor
|
||||||
|
* @param key the key from the record
|
||||||
|
* @param key_length the length of the key
|
||||||
|
* @param value the value of the record
|
||||||
|
* @param value_length the length of the value
|
||||||
|
* @param CURSOR_FIRST or CURSOR_NEXT
|
||||||
|
* @param datastore holds the reference to the opened cursor
|
||||||
|
* @returns true(1) on success, false(0) otherwise
|
||||||
|
*/
|
||||||
int repo_fsrepo_lmdb_cursor_get(unsigned char** key, int* key_length,
|
int repo_fsrepo_lmdb_cursor_get(unsigned char** key, int* key_length,
|
||||||
unsigned char** value, int* value_length,
|
unsigned char** value, int* value_length,
|
||||||
enum DatastoreCursorOp op, struct Datastore* datastore)
|
enum DatastoreCursorOp op, struct Datastore* datastore)
|
||||||
|
|
98
repo/fsrepo/lmdb_journalstore.c
Normal file
98
repo/fsrepo/lmdb_journalstore.c
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
#include "ipfs/repo/fsrepo/journalstore.h"
|
||||||
|
#include "ipfs/repo/fsrepo/lmdb_datastore.h"
|
||||||
|
|
||||||
|
#include "lmdb.h"
|
||||||
|
#include "varint.h"
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
int journal_record_free(struct JournalRecord* rec) {
|
||||||
|
if (rec != NULL) {
|
||||||
|
if (rec->hash != NULL)
|
||||||
|
free(rec->hash);
|
||||||
|
rec->hash = NULL;
|
||||||
|
free(rec);
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Open a cursor to the journalstore table
|
||||||
|
* @param datastore the data connection
|
||||||
|
* @param crsr a reference to the cursor. In this implementation, it is an lmdb_trans_cursor struct
|
||||||
|
* @returns true(1) on success, false(0) otherwises
|
||||||
|
*/
|
||||||
|
int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) {
|
||||||
|
if (datastore->handle != NULL) {
|
||||||
|
MDB_env* mdb_env = (MDB_env*)datastore->handle;
|
||||||
|
MDB_dbi mdb_dbi;
|
||||||
|
if (*crsr == NULL ) {
|
||||||
|
*crsr = malloc(sizeof(struct lmdb_trans_cursor));
|
||||||
|
struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)*crsr;
|
||||||
|
// open transaction
|
||||||
|
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0)
|
||||||
|
return 0;
|
||||||
|
MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction;
|
||||||
|
if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
|
||||||
|
mdb_txn_commit(mdb_txn);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
// open cursor
|
||||||
|
if (mdb_cursor_open(mdb_txn, mdb_dbi, &cursor->cursor) != 0) {
|
||||||
|
mdb_txn_commit(mdb_txn);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read a record from the cursor
|
||||||
|
*/
|
||||||
|
int repo_journalstore_cursor_get(struct Datastore* datastore, void* crsr, enum DatastoreCursorOp op, struct JournalRecord** record) {
|
||||||
|
if (crsr != NULL) {
|
||||||
|
struct lmdb_trans_cursor* tc = (struct lmdb_trans_cursor*)crsr;
|
||||||
|
MDB_val mdb_key;
|
||||||
|
MDB_val mdb_value;
|
||||||
|
MDB_cursor_op co = MDB_FIRST;
|
||||||
|
if (op == CURSOR_FIRST)
|
||||||
|
co = MDB_FIRST;
|
||||||
|
else if (op == CURSOR_NEXT)
|
||||||
|
co = MDB_NEXT;
|
||||||
|
if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
// build the JournalRecord object
|
||||||
|
*record = (struct JournalRecord*) malloc(sizeof(struct JournalRecord));
|
||||||
|
struct JournalRecord *rec = *record;
|
||||||
|
// timestamp
|
||||||
|
size_t varint_size = 0;
|
||||||
|
rec->timestamp = varint_decode(mdb_key.mv_data, mdb_key.mv_size, &varint_size);
|
||||||
|
// pin flag
|
||||||
|
rec->pin = ((uint8_t*)mdb_value.mv_data)[0];
|
||||||
|
rec->hash_size = mdb_value.mv_size - 1;
|
||||||
|
rec->hash = malloc(rec->hash_size);
|
||||||
|
memcpy(rec->hash, &mdb_value.mv_data[1], rec->hash_size);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close the cursor
|
||||||
|
*/
|
||||||
|
int repo_cournalstore_cursor_close(struct Datastore* datastore, void* crsr) {
|
||||||
|
if (crsr != NULL) {
|
||||||
|
struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)crsr;
|
||||||
|
if (cursor->cursor != NULL) {
|
||||||
|
mdb_cursor_close(cursor->cursor);
|
||||||
|
mdb_txn_commit(cursor->transaction);
|
||||||
|
free(cursor);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
free(cursor);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -20,7 +20,7 @@ OBJS = testit.o test_helper.o \
|
||||||
../merkledag/merkledag.o ../merkledag/node.o \
|
../merkledag/merkledag.o ../merkledag/node.o \
|
||||||
../multibase/multibase.o \
|
../multibase/multibase.o \
|
||||||
../repo/init.o \
|
../repo/init.o \
|
||||||
../repo/fsrepo/fs_repo.o ../repo/fsrepo/jsmn.o ../repo/fsrepo/lmdb_datastore.o \
|
../repo/fsrepo/*.o \
|
||||||
../repo/config/*.o \
|
../repo/config/*.o \
|
||||||
../routing/offline.o \
|
../routing/offline.o \
|
||||||
../routing/online.o \
|
../routing/online.o \
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
#include "ipfs/blocks/block.h"
|
#include "ipfs/blocks/block.h"
|
||||||
#include "ipfs/repo/config/config.h"
|
#include "ipfs/repo/config/config.h"
|
||||||
#include "ipfs/repo/fsrepo/fs_repo.h"
|
#include "ipfs/repo/fsrepo/fs_repo.h"
|
||||||
|
#include "ipfs/repo/fsrepo/journalstore.h"
|
||||||
|
|
||||||
#include "../test_helper.h"
|
#include "../test_helper.h"
|
||||||
|
|
||||||
|
@ -61,13 +62,48 @@ int test_ipfs_datastore_put() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// save the block
|
|
||||||
|
|
||||||
// check the results
|
|
||||||
|
|
||||||
// clean up
|
// clean up
|
||||||
ipfs_repo_fsrepo_free(fs_repo);
|
ipfs_repo_fsrepo_free(fs_repo);
|
||||||
ipfs_block_free(block);
|
ipfs_block_free(block);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List what is in the journal
|
||||||
|
*/
|
||||||
|
int test_datastore_list_journal() {
|
||||||
|
libp2p_logger_add_class("test_datastore");
|
||||||
|
libp2p_logger_add_class("lmdb_datastore");
|
||||||
|
// open database
|
||||||
|
struct FSRepo* fs_repo;
|
||||||
|
if (ipfs_repo_fsrepo_new(NULL, NULL, &fs_repo) == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (ipfs_repo_fsrepo_open(fs_repo) == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
// open cursor
|
||||||
|
void* crsr;
|
||||||
|
if (!repo_journalstore_cursor_open(fs_repo->config->datastore, &crsr)) {
|
||||||
|
ipfs_repo_fsrepo_free(fs_repo);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
// grab records
|
||||||
|
struct JournalRecord* record = NULL;
|
||||||
|
enum DatastoreCursorOp op = CURSOR_FIRST;
|
||||||
|
do {
|
||||||
|
if (repo_journalstore_cursor_get(fs_repo->config->datastore, crsr, op, &record) == 0) {
|
||||||
|
journal_record_free(record);
|
||||||
|
record = NULL;
|
||||||
|
}
|
||||||
|
// display record
|
||||||
|
libp2p_logger_debug("test_datastore", "Timestamp: %llu.\n", record->timestamp);
|
||||||
|
libp2p_logger_debug("test_datastore", "Pin: %s.\n", record->pin == 1 ? "Y" : "N");
|
||||||
|
// free record
|
||||||
|
journal_record_free(record);
|
||||||
|
record = NULL;
|
||||||
|
op = CURSOR_NEXT;
|
||||||
|
} while (record != NULL);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
|
@ -45,6 +45,7 @@ const char* names[] = {
|
||||||
"test_cid_cast_non_multihash",
|
"test_cid_cast_non_multihash",
|
||||||
"test_cid_protobuf_encode_decode",
|
"test_cid_protobuf_encode_decode",
|
||||||
"test_daemon_startup_shutdown",
|
"test_daemon_startup_shutdown",
|
||||||
|
"test_datastore_list_journal",
|
||||||
"test_repo_config_new",
|
"test_repo_config_new",
|
||||||
"test_repo_config_init",
|
"test_repo_config_init",
|
||||||
"test_repo_config_write",
|
"test_repo_config_write",
|
||||||
|
@ -101,6 +102,7 @@ int (*funcs[])(void) = {
|
||||||
test_cid_cast_non_multihash,
|
test_cid_cast_non_multihash,
|
||||||
test_cid_protobuf_encode_decode,
|
test_cid_protobuf_encode_decode,
|
||||||
test_daemon_startup_shutdown,
|
test_daemon_startup_shutdown,
|
||||||
|
test_datastore_list_journal,
|
||||||
test_repo_config_new,
|
test_repo_config_new,
|
||||||
test_repo_config_init,
|
test_repo_config_init,
|
||||||
test_repo_config_write,
|
test_repo_config_write,
|
||||||
|
|
Loading…
Reference in a new issue