diff --git a/include/ipfs/repo/fsrepo/journalstore.h b/include/ipfs/repo/fsrepo/journalstore.h index aa3bc8f..056c725 100644 --- a/include/ipfs/repo/fsrepo/journalstore.h +++ b/include/ipfs/repo/fsrepo/journalstore.h @@ -8,6 +8,7 @@ #include "lmdb.h" #include "libp2p/db/datastore.h" +#include "ipfs/repo/fsrepo/lmdb_cursor.h" struct JournalRecord { unsigned long long timestamp; // the timestamp of the file @@ -23,19 +24,40 @@ int lmdb_journal_record_free(struct JournalRecord* rec); /** * Open a cursor to the journalstore table + * @param db_handle a handle to the database (an MDB_env pointer) + * @param cursor where to place the results + * @returns true(1) on success, false(0) otherwise */ -int repo_journalstore_cursor_open(struct Datastore* datastore, void** cursor); +int lmdb_journalstore_cursor_open(void* db_handle, struct lmdb_trans_cursor **cursor); /** * Read a record from the cursor */ -int repo_journalstore_cursor_get(struct Datastore* datastore, void* cursor, enum DatastoreCursorOp op, struct JournalRecord** record); +int lmdb_journalstore_cursor_get(struct lmdb_trans_cursor *cursor, enum DatastoreCursorOp op, struct JournalRecord** record); + +/*** + * Write the record at the cursor + * @param crsr the cursor + * @param journal_record the record to write + * @returns true(1) on success, false(0) otherwise + */ +int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalRecord* journal_record); /** * Close the cursor */ -int repo_journalstore_cursor_close(struct Datastore* datastore, void* cursor); +int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor); int journal_record_free(struct JournalRecord* rec); int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* record); + +/*** + * Attempt to get a specific record identified by its timestamp and bytes + * @param handle a handle to the database engine + * @param journalstore_cursor the cursor (will be returned as a cursor that points to the record found + * @param journalstore_record where to put the results (can pass null). If data is within the struct, will use it as search criteria + * @returns true(1) on success, false(0) otherwise + */ +int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record); + diff --git a/include/ipfs/repo/fsrepo/lmdb_cursor.h b/include/ipfs/repo/fsrepo/lmdb_cursor.h new file mode 100644 index 0000000..fe6ba92 --- /dev/null +++ b/include/ipfs/repo/fsrepo/lmdb_cursor.h @@ -0,0 +1,22 @@ +#pragma once + +#include "lmdb.h" + +struct lmdb_trans_cursor { + MDB_txn* transaction; + MDB_dbi* database; + MDB_cursor* cursor; +}; + +/** + * Create a new lmdb_trans_cursor struct + * @returns a newly allocated trans_cursor struct + */ +struct lmdb_trans_cursor* lmdb_trans_cursor_new(); + +/*** + * Clean up resources from a lmdb_trans_cursor struct + * @param in the cursor + * @returns true(1) + */ +int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in); diff --git a/include/ipfs/repo/fsrepo/lmdb_datastore.h b/include/ipfs/repo/fsrepo/lmdb_datastore.h index 88d31c0..b174172 100644 --- a/include/ipfs/repo/fsrepo/lmdb_datastore.h +++ b/include/ipfs/repo/fsrepo/lmdb_datastore.h @@ -3,11 +3,6 @@ #include "lmdb.h" #include "libp2p/db/datastore.h" -struct lmdb_trans_cursor { - MDB_txn* transaction; - MDB_cursor* cursor; -}; - /*** * Places the LMDB methods into the datastore's function pointers * @param datastore the datastore to fill diff --git a/journal/journal.c b/journal/journal.c index c7547db..33452f1 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -60,16 +60,16 @@ struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct I struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) { struct Libp2pVector* vector = libp2p_utils_vector_new(1); if (vector != NULL) { - void* cursor = NULL; - if (!repo_journalstore_cursor_open(database, &cursor)) { + struct lmdb_trans_cursor *cursor = NULL; + if (!lmdb_journalstore_cursor_open(database->handle, &cursor)) { libp2p_logger_error("journal", "Unable to open a cursor for the journalstore.\n"); return NULL; } struct JournalRecord* rec = NULL; - if (!repo_journalstore_cursor_get(database, cursor, CURSOR_LAST, &rec)) { + if (!lmdb_journalstore_cursor_get(cursor, CURSOR_LAST, &rec)) { libp2p_logger_error("journal", "Unable to find last record from the journalstore.\n"); libp2p_utils_vector_free(vector); - repo_journalstore_cursor_close(database, cursor); + lmdb_journalstore_cursor_close(cursor); return NULL; } // we've got one, now start the loop @@ -77,13 +77,13 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) { do { libp2p_logger_debug("journal", "Adding record to the vector.\n"); libp2p_utils_vector_add(vector, rec); - if (!repo_journalstore_cursor_get(database, cursor, CURSOR_PREVIOUS, &rec)) { + if (!lmdb_journalstore_cursor_get(cursor, CURSOR_PREVIOUS, &rec)) { break; } i++; } while(i < n); libp2p_logger_debug("journal", "Closing journalstore cursor.\n"); - repo_journalstore_cursor_close(database, cursor); + lmdb_journalstore_cursor_close(cursor); } else { libp2p_logger_error("journal", "Unable to allocate vector for ipfs_journal_get_last.\n"); } diff --git a/repo/fsrepo/Makefile b/repo/fsrepo/Makefile index 988a77e..a09390d 100644 --- a/repo/fsrepo/Makefile +++ b/repo/fsrepo/Makefile @@ -7,7 +7,7 @@ endif LFLAGS = DEPS = -OBJS = fs_repo.o jsmn.o lmdb_datastore.o lmdb_journalstore.o +OBJS = fs_repo.o jsmn.o lmdb_datastore.o lmdb_journalstore.o lmdb_cursor.o %.o: %.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) diff --git a/repo/fsrepo/lmdb_cursor.c b/repo/fsrepo/lmdb_cursor.c new file mode 100644 index 0000000..f45be2a --- /dev/null +++ b/repo/fsrepo/lmdb_cursor.c @@ -0,0 +1,27 @@ +#include + +#include "ipfs/repo/fsrepo/lmdb_cursor.h" + +/** + * Create a new lmdb_trans_cursor struct + * @returns a newly allocated trans_cursor struct + */ +struct lmdb_trans_cursor* lmdb_trans_cursor_new() { + struct lmdb_trans_cursor* out = (struct lmdb_trans_cursor*) malloc(sizeof(struct lmdb_trans_cursor)); + if (out != NULL) { + out->cursor = NULL; + out->transaction = NULL; + out->database = NULL; + } + return out; +} + +/*** + * Clean up resources from a lmdb_trans_cursor struct + * @param in the cursor + * @returns true(1) + */ +int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in) { + free(in); + return 1; +} diff --git a/repo/fsrepo/lmdb_datastore.c b/repo/fsrepo/lmdb_datastore.c index 21af022..990d0b9 100644 --- a/repo/fsrepo/lmdb_datastore.c +++ b/repo/fsrepo/lmdb_datastore.c @@ -150,6 +150,10 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha MDB_dbi mdb_dbi; struct MDB_val db_key; struct MDB_val db_value; + unsigned long long old_timestamp = 0; + struct DatastoreRecord *datastore_record = NULL; + struct JournalRecord *journalstore_record = NULL; + struct lmdb_trans_cursor *journalstore_cursor = NULL; MDB_env* mdb_env = (MDB_env*)datastore->handle; if (mdb_env == NULL) @@ -163,9 +167,42 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha if (retVal != 0) return 0; - // add the timestamp - unsigned long long timestamp = os_utils_gmtime(); - struct DatastoreRecord *datastore_record = libp2p_datastore_record_new(); + // check the datastore to see if it is already there. If it is there, use its timestamp if it is older. + repo_fsrepo_lmdb_get(key, key_size, &datastore_record, datastore); + if (datastore_record != NULL) { + // build the journalstore_record with the search criteria + journalstore_record = lmdb_journal_record_new(); + journalstore_record->hash_size = key_size; + journalstore_record->hash = malloc(key_size); + memcpy(journalstore_record->hash, key, key_size); + journalstore_record->timestamp = datastore_record->timestamp; + // look up the corresponding journalstore record for possible updating + journalstore_cursor = lmdb_trans_cursor_new(); + journalstore_cursor->transaction = mdb_txn; + lmdb_journalstore_get_record((void*)mdb_env, journalstore_cursor, &journalstore_record); + } else { // it wasn't previously in the database + datastore_record = libp2p_datastore_record_new(); + if (datastore_record == NULL) { + libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for DatastoreRecord.\n"); + return 0; + } + } + + // Put in the timestamp if it isn't there already (or is newer) + unsigned long long now = os_utils_gmtime(); + if (datastore_record->timestamp == 0 || datastore_record->timestamp > now) { + //we need to update the timestamp. Be sure to update the journal too. (done further down) + old_timestamp = datastore_record->timestamp; + datastore_record->timestamp = now; + } + // fill in the other fields + datastore_record->key_size = key_size; + datastore_record->key = (uint8_t*)key; + datastore_record->value_size = data_size; + datastore_record->value = data; + + // convert it into a byte array + size_t record_size = 0; uint8_t *record; repo_fsrepo_lmdb_encode_record(datastore_record, &record, &record_size); @@ -178,26 +215,35 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha db_value.mv_size = record_size; db_value.mv_data = record; - retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA | MDB_NOOVERWRITE); + retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA); + if (retVal == 0) { - // the normal case - // add it to the journalstore - struct JournalRecord* rec = lmdb_journal_record_new(); - rec->hash = (uint8_t*)key; - rec->hash_size = key_size; - rec->timestamp = timestamp; - rec->pending = 1; - rec->pin = 1; - lmdb_journalstore_journal_add(mdb_txn, rec); - lmdb_journal_record_free(rec); - retVal = 1; - } else { - if (retVal == MDB_KEYEXIST) // We tried to add a key that already exists. Skip. + // added the datastore record, now work with the journalstore + if (journalstore_record != NULL) { + if (journalstore_record->timestamp != datastore_record->timestamp) { + // we need to update + journalstore_record->timestamp = datastore_record->timestamp; + lmdb_journalstore_cursor_put(journalstore_cursor, journalstore_record); + lmdb_journal_record_free(journalstore_record); + } + } else { + // add it to the journalstore + journalstore_record = lmdb_journal_record_new(); + journalstore_record->hash = (uint8_t*)key; + journalstore_record->hash_size = key_size; + journalstore_record->timestamp = datastore_record->timestamp; + journalstore_record->pending = 1; // TODO: Calculate this correctly + journalstore_record->pin = 1; + if (!lmdb_journalstore_journal_add(mdb_txn, journalstore_record)) { + libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. Continuing.\n"); + } + lmdb_journal_record_free(journalstore_record); retVal = 1; - else { - libp2p_logger_error("lmdb_datastore", "mdb_put returned %d.\n", retVal); - retVal = 0; } + } else { + // datastore record was unable to be added. + libp2p_logger_error("lmdb_datastore", "mdb_put returned %d.\n", retVal); + retVal = 0; } // cleanup diff --git a/repo/fsrepo/lmdb_journalstore.c b/repo/fsrepo/lmdb_journalstore.c index a9844e7..826c207 100644 --- a/repo/fsrepo/lmdb_journalstore.c +++ b/repo/fsrepo/lmdb_journalstore.c @@ -29,6 +29,82 @@ int lmdb_journal_record_free(struct JournalRecord* rec) { return 1; } +int lmdb_journalstore_generate_key(const struct JournalRecord* journal_record, struct MDB_val *db_key) { + // build the key + uint8_t time_varint[8]; + size_t time_varint_size = 0; + varint_encode(journal_record->timestamp, &time_varint[0], 8, &time_varint_size); + + db_key->mv_size = time_varint_size; + db_key->mv_data = time_varint; + return 1; +} +/*** + * Convert the JournalRec struct into a lmdb key and lmdb value + * @param journal_record the record to convert + * @param db_key where to store the key information + * @param db_value where to store the value information + */ +int lmdb_journalstore_build_key_value_pair(const struct JournalRecord* journal_record, struct MDB_val* db_key, struct MDB_val *db_value) { + // build the record, which is a timestamp as a key + + // build the key + lmdb_journalstore_generate_key(journal_record, db_key); + + // build the value + size_t record_size = journal_record->hash_size + 2; + uint8_t record[record_size]; + // Field 1: pin flag + record[0] = journal_record->pin; + // Field 2: pending flag + record[1] = journal_record->pending; + // field 3: hash + memcpy(&record[2], journal_record->hash, journal_record->hash_size); + + db_value->mv_size = record_size; + db_value->mv_data = record; + + return 1; +} + +/*** + * Build a JournalRecord from a key/value pair from the db + * @param db_key the key + * @param db_value the value + * @param journal_record where to store the results + * @reutrns true(1) on success, false(0) on error + */ +int lmdb_journalstore_build_record(const struct MDB_val* db_key, const struct MDB_val *db_value, struct JournalRecord **journal_record) { + if (*journal_record == NULL) { + *journal_record = lmdb_journal_record_new(); + if (*journal_record == NULL) { + libp2p_logger_error("lmdb_journalstore", "build_record: Unable to allocate memory for new journal record.\n"); + return 0; + } + } + + struct JournalRecord *rec = *journal_record; + // timestamp + size_t varint_size = 0; + rec->timestamp = varint_decode(db_key->mv_data, db_key->mv_size, &varint_size); + // pin flag + rec->pin = ((uint8_t*)db_value->mv_data)[0]; + // pending flag + rec->pending = ((uint8_t*)db_value->mv_data)[1]; + // hash + if (rec->hash != NULL) { + free(rec->hash); + rec->hash = NULL; + rec->hash_size = 0; + } + rec->hash_size = db_value->mv_size - 2; + rec->hash = malloc(rec->hash_size); + uint8_t *val = (uint8_t*)db_value->mv_data; + memcpy(rec->hash, &val[2], rec->hash_size); + + return 1; +} + /*** * Write a journal record * @param mbd_txn the transaction @@ -42,71 +118,92 @@ int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* journa struct MDB_val db_key; struct MDB_val db_value; - // build the record, which is a timestamp as a key - uint8_t time_varint[8]; - size_t time_varint_size = 0; - varint_encode(journal_record->timestamp, &time_varint[0], 8, &time_varint_size); - - size_t record_size = journal_record->hash_size + 2; - uint8_t record[record_size]; - // Field 1: pin flag - record[0] = journal_record->pin; - // Field 2: pending flag - record[1] = journal_record->pending; - // field 3: hash - memcpy(&record[2], journal_record->hash, journal_record->hash_size); - - // open the journal table - if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { + if (!lmdb_journalstore_build_key_value_pair(journal_record, &db_key, &db_value)) { + libp2p_logger_error("lmdb_journalstore", "Unable to generate key value pair for journal_add.\n"); return 0; } - // write the record - db_key.mv_size = time_varint_size; - db_key.mv_data = time_varint; - - db_value.mv_size = record_size; - db_value.mv_data = record; + // open the journal table + if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { + libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n"); + return 0; + } if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) { + libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n"); return 0; } return 1; } +/*** + * Attempt to get a specific record identified by its timestamp and bytes + * @param handle a handle to the database engine + * @param journalstore_cursor the cursor (will be returned as a cursor that points to the record found) + * @param journalstore_record where to put the results (can pass null). If data is within the struct, will use it as search criteria + * @returns true(1) on success, false(0) otherwise + */ +int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record) +{ + if (journalstore_cursor == NULL || journalstore_cursor->transaction == NULL) { + libp2p_logger_error("lmdb_journalstore", "get_record: journalstore cursor not initialized properly.\n"); + return 0; + } + if (journalstore_cursor->cursor == NULL) { + if (!lmdb_journalstore_cursor_open(handle, &journalstore_cursor)) { + libp2p_logger_error("lmdb_journalstore", "Unable to open cursor in get_record.\n"); + return 0; + } + } + // search for the timestamp + if (!lmdb_journalstore_cursor_get(journalstore_cursor, CURSOR_FIRST, journalstore_record)) { + libp2p_logger_error("lmdb_journalstore", "Unable to find any records in table.\n"); + return 0; + } + // now look for the hash key + + return 0; +} + /** * Open a cursor to the journalstore table - * @param datastore the data connection - * @param crsr a reference to the cursor. In this implementation, it is an lmdb_trans_cursor struct - * @returns true(1) on success, false(0) otherwises + * @param db_handle a handle to the database (an MDB_env pointer) + * @param cursor where to place the results + * @returns true(1) on success, false(0) otherwise */ -int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) { - if (datastore->handle != NULL) { - MDB_env* mdb_env = (MDB_env*)datastore->handle; - MDB_dbi mdb_dbi; - if (*crsr == NULL ) { - *crsr = malloc(sizeof(struct lmdb_trans_cursor)); - struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)*crsr; +int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr) { + if (handle != NULL) { + MDB_env* mdb_env = (MDB_env*)handle; + struct lmdb_trans_cursor *cursor = *crsr; + if (cursor == NULL ) { + cursor = lmdb_trans_cursor_new(); + if (cursor == NULL) + return 0; + *crsr = cursor; + } + if (cursor->transaction == NULL) { // open transaction if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) { - libp2p_logger_error("lmdb_journalstore", "Unable to start a transaction.\n"); + libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to begin a transaction.\n"); return 0; } - MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction; - if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { - libp2p_logger_error("lmdb_journalstore", "Unable to open the dbi to the journalstore"); - mdb_txn_commit(mdb_txn); + } + if (cursor->database == NULL) { + if (mdb_dbi_open(cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, cursor->database) != 0) { + libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to open the dbi to the journalstore"); + mdb_txn_commit(cursor->transaction); return 0; } + } + if (cursor->cursor == NULL) { // open cursor - if (mdb_cursor_open(mdb_txn, mdb_dbi, &cursor->cursor) != 0) { - mdb_txn_commit(mdb_txn); + if (mdb_cursor_open(cursor->transaction, *cursor->database, &cursor->cursor) != 0) { + libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to open cursor.\n"); + mdb_txn_commit(cursor->transaction); return 0; } return 1; - } else { - libp2p_logger_error("lmdb_journalstore", "Attempted to open a new cursor but there is something already there.\n"); } } else { libp2p_logger_error("lmdb_journalstore", "Unable to open cursor on null db handle.\n"); @@ -115,12 +212,43 @@ int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) { } +int lmdb_journalstore_composite_key_compare(struct JournalRecord *a, struct JournalRecord *b) { + if (a == NULL && b == NULL) + return 0; + if (a == NULL && b != NULL) + return 1; + if (a != NULL && b == NULL) + return -1; + if (a->timestamp != b->timestamp) { + if (a->timestamp > b->timestamp) + return -1; + else + return 1; + } + if (a->hash_size != b->hash_size) { + if (a->hash_size > b->hash_size) + return -1; + else + return 1; + } + for(int i = 0; i < a->hash_size; i++) { + if (a->hash[i] != b->hash[i]) { + return b->hash[i] - a->hash[i]; + } + } + return 0; +} + /** - * Read a record from the cursor + * Read a record from the cursor. If (record) contains a key, it will look for the exact record. + * If not, it will return the first matching record. + * @param crsr the lmdb_trans_cursor + * @param op the cursor operation (i.e. CURSOR_FIRST, CURSOR_NEXT, CURSOR_LAST, CURSOR_PREVIOUS) + * @param record the record (will allocate a new one if *record is NULL) + * @returns true(1) if something was found, false(0) otherwise) */ -int repo_journalstore_cursor_get(struct Datastore* datastore, void* crsr, enum DatastoreCursorOp op, struct JournalRecord** record) { - if (crsr != NULL) { - struct lmdb_trans_cursor* tc = (struct lmdb_trans_cursor*)crsr; +int lmdb_journalstore_cursor_get(struct lmdb_trans_cursor *tc, enum DatastoreCursorOp op, struct JournalRecord** record) { + if (tc != NULL) { MDB_val mdb_key; MDB_val mdb_value; MDB_cursor_op co = MDB_FIRST; @@ -134,41 +262,86 @@ int repo_journalstore_cursor_get(struct Datastore* datastore, void* crsr, enum D else if (op == CURSOR_PREVIOUS) co = MDB_PREV; + if (*record != NULL) { + lmdb_journalstore_generate_key(*record, &mdb_key); + } + if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) { return 0; } - // build the JournalRecord object - *record = (struct JournalRecord*) malloc(sizeof(struct JournalRecord)); - struct JournalRecord *rec = *record; - // timestamp - size_t varint_size = 0; - rec->timestamp = varint_decode(mdb_key.mv_data, mdb_key.mv_size, &varint_size); - // pin flag - rec->pin = ((uint8_t*)mdb_value.mv_data)[0]; - // pending flag - rec->pending = ((uint8_t*)mdb_value.mv_data)[1]; - // hash - rec->hash_size = mdb_value.mv_size - 2; - rec->hash = malloc(rec->hash_size); - uint8_t *val = (uint8_t*)mdb_value.mv_data; - memcpy(rec->hash, &val[2], rec->hash_size); - return 1; + + // see if the passed in record has a specific record in mind (take care of duplicate keys) + if ( (*record)->hash_size > 0) { + struct JournalRecord* curr_record = NULL; + if (!lmdb_journalstore_build_record(&mdb_key, &mdb_value, &curr_record)) { + libp2p_logger_error("lmdb_journalstore", "Unable to convert journalstore record into a JournalRecord struct.\n"); + return 0; + } + // we are looking for a specific record. Flip through the records looking for the exact record + while (lmdb_journalstore_composite_key_compare(*record, curr_record) != 0) { + if ( (*record)->timestamp != curr_record->timestamp) { + //we've exhausted all records for this timestamp. Exit. + lmdb_journal_record_free(curr_record); + curr_record = NULL; + break; + } + // we did not find the exact record. Skip to the next one + lmdb_journal_record_free(curr_record); + curr_record = NULL; + mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, MDB_NEXT); + if (!lmdb_journalstore_build_record(&mdb_key, &mdb_value, &curr_record)) { + libp2p_logger_error("lmdb_journalstore", "Unable to convert journalstore record into a JournalRecord struct.\n"); + return 0; + } + } + if (curr_record != NULL) { + // we found the exact record. merge it into the *record + (*record)->pending = curr_record->pending; + (*record)->pin = curr_record->pin; + lmdb_journal_record_free(curr_record); + return 1; + } + } else { + // we're not looking for any particular record. Return the first one found. + return lmdb_journalstore_build_record(&mdb_key, &mdb_value, record); + } } return 0; } +/*** + * Write the record at the cursor + * @param crsr the cursor + * @param journal_record the record to write + * @returns true(1) on success, false(0) otherwise + */ +int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalRecord* journal_record) { + struct MDB_cursor* cursor = crsr->cursor; + struct MDB_val db_key; + struct MDB_val db_value; + + if (!lmdb_journalstore_build_key_value_pair(journal_record, &db_key, &db_value)) { + libp2p_logger_error("lmdb_journalstore", "Unable to create journalstore record.\n"); + return 0; + } + if (mdb_cursor_put(cursor, &db_key, &db_value, 0) == 0) { + return 0; + } + + return 1; +} + /** * Close the cursor + * @param crsr a lmdb_trans_cursor pointer */ -int repo_journalstore_cursor_close(struct Datastore* datastore, void* crsr) { - if (crsr != NULL) { - struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)crsr; - if (cursor->cursor != NULL) { - mdb_cursor_close(cursor->cursor); - mdb_txn_commit(cursor->transaction); - free(cursor); - return 1; - } +int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor) { + if (cursor->cursor != NULL) { + mdb_cursor_close(cursor->cursor); + mdb_txn_commit(cursor->transaction); + free(cursor); + return 1; + } else { free(cursor); } return 0; diff --git a/test/storage/test_datastore.h b/test/storage/test_datastore.h index 2307096..e591be5 100644 --- a/test/storage/test_datastore.h +++ b/test/storage/test_datastore.h @@ -84,8 +84,8 @@ int test_datastore_list_journal() { return 0; } // open cursor - void* crsr; - if (!repo_journalstore_cursor_open(fs_repo->config->datastore, &crsr)) { + struct lmdb_trans_cursor *crsr = NULL; + if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->handle, &crsr)) { ipfs_repo_fsrepo_free(fs_repo); return 0; } @@ -93,7 +93,7 @@ int test_datastore_list_journal() { struct JournalRecord* record = NULL; enum DatastoreCursorOp op = CURSOR_FIRST; do { - if (repo_journalstore_cursor_get(fs_repo->config->datastore, crsr, op, &record) == 0) { + if (lmdb_journalstore_cursor_get(crsr, op, &record) == 0) { lmdb_journal_record_free(record); record = NULL; }