This commit is contained in:
jmjatlanta 2017-09-13 05:03:13 -05:00
commit 478fa403fd
18 changed files with 552 additions and 260 deletions

View file

@ -1,6 +1,8 @@
/** /**
* Methods for lightweight/specific HTTP for API communication. * Methods for lightweight/specific HTTP for API communication.
*/ */
#define _GNU_SOURCE
#define __USE_GNU
#include <pthread.h> #include <pthread.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -179,6 +181,94 @@ int read_all(int fd, struct s_request *req, char *already, size_t alread_size)
return 1; return 1;
} }
/**
* Find a token in a string array.
* @param string array and token string.
* @returns the pointer after where the token was found or NULL if it fails.
*/
char *str_tok(char *str, char *tok)
{
char *p = strstr(str, tok);
if (p) {
p += strlen(tok);
while(*p == ' ') p++;
}
return p;
}
/**
* Find a token in a binary array.
* @param array, size of array, token and size of token.
* @returns the pointer after where the token was found or NULL if it fails.
*/
char *bin_tok(char *bin, size_t limit, char *tok, size_t tok_size)
{
char *p = memmem(bin, limit, tok, tok_size);
if (p) {
p += tok_size;
}
return p;
}
/**
* Check if header contain a especific value.
* @param request structure, header name and value to check.
* @returns the pointer where the value was found or NULL if it fails.
*/
char *header_value_cmp(struct s_request *req, char *header, char *value)
{
char *p = str_tok(req->buf + req->header, header);
if (p) {
if (strstart(p, value)) {
return p;
}
}
return NULL;
}
/**
* Lookup for boundary at buffer string.
* @param body buffer string, boundary id, filename and content-type string.
* @returns the pointer where the multipart start.
*/
char *boundary_find(char *str, char *boundary, char **filename, char **contenttype)
{
char *p = str_tok(str, "--");
while (p) {
if (strstart(p, boundary)) {
// skip to the beginning, ignoring the header for now, if there is.
// TODO: return filename and content-type
p = strstr(p, "\r\n\r\n");
if (p) {
return p + 4; // ignore 4 bytes CRLF 2x
}
break;
}
p = str_tok(str, "--");
}
return NULL;
}
/**
* Return the size of boundary.
* @param boundary buffer, boundary id.
* @returns the size of boundary or 0 if fails.
*/
size_t boundary_size(char *str, char *boundary, size_t limit)
{
char *p = bin_tok(str, limit, "\r\n--", 4);
while (p) {
if (strstart(p, boundary)) {
if (cstrstart(p + strlen(boundary), "--\r\n")) {
p -= 4;
return (size_t)(p - str);
}
}
p = bin_tok(p, limit, "\r\n--", 4);
}
return 0;
}
/** /**
* Pthread to take care of each client connection. * Pthread to take care of each client connection.
* @param ptr is the connection index in api_list, integer not pointer, cast required. * @param ptr is the connection index in api_list, integer not pointer, cast required.
@ -256,14 +346,9 @@ void *api_connection_thread (void *ptr)
req.body = req.size; req.body = req.size;
req.body_size = 0; req.body_size = 0;
p = strstr(req.buf + req.header, "Transfer-Encoding:"); if (header_value_cmp(&req, "Transfer-Encoding:", "chunked")) {
if (p) {
p += strlen("Transfer-Encoding:");
while(*p == ' ') p++;
if (cstrstart(p, "chunked\r\n") || strcmp(p, "chunked")==0) {
read_func = read_chunked; read_func = read_chunked;
} }
}
if (!read_func(s, &req, body, r - (body - buf))) { if (!read_func(s, &req, body, r - (body - buf))) {
libp2p_logger_error("api", "fail read_func.\n"); libp2p_logger_error("api", "fail read_func.\n");
@ -271,7 +356,49 @@ void *api_connection_thread (void *ptr)
goto quit; goto quit;
} }
p = strstr(req.buf + req.header, "Accept-Encoding:"); if (strcmp(req.buf + req.method, "GET")==0) {
// just an error message, because it's not used yet.
// TODO: implement gateway requests and GUI (javascript) for API.
write_dual (s, req.buf + req.http_ver, strchr (HTTP_404, ' '));
} else if (cstrstart(buf, "POST ")) {
// TODO: Handle gzip/json POST requests.
p = header_value_cmp(&req, "Content-Type:", "multipart/form-data;");
if (p) {
p = str_tok(p, "boundary=");
if (p) {
char *boundary, *l;
int len;
if (*p == '"') {
p++;
l = strchr(p, '"');
} else {
l = p;
while (*l != '\r' && *l != '\0') l++;
}
len = l - p;
boundary = malloc (len+1);
if (boundary) {
memcpy(boundary, p, len);
boundary[len] = '\0';
p = boundary_find(req.buf + req.body, boundary, NULL, NULL);
if (p) {
req.boundary_size = boundary_size(p, boundary, req.size - (p - buf));
if (req.boundary_size > 0) {
req.boundary = p - req.buf;
}
}
free (boundary);
}
}
}
// TODO: Parse the path var and decide what to do with the received data.
if (req.boundary > 0) {
libp2p_logger_error("api", "boundary index = %d, size = %d\n", req.boundary, req.boundary_size);
}
libp2p_logger_error("api", "method = '%s'\n" libp2p_logger_error("api", "method = '%s'\n"
"path = '%s'\n" "path = '%s'\n"
"http_ver = '%s'\n" "http_ver = '%s'\n"
@ -288,12 +415,6 @@ void *api_connection_thread (void *ptr)
"Transfer-Encoding: chunked\r\n\r\n", req.buf + req.http_ver); "Transfer-Encoding: chunked\r\n\r\n", req.buf + req.http_ver);
write_str (s, resp); write_str (s, resp);
libp2p_logger_error("api", "resp = {\n%s\n}\n", resp); libp2p_logger_error("api", "resp = {\n%s\n}\n", resp);
if (strcmp(req.buf + req.method, "GET")==0) {
// just an error message, because it's not used.
write_dual (s, req.buf + req.http_ver, strchr (HTTP_404, ' '));
//} else if (cstrstart(buf, "POST ")) {
// TODO: Handle chunked/gzip/form-data/json POST requests.
} }
} else { } else {
libp2p_logger_error("api", "fail looking for body.\n"); libp2p_logger_error("api", "fail looking for body.\n");

View file

@ -152,6 +152,9 @@ int ipfs_node_free(struct IpfsNode* node) {
if (node->mode == MODE_ONLINE) { if (node->mode == MODE_ONLINE) {
ipfs_routing_online_free(node->routing); ipfs_routing_online_free(node->routing);
} }
if (node->mode == MODE_OFFLINE) {
ipfs_routing_offline_free(node->routing);
}
if (node->blockstore != NULL) { if (node->blockstore != NULL) {
ipfs_blockstore_free(node->blockstore); ipfs_blockstore_free(node->blockstore);
} }

View file

@ -3,6 +3,7 @@
*/ */
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> // for sleep() #include <unistd.h> // for sleep()
#include "libp2p/os/utils.h"
#include "libp2p/utils/logger.h" #include "libp2p/utils/logger.h"
#include "ipfs/core/ipfs_node.h" #include "ipfs/core/ipfs_node.h"
#include "ipfs/exchange/exchange.h" #include "ipfs/exchange/exchange.h"
@ -13,9 +14,7 @@
#include "ipfs/exchange/bitswap/want_manager.h" #include "ipfs/exchange/bitswap/want_manager.h"
int ipfs_bitswap_can_handle(const uint8_t* incoming, size_t incoming_size) { int ipfs_bitswap_can_handle(const uint8_t* incoming, size_t incoming_size) {
if (incoming_size < 8) char* result = strnstr((char*)incoming, "/ipfs/bitswap", incoming_size);
return 0;
char* result = strstr((char*)incoming, "/ipfs/bitswap");
if(result == NULL || result != (char*)incoming) if(result == NULL || result != (char*)incoming)
return 0; return 0;
return 1; return 1;

View file

@ -34,6 +34,8 @@ struct s_request {
int header; int header;
int body; int body;
size_t body_size; size_t body_size;
int boundary;
size_t boundary_size;
}; };
#define HTTP_400 "HTTP/1.1 400 Bad Request\r\n" \ #define HTTP_400 "HTTP/1.1 400 Bad Request\r\n" \

View file

@ -28,7 +28,7 @@ int lmdb_journal_record_free(struct JournalRecord* rec);
* @param cursor where to place the results * @param cursor where to place the results
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int lmdb_journalstore_cursor_open(void* db_handle, struct lmdb_trans_cursor **cursor); int lmdb_journalstore_cursor_open(void* db_handle, struct lmdb_trans_cursor **cursor, struct MDB_txn *trans_to_use);
/** /**
* Read a record from the cursor * Read a record from the cursor
@ -46,11 +46,11 @@ int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalR
/** /**
* Close the cursor * Close the cursor
*/ */
int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor); int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor, int commitTransaction);
int journal_record_free(struct JournalRecord* rec); int journal_record_free(struct JournalRecord* rec);
int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* record); int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord *journalstore_record);
/*** /***
* Attempt to get a specific record identified by its timestamp and bytes * Attempt to get a specific record identified by its timestamp and bytes
@ -61,3 +61,11 @@ int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* record
*/ */
int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record); int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record);
/***
* Convert the JournalRec struct into a lmdb key and lmdb value
* @param journal_record the record to convert
* @param db_key where to store the key information
* @param db_value where to store the value information
*/
int lmdb_journalstore_build_key_value_pair(const struct JournalRecord* journal_record, struct MDB_val* db_key, struct MDB_val *db_value);

View file

@ -2,7 +2,16 @@
#include "lmdb.h" #include "lmdb.h"
struct lmdb_context {
MDB_env *db_environment;
MDB_txn *current_transaction;
MDB_dbi *datastore_db;
MDB_dbi *journal_db;
};
struct lmdb_trans_cursor { struct lmdb_trans_cursor {
MDB_env* environment;
MDB_txn* parent_transaction;
MDB_txn* transaction; MDB_txn* transaction;
MDB_dbi* database; MDB_dbi* database;
MDB_cursor* cursor; MDB_cursor* cursor;

View file

@ -82,6 +82,7 @@ ipfs_routing* ipfs_routing_new_offline (struct IpfsNode* local_node, struct RsaP
// online using secio, should probably be deprecated // online using secio, should probably be deprecated
ipfs_routing* ipfs_routing_new_online (struct IpfsNode* local_node, struct RsaPrivateKey* private_key); ipfs_routing* ipfs_routing_new_online (struct IpfsNode* local_node, struct RsaPrivateKey* private_key);
int ipfs_routing_online_free(ipfs_routing*); int ipfs_routing_online_free(ipfs_routing*);
int ipfs_routing_offline_free(ipfs_routing* incoming);
// online using DHT/kademlia, the recommended router // online using DHT/kademlia, the recommended router
ipfs_routing* ipfs_routing_new_kademlia(struct IpfsNode* local_node, struct RsaPrivateKey* private_key); ipfs_routing* ipfs_routing_new_kademlia(struct IpfsNode* local_node, struct RsaPrivateKey* private_key);
// generic routines // generic routines

View file

@ -17,9 +17,10 @@
* @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise. * @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise.
*/ */
int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) { int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) {
if (incoming_size < 8) const char* protocol = "/ipfs/journalio/1.0.0";
if (incoming_size < 21)
return 0; return 0;
char* result = strstr((char*)incoming, "/ipfs/journalio/1.0.0"); char* result = strnstr((char*)incoming, protocol, incoming_size);
if(result == NULL || result != (char*)incoming) if(result == NULL || result != (char*)incoming)
return 0; return 0;
libp2p_logger_debug("journal", "Handling incoming message.\n"); libp2p_logger_debug("journal", "Handling incoming message.\n");
@ -61,7 +62,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
struct Libp2pVector* vector = libp2p_utils_vector_new(1); struct Libp2pVector* vector = libp2p_utils_vector_new(1);
if (vector != NULL) { if (vector != NULL) {
struct lmdb_trans_cursor *cursor = NULL; struct lmdb_trans_cursor *cursor = NULL;
if (!lmdb_journalstore_cursor_open(database->handle, &cursor)) { if (!lmdb_journalstore_cursor_open(database->datastore_context, &cursor, NULL)) {
libp2p_logger_error("journal", "Unable to open a cursor for the journalstore.\n"); libp2p_logger_error("journal", "Unable to open a cursor for the journalstore.\n");
return NULL; return NULL;
} }
@ -69,7 +70,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
if (!lmdb_journalstore_cursor_get(cursor, CURSOR_LAST, &rec)) { if (!lmdb_journalstore_cursor_get(cursor, CURSOR_LAST, &rec)) {
libp2p_logger_error("journal", "Unable to find last record from the journalstore.\n"); libp2p_logger_error("journal", "Unable to find last record from the journalstore.\n");
libp2p_utils_vector_free(vector); libp2p_utils_vector_free(vector);
lmdb_journalstore_cursor_close(cursor); lmdb_journalstore_cursor_close(cursor, 1);
return NULL; return NULL;
} }
// we've got one, now start the loop // we've got one, now start the loop
@ -83,7 +84,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) {
i++; i++;
} while(i < n); } while(i < n);
libp2p_logger_debug("journal", "Closing journalstore cursor.\n"); libp2p_logger_debug("journal", "Closing journalstore cursor.\n");
lmdb_journalstore_cursor_close(cursor); lmdb_journalstore_cursor_close(cursor, 1);
} else { } else {
libp2p_logger_error("journal", "Unable to allocate vector for ipfs_journal_get_last.\n"); libp2p_logger_error("journal", "Unable to allocate vector for ipfs_journal_get_last.\n");
} }
@ -147,6 +148,7 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli
if (journal_records == NULL || journal_records->total == 0) { if (journal_records == NULL || journal_records->total == 0) {
// nothing to do // nothing to do
libp2p_logger_debug("journal", "There are no journal records to process.\n"); libp2p_logger_debug("journal", "There are no journal records to process.\n");
replication_peer->lastConnect = os_utils_gmtime();
return 1; return 1;
} }
// build the message // build the message
@ -169,17 +171,11 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli
return 0; return 0;
} }
memcpy(entry->hash, rec->hash, entry->hash_size); memcpy(entry->hash, rec->hash, entry->hash_size);
// debugging
size_t b58size = 100;
uint8_t *b58key = (uint8_t*) malloc(b58size);
libp2p_crypto_encoding_base58_encode(entry->hash, entry->hash_size, &b58key, &b58size);
free(b58key);
libp2p_logger_debug("journal", "Adding hash %s to JournalMessage.\n", b58key);
libp2p_utils_vector_add(message->journal_entries, entry); libp2p_utils_vector_add(message->journal_entries, entry);
} }
// send the message // send the message
message->current_epoch = os_utils_gmtime(); message->current_epoch = os_utils_gmtime();
libp2p_logger_debug("journal", "Sending message to %s.\n", peer->id); libp2p_logger_debug("journal", "Sending message to %s.\n", libp2p_peer_id_to_string(peer));
int retVal = ipfs_journal_send_message(local_node, peer, message); int retVal = ipfs_journal_send_message(local_node, peer, message);
if (retVal) { if (retVal) {
replication_peer->lastConnect = message->current_epoch; replication_peer->lastConnect = message->current_epoch;
@ -308,6 +304,7 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s
if (second_read) { if (second_read) {
free(incoming_pos); free(incoming_pos);
} }
ipfs_journal_message_free(message);
return -1; return -1;
} }
struct Libp2pVector* todo_vector = NULL; struct Libp2pVector* todo_vector = NULL;
@ -345,6 +342,8 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s
} }
//TODO: set new values in their ReplicationPeer struct //TODO: set new values in their ReplicationPeer struct
ipfs_journal_message_free(message);
if (second_read) if (second_read)
free(incoming_pos); free(incoming_pos);
return 1; return 1;

View file

@ -502,6 +502,7 @@ int fs_repo_open_config(struct FSRepo* repo) {
continue; continue;
// make multiaddress a peer // make multiaddress a peer
struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(cur); struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(cur);
multiaddress_free(cur);
struct ReplicationPeer* rp = repo_config_replication_peer_new(); struct ReplicationPeer* rp = repo_config_replication_peer_new();
rp->peer = peer; rp->peer = peer;
libp2p_logger_debug("fs_repo", "Adding %s to replication_peers.\n", libp2p_peer_id_to_string(rp->peer)); libp2p_logger_debug("fs_repo", "Adding %s to replication_peers.\n", libp2p_peer_id_to_string(rp->peer));

View file

@ -9,7 +9,9 @@
struct lmdb_trans_cursor* lmdb_trans_cursor_new() { struct lmdb_trans_cursor* lmdb_trans_cursor_new() {
struct lmdb_trans_cursor* out = (struct lmdb_trans_cursor*) malloc(sizeof(struct lmdb_trans_cursor)); struct lmdb_trans_cursor* out = (struct lmdb_trans_cursor*) malloc(sizeof(struct lmdb_trans_cursor));
if (out != NULL) { if (out != NULL) {
out->environment = NULL;
out->cursor = NULL; out->cursor = NULL;
out->parent_transaction = NULL;
out->transaction = NULL; out->transaction = NULL;
out->database = NULL; out->database = NULL;
} }
@ -22,6 +24,8 @@ struct lmdb_trans_cursor* lmdb_trans_cursor_new() {
* @returns true(1) * @returns true(1)
*/ */
int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in) { int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in) {
if (in != NULL) {
free(in); free(in);
}
return 1; return 1;
} }

View file

@ -81,6 +81,44 @@ int repo_fsrepo_lmdb_build_record(MDB_val *key, MDB_val *value, struct Datastore
return 1; return 1;
} }
/***
* retrieve a record from the database and put in a pre-sized buffer
* using an already allocated transaction, and with an already opened
* database
* @param key the key to look for
* @param key_size the length of the key
* @param record where to put the results
* @param datastore where to look for the data
* @param mdb_txn the already opened db transaction
* @param datastore_table the reference to the already opened datastore table (database)
* @returns true(1) on success
*/
int repo_fsrepo_lmdb_get_with_transaction(const unsigned char* key, size_t key_size, struct DatastoreRecord** record, MDB_txn *mdb_txn, MDB_dbi *datastore_table) {
struct MDB_val db_key;
struct MDB_val db_value;
// check parameters passed in
if (mdb_txn == NULL || datastore_table == NULL) {
libp2p_logger_error("lmdb_datastore", "get_w_tx: invalid transaction or table reference.\n");
return 0;
}
// prepare data
db_key.mv_size = key_size;
db_key.mv_data = (char*)key;
if (mdb_get(mdb_txn, *datastore_table, &db_key, &db_value) != 0) {
return 0;
}
if (!repo_fsrepo_lmdb_build_record(&db_key, &db_value, record)) {
return 0;
}
return 1;
}
/*** /***
* retrieve a record from the database and put in a pre-sized buffer * retrieve a record from the database and put in a pre-sized buffer
* @param key the key to look for * @param key the key to look for
@ -91,52 +129,44 @@ int repo_fsrepo_lmdb_build_record(MDB_val *key, MDB_val *value, struct Datastore
*/ */
int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct DatastoreRecord **record, const struct Datastore* datastore) { int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct DatastoreRecord **record, const struct Datastore* datastore) {
MDB_txn* mdb_txn; MDB_txn* mdb_txn;
MDB_dbi mdb_dbi;
struct MDB_val db_key;
struct MDB_val db_value;
MDB_env* mdb_env = (MDB_env*)datastore->handle; if (datastore == NULL || datastore->datastore_context == NULL) {
if (mdb_env == NULL) libp2p_logger_error("lmdb_datastore", "get: datastore not initialized.\n");
return 0; return 0;
}
// debug struct lmdb_context *db_context = (struct lmdb_context*) datastore->datastore_context;
size_t b58size = 100; if (db_context->db_environment == NULL) {
uint8_t *b58key = (uint8_t *) malloc(b58size); libp2p_logger_error("lmdb_datastore", "get: datastore environment not initialized.\n");
libp2p_crypto_encoding_base58_encode(key, key_size, &b58key, &b58size); return 0;
libp2p_logger_debug("lmdb_datastore", "Looking for key %s in datastore.\n", b58key); }
free(b58key);
// open transaction // open transaction
if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0) if (mdb_txn_begin(db_context->db_environment, db_context->current_transaction, 0, &mdb_txn) != 0)
return 0; return 0;
if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { int retVal = repo_fsrepo_lmdb_get_with_transaction(key, key_size, record, mdb_txn, db_context->datastore_db);
mdb_txn_commit(mdb_txn); mdb_txn_commit(mdb_txn);
return retVal;
}
/**
* Open the database and create a new transaction
* @param mdb_env the database handle
* @param mdb_dbi the table handle to be created
* @param mdb_txn the transaction to be created
* @returns true(1) on success, false(0) otherwise
*/
int lmdb_datastore_create_transaction(struct lmdb_context *db_context, MDB_txn **mdb_txn) {
// open transaction
if (mdb_txn_begin(db_context->db_environment, db_context->current_transaction, 0, mdb_txn) != 0)
return 0; return 0;
}
// prepare data
db_key.mv_size = key_size;
db_key.mv_data = (char*)key;
if (mdb_get(mdb_txn, mdb_dbi, &db_key, &db_value) != 0) {
mdb_txn_commit(mdb_txn);
return 0;
}
if (!repo_fsrepo_lmdb_build_record(&db_key, &db_value, record)) {
mdb_txn_commit(mdb_txn);
return 0;
}
// clean up
mdb_txn_commit(mdb_txn);
return 1; return 1;
} }
/** /**
* Write data to the datastore with the specified key * Write (or update) data in the datastore with the specified key
* @param key the key * @param key the key
* @param key_size the length of the key * @param key_size the length of the key
* @param data the data to be written * @param data the data to be written
@ -146,29 +176,38 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas
*/ */
int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned char* data, size_t data_size, const struct Datastore* datastore) { int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned char* data, size_t data_size, const struct Datastore* datastore) {
int retVal; int retVal;
MDB_txn* mdb_txn; struct MDB_txn *child_transaction;
MDB_dbi mdb_dbi; struct MDB_val datastore_key;
struct MDB_val db_key; struct MDB_val datastore_value;
struct MDB_val db_value;
unsigned long long old_timestamp = 0;
struct DatastoreRecord *datastore_record = NULL; struct DatastoreRecord *datastore_record = NULL;
struct JournalRecord *journalstore_record = NULL; struct JournalRecord *journalstore_record = NULL;
struct lmdb_trans_cursor *journalstore_cursor = NULL; struct lmdb_trans_cursor *journalstore_cursor = NULL;
MDB_env* mdb_env = (MDB_env*)datastore->handle; if (datastore == NULL || datastore->datastore_context == NULL)
if (mdb_env == NULL)
return 0; return 0;
// open transaction struct lmdb_context *db_context = (struct lmdb_context*)datastore->datastore_context;
retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn);
if (retVal != 0)
return 0;
retVal = mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi);
if (retVal != 0)
return 0;
// check the datastore to see if it is already there. If it is there, use its timestamp if it is older. if (db_context->db_environment == NULL) {
repo_fsrepo_lmdb_get(key, key_size, &datastore_record, datastore); libp2p_logger_error("lmdb_datastore", "put: invalid datastore handle.\n");
return 0;
}
// open a transaction to the databases
if (!lmdb_datastore_create_transaction(db_context, &child_transaction)) {
libp2p_logger_error("lmdb_datastore", "put: Unable to create db transaction.\n");
return 0;
}
// build the journalstore connectivity stuff
lmdb_journalstore_cursor_open(datastore->datastore_context, &journalstore_cursor, child_transaction);
if (journalstore_cursor == NULL) {
libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for journalstore cursor.\n");
return 0;
}
// see if what we want is already in the datastore
repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, child_transaction, db_context->datastore_db);
if (datastore_record != NULL) { if (datastore_record != NULL) {
// build the journalstore_record with the search criteria // build the journalstore_record with the search criteria
journalstore_record = lmdb_journal_record_new(); journalstore_record = lmdb_journal_record_new();
@ -177,13 +216,13 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
memcpy(journalstore_record->hash, key, key_size); memcpy(journalstore_record->hash, key, key_size);
journalstore_record->timestamp = datastore_record->timestamp; journalstore_record->timestamp = datastore_record->timestamp;
// look up the corresponding journalstore record for possible updating // look up the corresponding journalstore record for possible updating
journalstore_cursor = lmdb_trans_cursor_new(); lmdb_journalstore_get_record(db_context, journalstore_cursor, &journalstore_record);
journalstore_cursor->transaction = mdb_txn;
lmdb_journalstore_get_record((void*)mdb_env, journalstore_cursor, &journalstore_record);
} else { // it wasn't previously in the database } else { // it wasn't previously in the database
datastore_record = libp2p_datastore_record_new(); datastore_record = libp2p_datastore_record_new();
if (datastore_record == NULL) { if (datastore_record == NULL) {
libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for DatastoreRecord.\n"); libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for DatastoreRecord.\n");
lmdb_trans_cursor_free(journalstore_cursor);
mdb_txn_commit(child_transaction);
return 0; return 0;
} }
} }
@ -192,14 +231,16 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
unsigned long long now = os_utils_gmtime(); unsigned long long now = os_utils_gmtime();
if (datastore_record->timestamp == 0 || datastore_record->timestamp > now) { if (datastore_record->timestamp == 0 || datastore_record->timestamp > now) {
//we need to update the timestamp. Be sure to update the journal too. (done further down) //we need to update the timestamp. Be sure to update the journal too. (done further down)
old_timestamp = datastore_record->timestamp; //old_timestamp = datastore_record->timestamp;
datastore_record->timestamp = now; datastore_record->timestamp = now;
} }
// fill in the other fields // fill in the other fields
datastore_record->key_size = key_size; datastore_record->key_size = key_size;
datastore_record->key = (uint8_t*)key; datastore_record->key = (uint8_t*) malloc(key_size);
memcpy(datastore_record->key, key, key_size);
datastore_record->value_size = data_size; datastore_record->value_size = data_size;
datastore_record->value = data; datastore_record->value = (uint8_t *) malloc(data_size);
memcpy(datastore_record->value, data, data_size);
// convert it into a byte array // convert it into a byte array
@ -208,35 +249,38 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
repo_fsrepo_lmdb_encode_record(datastore_record, &record, &record_size); repo_fsrepo_lmdb_encode_record(datastore_record, &record, &record_size);
// prepare data // prepare data
db_key.mv_size = key_size; datastore_key.mv_size = key_size;
db_key.mv_data = (char*)key; datastore_key.mv_data = (char*)key;
// write // write
db_value.mv_size = record_size; datastore_value.mv_size = record_size;
db_value.mv_data = record; datastore_value.mv_data = record;
retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA); retVal = mdb_put(child_transaction, *db_context->datastore_db, &datastore_key, &datastore_value, MDB_NODUPDATA);
if (retVal == 0) { if (retVal == 0) {
// added the datastore record, now work with the journalstore // Successfully added the datastore record. Now work with the journalstore.
if (journalstore_record != NULL) { if (journalstore_record != NULL) {
if (journalstore_record->timestamp != datastore_record->timestamp) { if (journalstore_record->timestamp != datastore_record->timestamp) {
// we need to update // we need to update
journalstore_record->timestamp = datastore_record->timestamp; journalstore_record->timestamp = datastore_record->timestamp;
lmdb_journalstore_cursor_put(journalstore_cursor, journalstore_record); lmdb_journalstore_cursor_put(journalstore_cursor, journalstore_record);
lmdb_journalstore_cursor_close(journalstore_cursor, 0);
lmdb_journal_record_free(journalstore_record); lmdb_journal_record_free(journalstore_record);
} }
} else { } else {
// add it to the journalstore // add it to the journalstore
journalstore_record = lmdb_journal_record_new(); journalstore_record = lmdb_journal_record_new();
journalstore_record->hash = (uint8_t*)key; journalstore_record->hash = (uint8_t*) malloc(key_size);
memcpy(journalstore_record->hash, key, key_size);
journalstore_record->hash_size = key_size; journalstore_record->hash_size = key_size;
journalstore_record->timestamp = datastore_record->timestamp; journalstore_record->timestamp = datastore_record->timestamp;
journalstore_record->pending = 1; // TODO: Calculate this correctly journalstore_record->pending = 1; // TODO: Calculate this correctly
journalstore_record->pin = 1; journalstore_record->pin = 1;
if (!lmdb_journalstore_journal_add(mdb_txn, journalstore_record)) { if (!lmdb_journalstore_journal_add(journalstore_cursor, journalstore_record)) {
libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. Continuing.\n"); libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. Continuing.\n");
} }
lmdb_journalstore_cursor_close(journalstore_cursor, 0);
lmdb_journal_record_free(journalstore_record); lmdb_journal_record_free(journalstore_record);
retVal = 1; retVal = 1;
} }
@ -247,8 +291,11 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
} }
// cleanup // cleanup
if (mdb_txn_commit(child_transaction) != 0) {
libp2p_logger_error("lmdb_datastore", "lmdb_put: transaction commit failed.\n");
}
free(record); free(record);
mdb_txn_commit(mdb_txn); libp2p_datastore_record_free(datastore_record);
return retVal; return retVal;
} }
@ -257,6 +304,8 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
* Note: for now, the parameters are not used * Note: for now, the parameters are not used
* @param argc number of parameters in the following array * @param argc number of parameters in the following array
* @param argv an array of parameters * @param argv an array of parameters
* @param datastore the datastore struct
* @returns true(1) on success, false(0) otherwise
*/ */
int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) { int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) {
// create environment // create environment
@ -279,109 +328,56 @@ int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) {
return 0; return 0;
} }
datastore->handle = (void*)mdb_env; struct lmdb_context *db_context = (struct lmdb_context *) malloc(sizeof(struct lmdb_context));
datastore->datastore_context = (void*) db_context;
db_context->db_environment = (void*)mdb_env;
db_context->datastore_db = (MDB_dbi*) malloc(sizeof(MDB_dbi));
db_context->journal_db = (MDB_dbi*) malloc(sizeof(MDB_dbi));
// open the 2 databases
if (mdb_txn_begin(mdb_env, NULL, 0, &db_context->current_transaction) != 0) {
mdb_env_close(mdb_env);
db_context->db_environment = NULL;
return 0;
}
if (mdb_dbi_open(db_context->current_transaction, "DATASTORE", MDB_DUPSORT | MDB_CREATE, db_context->datastore_db ) != 0) {
mdb_txn_abort(db_context->current_transaction);
mdb_env_close(mdb_env);
db_context->db_environment = NULL;
return 0;
}
if (mdb_dbi_open(db_context->current_transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, db_context->journal_db) != 0) {
mdb_txn_abort(db_context->current_transaction);
mdb_env_close(mdb_env);
db_context->db_environment = NULL;
return 0;
}
return 1; return 1;
} }
/*** /***
* Close an LMDB database * Close an LMDB database
* NOTE: for now, argc and argv are not used
* @param argc number of parameters in the argv array
* @param argv parameters to be passed in
* @param datastore the datastore struct that contains information about the opened database * @param datastore the datastore struct that contains information about the opened database
* @returns true(1) on success, otherwise false(0)
*/ */
int repo_fsrepo_lmdb_close(struct Datastore* datastore) { int repo_fsrepo_lmdb_close(struct Datastore* datastore) {
struct MDB_env* mdb_env = (struct MDB_env*)datastore->handle; // check parameters
mdb_env_close(mdb_env); if (datastore == NULL || datastore->datastore_context == NULL)
return 1; return 0;
}
/*** // close the db environment
* Create a new cursor on the datastore database struct lmdb_context *db_context = (struct lmdb_context*) datastore->datastore_context;
* @param datastore the place to store the cursor if (db_context->current_transaction != NULL) {
* @returns true(1) on success, false(0) otherwise mdb_txn_commit(db_context->current_transaction);
*/
int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) {
if (datastore->handle != NULL) {
MDB_env* mdb_env = (MDB_env*)datastore->handle;
MDB_dbi mdb_dbi;
if (datastore->cursor == NULL ) {
datastore->cursor = malloc(sizeof(struct lmdb_trans_cursor));
struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)datastore->cursor;
// open transaction
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0)
return 0;
MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction;
if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
mdb_txn_commit(mdb_txn);
return 0;
} }
// open cursor mdb_env_close(db_context->db_environment);
if (mdb_cursor_open(mdb_txn, mdb_dbi, &cursor->cursor) != 0) {
mdb_txn_commit(mdb_txn); free(db_context->datastore_db);
return 0; free(db_context->journal_db);
}
return 1; free(db_context);
}
}
return 0;
}
/***
* Get a record using a cursor
* @param key the key from the record
* @param key_length the length of the key
* @param value the value of the record
* @param value_length the length of the value
* @param CURSOR_FIRST or CURSOR_NEXT
* @param datastore holds the reference to the opened cursor
* @returns true(1) on success, false(0) otherwise
*/
int repo_fsrepo_lmdb_cursor_get(unsigned char** key, int* key_length,
unsigned char** value, int* value_length,
enum DatastoreCursorOp op, struct Datastore* datastore)
{
if (datastore->cursor != NULL) {
struct lmdb_trans_cursor* tc = (struct lmdb_trans_cursor*)datastore->cursor;
MDB_val mdb_key;
MDB_val mdb_value;
MDB_cursor_op co = MDB_FIRST;
if (op == CURSOR_FIRST)
co = MDB_FIRST;
else if (op == CURSOR_NEXT)
co = MDB_NEXT;
if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) {
return 0;
}
*key = (unsigned char*)malloc(mdb_key.mv_size);
memcpy(*key, mdb_key.mv_data, mdb_key.mv_size);
*key_length = mdb_key.mv_size;
if (value != NULL) { // don't do this if a null is passed in, time saver
*value = (unsigned char*)malloc(mdb_value.mv_size);
memcpy(*value, mdb_value.mv_data, mdb_value.mv_size);
*value_length = mdb_value.mv_size;
}
return 1; return 1;
}
return 0;
}
/**
* Close an existing cursor
* @param datastore the context
* @returns true(1) on success
*/
int repo_fsrepo_lmdb_cursor_close(struct Datastore* datastore) {
if (datastore->cursor != NULL) {
struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)datastore->cursor;
if (cursor->cursor != NULL) {
mdb_cursor_close(cursor->cursor);
mdb_txn_commit(cursor->transaction);
free(cursor);
return 1;
}
free(cursor);
}
return 0;
} }
/*** /***
@ -394,10 +390,6 @@ int repo_fsrepo_lmdb_cast(struct Datastore* datastore) {
datastore->datastore_close = &repo_fsrepo_lmdb_close; datastore->datastore_close = &repo_fsrepo_lmdb_close;
datastore->datastore_put = &repo_fsrepo_lmdb_put; datastore->datastore_put = &repo_fsrepo_lmdb_put;
datastore->datastore_get = &repo_fsrepo_lmdb_get; datastore->datastore_get = &repo_fsrepo_lmdb_get;
datastore->datastore_cursor_open = &repo_fsrepo_lmdb_cursor_open;
datastore->datastore_cursor_get = &repo_fsrepo_lmdb_cursor_get;
datastore->datastore_cursor_close = &repo_fsrepo_lmdb_cursor_close;
datastore->cursor = NULL;
return 1; return 1;
} }

View file

@ -1,4 +1,5 @@
#include <string.h> #include <string.h>
#include <errno.h>
#include "varint.h" #include "varint.h"
#include "lmdb.h" #include "lmdb.h"
@ -39,6 +40,7 @@ int lmdb_journalstore_generate_key(const struct JournalRecord* journal_record, s
db_key->mv_data = time_varint; db_key->mv_data = time_varint;
return 1; return 1;
} }
/*** /***
* Convert the JournalRec struct into a lmdb key and lmdb value * Convert the JournalRec struct into a lmdb key and lmdb value
* @param journal_record the record to convert * @param journal_record the record to convert
@ -113,27 +115,35 @@ int lmdb_journalstore_build_record(const struct MDB_val* db_key, const struct MD
* @param hash_size the size of the hash * @param hash_size the size of the hash
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* journal_record) { int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord *journalstore_record) {
MDB_dbi mdb_dbi;
struct MDB_val db_key;
struct MDB_val db_value;
if (!lmdb_journalstore_build_key_value_pair(journal_record, &db_key, &db_value)) { MDB_val journalstore_key;
libp2p_logger_error("lmdb_journalstore", "Unable to generate key value pair for journal_add.\n"); MDB_val journalstore_value;
int createdTransaction = 0;
if (!lmdb_journalstore_build_key_value_pair(journalstore_record, &journalstore_key, &journalstore_value)) {
libp2p_logger_error("lmdbd_journalstore", "add: Unable to convert journalstore record to key/value.\n");
return 0; return 0;
} }
// open the journal table // create transaction if necessary
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { if (journalstore_cursor->transaction == NULL) {
libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n"); mdb_txn_begin(journalstore_cursor->environment, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction);
return 0; createdTransaction = 1;
} }
if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) { if (mdb_put(journalstore_cursor->transaction, *journalstore_cursor->database, &journalstore_key, &journalstore_value, 0) != 0) {
libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n"); libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n");
return 0; return 0;
} }
if (createdTransaction) {
if (mdb_txn_commit(journalstore_cursor->transaction) != 0) {
libp2p_logger_error("lmdb_journalstore", "Unable to commit JOURNALSTORE transaction.\n");
return 0;
}
}
return 1; return 1;
} }
@ -146,24 +156,34 @@ int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* journa
*/ */
int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record) int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record)
{ {
if (journalstore_cursor == NULL || journalstore_cursor->transaction == NULL) {
libp2p_logger_error("lmdb_journalstore", "get_record: journalstore cursor not initialized properly.\n"); if (handle == NULL) {
libp2p_logger_error("lmdb_journalstore", "get_record: database environment not set up.\n");
return 0; return 0;
} }
struct lmdb_context *db_context = (struct lmdb_context*)handle;
// create a new transaction if necessary
if (journalstore_cursor->transaction == NULL) {
if (mdb_txn_begin(db_context->db_environment, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction) != 0) {
libp2p_logger_error("lmdb_journanstore", "get_record: Attempt to begin transaction failed.\n");
return 0;
}
}
if (journalstore_cursor->cursor == NULL) { if (journalstore_cursor->cursor == NULL) {
if (!lmdb_journalstore_cursor_open(handle, &journalstore_cursor)) { if (!lmdb_journalstore_cursor_open(handle, &journalstore_cursor, NULL)) {
libp2p_logger_error("lmdb_journalstore", "Unable to open cursor in get_record.\n"); libp2p_logger_error("lmdb_journalstore", "Unable to open cursor in get_record.\n");
return 0; return 0;
} }
} }
// search for the timestamp // search for the timestamp
if (!lmdb_journalstore_cursor_get(journalstore_cursor, CURSOR_FIRST, journalstore_record)) { if (!lmdb_journalstore_cursor_get(journalstore_cursor, CURSOR_FIRST, journalstore_record)) {
libp2p_logger_error("lmdb_journalstore", "Unable to find any records in table.\n"); libp2p_logger_debug("lmdb_journalstore", "Unable to find any records in table.\n");
return 0; return 0;
} }
// now look for the hash key
return 0; return 1;
} }
/** /**
@ -172,9 +192,9 @@ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journal
* @param cursor where to place the results * @param cursor where to place the results
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr) { int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr, struct MDB_txn* trans_to_use) {
if (handle != NULL) { if (handle != NULL) {
MDB_env* mdb_env = (MDB_env*)handle; struct lmdb_context *db_context = (struct lmdb_context*)handle;
struct lmdb_trans_cursor *cursor = *crsr; struct lmdb_trans_cursor *cursor = *crsr;
if (cursor == NULL ) { if (cursor == NULL ) {
cursor = lmdb_trans_cursor_new(); cursor = lmdb_trans_cursor_new();
@ -182,19 +202,20 @@ int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr)
return 0; return 0;
*crsr = cursor; *crsr = cursor;
} }
cursor->database = db_context->journal_db;
cursor->environment = db_context->db_environment;
cursor->parent_transaction = db_context->current_transaction;
if (cursor->transaction == NULL) { if (cursor->transaction == NULL) {
if (trans_to_use != NULL)
cursor->transaction = trans_to_use;
else {
// open transaction // open transaction
if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) { if (mdb_txn_begin(db_context->db_environment, db_context->current_transaction, 0, &cursor->transaction) != 0) {
libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to begin a transaction.\n"); libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to begin a transaction.\n");
return 0; return 0;
} }
} }
if (cursor->database == NULL) {
if (mdb_dbi_open(cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, cursor->database) != 0) {
libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to open the dbi to the journalstore");
mdb_txn_commit(cursor->transaction);
return 0;
}
} }
if (cursor->cursor == NULL) { if (cursor->cursor == NULL) {
// open cursor // open cursor
@ -266,10 +287,23 @@ int lmdb_journalstore_cursor_get(struct lmdb_trans_cursor *tc, enum DatastoreCur
lmdb_journalstore_generate_key(*record, &mdb_key); lmdb_journalstore_generate_key(*record, &mdb_key);
} }
if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) { int retVal = mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co);
if (retVal != 0) {
if (retVal == MDB_NOTFOUND) {
libp2p_logger_debug("lmdb_journalstore", "cursor_get: No records found in db.\n");
} else if (retVal == EINVAL) {
libp2p_logger_debug("lmdb_journalstore", "cursor_get: Invalid parameter specified.\n");
}
return 0; return 0;
} }
if (*record == NULL) {
// make a new record and pass it back
if (!lmdb_journalstore_build_record(&mdb_key, &mdb_value, record))
return 0;
return 1;
}
// see if the passed in record has a specific record in mind (take care of duplicate keys) // see if the passed in record has a specific record in mind (take care of duplicate keys)
if ( (*record)->hash_size > 0) { if ( (*record)->hash_size > 0) {
struct JournalRecord* curr_record = NULL; struct JournalRecord* curr_record = NULL;
@ -332,17 +366,21 @@ int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalR
} }
/** /**
* Close the cursor * Close the cursor and commits the transaction.
* @param crsr a lmdb_trans_cursor pointer * @param crsr a lmdb_trans_cursor pointer
* @returns true(1)
*/ */
int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor) { int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor, int commitTransaction) {
if (cursor != NULL) {
if (cursor->cursor != NULL) { if (cursor->cursor != NULL) {
mdb_cursor_close(cursor->cursor); //mdb_cursor_close(cursor->cursor);
mdb_txn_commit(cursor->transaction);
free(cursor);
return 1;
} else {
free(cursor);
} }
return 0; if (cursor->transaction != NULL && commitTransaction) {
mdb_txn_commit(cursor->transaction);
}
cursor->cursor = NULL;
cursor->transaction = NULL;
lmdb_trans_cursor_free(cursor);
}
return 1;
} }

View file

@ -115,3 +115,8 @@ struct IpfsRouting* ipfs_routing_new_offline (struct IpfsNode* local_node, struc
return offlineRouting; return offlineRouting;
} }
int ipfs_routing_offline_free(ipfs_routing* incoming) {
free(incoming);
return 1;
}

View file

@ -2,6 +2,7 @@
#include "ipfs/journal/journal_entry.h" #include "ipfs/journal/journal_entry.h"
#include "ipfs/journal/journal_message.h" #include "ipfs/journal/journal_message.h"
#include "ipfs/repo/fsrepo/journalstore.h"
int test_journal_encode_decode() { int test_journal_encode_decode() {
int retVal = 0; int retVal = 0;
@ -120,6 +121,7 @@ int test_journal_server_1() {
ipfs_node_offline_new(ipfs_path, &local_node); ipfs_node_offline_new(ipfs_path, &local_node);
ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0); ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0);
ipfs_node_free(local_node); ipfs_node_free(local_node);
ipfs_hashtable_node_free(node);
libp2p_logger_debug("test_journal", "*** Firing up daemon for server 1 ***\n"); libp2p_logger_debug("test_journal", "*** Firing up daemon for server 1 ***\n");
@ -128,6 +130,8 @@ int test_journal_server_1() {
sleep(45); sleep(45);
libp2p_logger_error("test_journal", "Sleep is over. Shutting down.\n");
retVal = 1; retVal = 1;
exit: exit:
ipfs_daemon_stop(); ipfs_daemon_stop();
@ -182,7 +186,9 @@ int test_journal_server_2() {
pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path); pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path);
thread_started = 1; thread_started = 1;
sleep(120); sleep(45);
libp2p_logger_error("test_journal", "Sleep is over. Shutting down.\n");
retVal = 1; retVal = 1;
exit: exit:
@ -191,3 +197,88 @@ int test_journal_server_2() {
pthread_join(daemon_thread, NULL); pthread_join(daemon_thread, NULL);
return retVal; return retVal;
} }
#include "lmdb.h"
// test the lightning db process
int test_journal_db() {
MDB_env *mdb_env = NULL;
MDB_txn *mdb_txn = NULL;
MDB_dbi datastore_db;
MDB_dbi journalstore_db;
MDB_val datastore_key;
MDB_val datastore_value;
MDB_val *journalstore_key;
MDB_val *journalstore_value;
MDB_val returned_value;
// set up records
char* key = "ABC123";
char* value = "Hello, world!";
datastore_key.mv_size = strlen(key);
datastore_key.mv_data = (void*)key;
datastore_value.mv_size = strlen(value);
datastore_value.mv_data = (void*)value;
journalstore_key = (MDB_val*) malloc(sizeof(MDB_val));
journalstore_key->mv_size = strlen(key);
journalstore_key->mv_data = (void*)key;
journalstore_value = (MDB_val*) malloc(sizeof(MDB_val));
journalstore_value->mv_size = strlen(value);
journalstore_value->mv_data = (void*)value;
// clean up the old stuff
unlink ("/tmp/lock.mdb");
unlink ("/tmp/data.mdb");
// create environment
if (mdb_env_create(&mdb_env) != 0)
return 0;
if (mdb_env_set_maxdbs(mdb_env, (MDB_dbi)2) != 0)
return 0;
if (mdb_env_open(mdb_env, "/tmp", 0, S_IRWXU) != 0) {
fprintf(stderr, "Unable to open environment.\n");
return 0;
}
// create a transaction
if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0) {
fprintf(stderr, "Unable to open transaction.\n");
return 0;
}
// open databases
if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &datastore_db) != 0)
return 0;
if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &journalstore_db) != 0)
return 0;
// search for a record in the datastore
if (mdb_get(mdb_txn, datastore_db, &datastore_key, &returned_value) != MDB_NOTFOUND) {
return 0;
}
// add record to datastore
if (mdb_put(mdb_txn, datastore_db, &datastore_key, &datastore_value, 0) != 0)
return 0;
// add record to journalstore
if (mdb_put(mdb_txn, journalstore_db, journalstore_key, journalstore_value, 0) != 0)
return 0;
// get rid of MDB_val values from journalstore to see if commit still works
free(journalstore_key);
free(journalstore_value);
journalstore_key = NULL;
journalstore_value = NULL;
// close everything up
if (mdb_txn_commit(mdb_txn) != 0)
return 0;
mdb_env_close(mdb_env);
return 1;
}

View file

@ -8,6 +8,7 @@
#include "mh/multihash.h" #include "mh/multihash.h"
#include "libp2p/crypto/encoding/base58.h" #include "libp2p/crypto/encoding/base58.h"
#include "ipfs/core/ipfs_node.h" #include "ipfs/core/ipfs_node.h"
#include "ipfs/repo/fsrepo/lmdb_cursor.h"
int test_import_large_file() { int test_import_large_file() {
size_t bytes_size = 1000000; //1mb size_t bytes_size = 1000000; //1mb
@ -152,7 +153,8 @@ int test_import_small_file() {
// get the repo // get the repo
drop_and_build_repository(repo_path, 4001, NULL, NULL); drop_and_build_repository(repo_path, 4001, NULL, NULL);
ipfs_node_online_new(repo_path, &local_node);
ipfs_node_offline_new(repo_path, &local_node);
// write to ipfs // write to ipfs
struct HashtableNode* write_node; struct HashtableNode* write_node;
@ -165,11 +167,6 @@ int test_import_small_file() {
// cid should be the same each time // cid should be the same each time
unsigned char cid_test[10] = { 0x1e, 0xcf, 0x04, 0xce, 0x6a, 0xe8, 0xbf, 0xc0, 0xeb, 0xe4 }; unsigned char cid_test[10] = { 0x1e, 0xcf, 0x04, 0xce, 0x6a, 0xe8, 0xbf, 0xc0, 0xeb, 0xe4 };
/*
for (int i = 0; i < 10; i++) {
printf("%02x\n", write_node->hash[i]);
}
*/
for(int i = 0; i < 10; i++) { for(int i = 0; i < 10; i++) {
if (write_node->hash[i] != cid_test[i]) { if (write_node->hash[i] != cid_test[i]) {
@ -207,6 +204,19 @@ int test_import_small_file() {
} }
} }
// attempt to look in the journal for the entry
struct lmdb_context *context = (struct lmdb_context*)local_node->repo->config->datastore->datastore_context;
struct JournalRecord* record = NULL;
struct lmdb_trans_cursor *cursor = lmdb_trans_cursor_new();
cursor->environment = context->db_environment;
cursor->database = context->journal_db;
cursor->parent_transaction = context->current_transaction;
if (mdb_cursor_open(context->current_transaction, *cursor->database, &cursor->cursor) != 0) {
fprintf(stderr, "Unable to open cursor.\n");
} else if (!lmdb_journalstore_cursor_get(cursor, CURSOR_FIRST, &record)) {
fprintf(stderr, "Unable to find any records in the database.\n");
}
ipfs_node_free(local_node); ipfs_node_free(local_node);
ipfs_hashtable_node_free(write_node); ipfs_hashtable_node_free(write_node);
ipfs_hashtable_node_free(read_node); ipfs_hashtable_node_free(read_node);

View file

@ -73,11 +73,15 @@ int test_ipfs_datastore_put() {
* List what is in the journal * List what is in the journal
*/ */
int test_datastore_list_journal() { int test_datastore_list_journal() {
int recCount = 0;
libp2p_logger_add_class("test_datastore"); libp2p_logger_add_class("test_datastore");
libp2p_logger_add_class("lmdb_datastore"); libp2p_logger_add_class("lmdb_datastore");
// need to run test_import_small_file first
// open database // open database
struct FSRepo* fs_repo; struct FSRepo* fs_repo;
if (ipfs_repo_fsrepo_new(NULL, NULL, &fs_repo) == 0) { if (ipfs_repo_fsrepo_new("/tmp/.ipfs", NULL, &fs_repo) == 0) {
return 0; return 0;
} }
if (ipfs_repo_fsrepo_open(fs_repo) == 0) { if (ipfs_repo_fsrepo_open(fs_repo) == 0) {
@ -85,7 +89,7 @@ int test_datastore_list_journal() {
} }
// open cursor // open cursor
struct lmdb_trans_cursor *crsr = NULL; struct lmdb_trans_cursor *crsr = NULL;
if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->handle, &crsr)) { if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->datastore_context, &crsr, NULL)) {
ipfs_repo_fsrepo_free(fs_repo); ipfs_repo_fsrepo_free(fs_repo);
return 0; return 0;
} }
@ -96,7 +100,9 @@ int test_datastore_list_journal() {
if (lmdb_journalstore_cursor_get(crsr, op, &record) == 0) { if (lmdb_journalstore_cursor_get(crsr, op, &record) == 0) {
lmdb_journal_record_free(record); lmdb_journal_record_free(record);
record = NULL; record = NULL;
break;
} }
recCount++;
// display record // display record
libp2p_logger_debug("test_datastore", "Timestamp: %llu.\n", record->timestamp); libp2p_logger_debug("test_datastore", "Timestamp: %llu.\n", record->timestamp);
libp2p_logger_debug("test_datastore", "Pin: %s.\n", record->pin == 1 ? "Y" : "N"); libp2p_logger_debug("test_datastore", "Pin: %s.\n", record->pin == 1 ? "Y" : "N");
@ -105,5 +111,6 @@ int test_datastore_list_journal() {
record = NULL; record = NULL;
op = CURSOR_NEXT; op = CURSOR_NEXT;
} while (record != NULL); } while (record != NULL);
libp2p_logger_error("test_datastore", "Found %d records.\n", recCount);
return 1; return 1;
} }

View file

@ -205,7 +205,7 @@ int drop_build_open_repo(const char* path, struct FSRepo** fs_repo, const char*
if (config_filename_to_copy != NULL) { if (config_filename_to_copy != NULL) {
// attach config filename to path // attach config filename to path
char *config = (char*) malloc(strlen(path) + 7); char *config = (char*) malloc(strlen(path) + 8);
strcpy(config, path); strcpy(config, path);
// erase slash if there is one // erase slash if there is one
if (config[strlen(path)-1] == '/') if (config[strlen(path)-1] == '/')

View file

@ -49,6 +49,7 @@ const char* names[] = {
"test_core_api_startup_shutdown", "test_core_api_startup_shutdown",
"test_daemon_startup_shutdown", "test_daemon_startup_shutdown",
"test_datastore_list_journal", "test_datastore_list_journal",
"test_journal_db",
"test_journal_encode_decode", "test_journal_encode_decode",
"test_journal_server_1", "test_journal_server_1",
"test_journal_server_2", "test_journal_server_2",
@ -110,6 +111,7 @@ int (*funcs[])(void) = {
test_core_api_startup_shutdown, test_core_api_startup_shutdown,
test_daemon_startup_shutdown, test_daemon_startup_shutdown,
test_datastore_list_journal, test_datastore_list_journal,
test_journal_db,
test_journal_encode_decode, test_journal_encode_decode,
test_journal_server_1, test_journal_server_1,
test_journal_server_2, test_journal_server_2,