From cb1ea3ceff305f65693c074e3494b1a8cb9f36ae Mon Sep 17 00:00:00 2001 From: John Jones Date: Thu, 7 Sep 2017 11:05:56 -0500 Subject: [PATCH 1/4] Sorting a memory leak in datastore/journalstore --- core/ipfs_node.c | 3 + include/ipfs/repo/fsrepo/journalstore.h | 10 +- include/ipfs/repo/fsrepo/lmdb_cursor.h | 2 + include/ipfs/routing/routing.h | 1 + journal/journal.c | 2 +- repo/fsrepo/lmdb_cursor.c | 9 +- repo/fsrepo/lmdb_datastore.c | 255 ++++++++++-------------- repo/fsrepo/lmdb_journalstore.c | 86 +++++--- routing/offline.c | 5 + test/node/test_importer.h | 3 +- test/storage/test_datastore.h | 2 +- test/test_helper.c | 2 +- 12 files changed, 200 insertions(+), 180 deletions(-) diff --git a/core/ipfs_node.c b/core/ipfs_node.c index b5233c4..6186ff4 100644 --- a/core/ipfs_node.c +++ b/core/ipfs_node.c @@ -152,6 +152,9 @@ int ipfs_node_free(struct IpfsNode* node) { if (node->mode == MODE_ONLINE) { ipfs_routing_online_free(node->routing); } + if (node->mode == MODE_OFFLINE) { + ipfs_routing_offline_free(node->routing); + } if (node->blockstore != NULL) { ipfs_blockstore_free(node->blockstore); } diff --git a/include/ipfs/repo/fsrepo/journalstore.h b/include/ipfs/repo/fsrepo/journalstore.h index 056c725..231c704 100644 --- a/include/ipfs/repo/fsrepo/journalstore.h +++ b/include/ipfs/repo/fsrepo/journalstore.h @@ -50,7 +50,7 @@ int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor); int journal_record_free(struct JournalRecord* rec); -int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* record); +int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord *journalstore_record); /*** * Attempt to get a specific record identified by its timestamp and bytes @@ -61,3 +61,11 @@ int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* record */ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record); +/*** + * Convert the JournalRec struct into a lmdb key and lmdb value + * @param journal_record the record to convert + * @param db_key where to store the key information + * @param db_value where to store the value information + */ +int lmdb_journalstore_build_key_value_pair(const struct JournalRecord* journal_record, struct MDB_val* db_key, struct MDB_val *db_value); + diff --git a/include/ipfs/repo/fsrepo/lmdb_cursor.h b/include/ipfs/repo/fsrepo/lmdb_cursor.h index fe6ba92..5ff0a46 100644 --- a/include/ipfs/repo/fsrepo/lmdb_cursor.h +++ b/include/ipfs/repo/fsrepo/lmdb_cursor.h @@ -3,6 +3,8 @@ #include "lmdb.h" struct lmdb_trans_cursor { + MDB_env* environment; + MDB_txn* parent_transaction; MDB_txn* transaction; MDB_dbi* database; MDB_cursor* cursor; diff --git a/include/ipfs/routing/routing.h b/include/ipfs/routing/routing.h index 609a5db..fbfc7db 100644 --- a/include/ipfs/routing/routing.h +++ b/include/ipfs/routing/routing.h @@ -82,6 +82,7 @@ ipfs_routing* ipfs_routing_new_offline (struct IpfsNode* local_node, struct RsaP // online using secio, should probably be deprecated ipfs_routing* ipfs_routing_new_online (struct IpfsNode* local_node, struct RsaPrivateKey* private_key); int ipfs_routing_online_free(ipfs_routing*); +int ipfs_routing_offline_free(ipfs_routing* incoming); // online using DHT/kademlia, the recommended router ipfs_routing* ipfs_routing_new_kademlia(struct IpfsNode* local_node, struct RsaPrivateKey* private_key); // generic routines diff --git a/journal/journal.c b/journal/journal.c index 33452f1..7f18b33 
100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -61,7 +61,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) { struct Libp2pVector* vector = libp2p_utils_vector_new(1); if (vector != NULL) { struct lmdb_trans_cursor *cursor = NULL; - if (!lmdb_journalstore_cursor_open(database->handle, &cursor)) { + if (!lmdb_journalstore_cursor_open(database->datastore_handle, &cursor)) { libp2p_logger_error("journal", "Unable to open a cursor for the journalstore.\n"); return NULL; } diff --git a/repo/fsrepo/lmdb_cursor.c b/repo/fsrepo/lmdb_cursor.c index f45be2a..ee0f4db 100644 --- a/repo/fsrepo/lmdb_cursor.c +++ b/repo/fsrepo/lmdb_cursor.c @@ -9,7 +9,9 @@ struct lmdb_trans_cursor* lmdb_trans_cursor_new() { struct lmdb_trans_cursor* out = (struct lmdb_trans_cursor*) malloc(sizeof(struct lmdb_trans_cursor)); if (out != NULL) { + out->environment = NULL; out->cursor = NULL; + out->parent_transaction = NULL; out->transaction = NULL; out->database = NULL; } @@ -22,6 +24,11 @@ struct lmdb_trans_cursor* lmdb_trans_cursor_new() { * @returns true(1) */ int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in) { - free(in); + if (in != NULL) { + if (in->database != NULL) { + free(in->database); + } + free(in); + } return 1; } diff --git a/repo/fsrepo/lmdb_datastore.c b/repo/fsrepo/lmdb_datastore.c index 990d0b9..d27ef35 100644 --- a/repo/fsrepo/lmdb_datastore.c +++ b/repo/fsrepo/lmdb_datastore.c @@ -81,6 +81,44 @@ int repo_fsrepo_lmdb_build_record(MDB_val *key, MDB_val *value, struct Datastore return 1; } +/*** + * retrieve a record from the database and put in a pre-sized buffer + * using an already allocated transaction, and with an already opened + * database + * @param key the key to look for + * @param key_size the length of the key + * @param record where to put the results + * @param datastore where to look for the data + * @param mdb_txn the already opened db transaction + * @param datastore_table the reference to the already opened datastore table (database) + * @returns true(1) on success + */ +int repo_fsrepo_lmdb_get_with_transaction(const unsigned char* key, size_t key_size, struct DatastoreRecord** record, MDB_txn *mdb_txn, MDB_dbi *datastore_table) { + struct MDB_val db_key; + struct MDB_val db_value; + + // check parameters passed in + if (mdb_txn == NULL || datastore_table == NULL) { + libp2p_logger_error("lmdb_datastore", "get_w_tx: invalid transaction or table reference.\n"); + return 0; + } + + // prepare data + db_key.mv_size = key_size; + db_key.mv_data = (char*)key; + + if (mdb_get(mdb_txn, *datastore_table, &db_key, &db_value) != 0) { + return 0; + } + + if (!repo_fsrepo_lmdb_build_record(&db_key, &db_value, record)) { + return 0; + } + + return 1; + +} + /*** * retrieve a record from the database and put in a pre-sized buffer * @param key the key to look for @@ -92,20 +130,11 @@ int repo_fsrepo_lmdb_build_record(MDB_val *key, MDB_val *value, struct Datastore int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct DatastoreRecord **record, const struct Datastore* datastore) { MDB_txn* mdb_txn; MDB_dbi mdb_dbi; - struct MDB_val db_key; - struct MDB_val db_value; - MDB_env* mdb_env = (MDB_env*)datastore->handle; + MDB_env* mdb_env = (MDB_env*)datastore->datastore_handle; if (mdb_env == NULL) return 0; - // debug - size_t b58size = 100; - uint8_t *b58key = (uint8_t *) malloc(b58size); - libp2p_crypto_encoding_base58_encode(key, key_size, &b58key, &b58size); - libp2p_logger_debug("lmdb_datastore", "Looking for key %s in 
datastore.\n", b58key); - free(b58key); - // open transaction if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0) return 0; @@ -115,28 +144,33 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas return 0; } - // prepare data - db_key.mv_size = key_size; - db_key.mv_data = (char*)key; + int retVal = repo_fsrepo_lmdb_get_with_transaction(key, key_size, record, mdb_txn, &mdb_dbi); - if (mdb_get(mdb_txn, mdb_dbi, &db_key, &db_value) != 0) { - mdb_txn_commit(mdb_txn); - return 0; - } - - if (!repo_fsrepo_lmdb_build_record(&db_key, &db_value, record)) { - mdb_txn_commit(mdb_txn); - return 0; - } - - // clean up mdb_txn_commit(mdb_txn); + return retVal; +} + +/** + * Open the database and create a new transaction + * @param mdb_env the database handle + * @param mdb_dbi the table handle to be created + * @param mdb_txn the transaction to be created + * @returns true(1) on success, false(0) otherwise + */ +int lmdb_datastore_create_transaction(MDB_env *mdb_env, MDB_dbi *mdb_dbi, MDB_txn **mdb_txn) { + // open transaction + if (mdb_txn_begin(mdb_env, NULL, 0, mdb_txn) != 0) + return 0; + if (mdb_dbi_open(*mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, mdb_dbi) != 0) { + mdb_txn_commit(*mdb_txn); + return 0; + } return 1; } /** - * Write data to the datastore with the specified key + * Write (or update) data in the datastore with the specified key * @param key the key * @param key_size the length of the key * @param data the data to be written @@ -146,29 +180,36 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas */ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned char* data, size_t data_size, const struct Datastore* datastore) { int retVal; - MDB_txn* mdb_txn; - MDB_dbi mdb_dbi; - struct MDB_val db_key; - struct MDB_val db_value; - unsigned long long old_timestamp = 0; + MDB_txn *datastore_txn; + MDB_dbi datastore_table; + struct MDB_val datastore_key; + struct MDB_val datastore_value; struct DatastoreRecord *datastore_record = NULL; struct JournalRecord *journalstore_record = NULL; struct lmdb_trans_cursor *journalstore_cursor = NULL; - MDB_env* mdb_env = (MDB_env*)datastore->handle; - if (mdb_env == NULL) + MDB_env* mdb_env = (MDB_env*)datastore->datastore_handle; + if (mdb_env == NULL) { + libp2p_logger_error("lmdb_datastore", "put: invalid datastore handle.\n"); return 0; + } - // open transaction - retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn); - if (retVal != 0) - return 0; - retVal = mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi); - if (retVal != 0) + // open a transaction to the databases + if (!lmdb_datastore_create_transaction(mdb_env, &datastore_table, &datastore_txn)) { return 0; + } - // check the datastore to see if it is already there. If it is there, use its timestamp if it is older. 
- repo_fsrepo_lmdb_get(key, key_size, &datastore_record, datastore); + // build the journalstore connectivity stuff + journalstore_cursor = lmdb_trans_cursor_new(); + if (journalstore_cursor == NULL) { + libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for journalstore cursor.\n"); + return 0; + } + journalstore_cursor->environment = mdb_env; + journalstore_cursor->parent_transaction = datastore_txn; + + // see if what we want is already in the datastore + repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, datastore_txn, &datastore_table); if (datastore_record != NULL) { // build the journalstore_record with the search criteria journalstore_record = lmdb_journal_record_new(); @@ -177,13 +218,14 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha memcpy(journalstore_record->hash, key, key_size); journalstore_record->timestamp = datastore_record->timestamp; // look up the corresponding journalstore record for possible updating - journalstore_cursor = lmdb_trans_cursor_new(); - journalstore_cursor->transaction = mdb_txn; - lmdb_journalstore_get_record((void*)mdb_env, journalstore_cursor, &journalstore_record); + lmdb_journalstore_get_record(datastore->datastore_handle, journalstore_cursor, &journalstore_record); + lmdb_journalstore_cursor_close(journalstore_cursor); } else { // it wasn't previously in the database datastore_record = libp2p_datastore_record_new(); if (datastore_record == NULL) { libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for DatastoreRecord.\n"); + lmdb_trans_cursor_free(journalstore_cursor); + mdb_txn_commit(datastore_txn); return 0; } } @@ -192,14 +234,16 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha unsigned long long now = os_utils_gmtime(); if (datastore_record->timestamp == 0 || datastore_record->timestamp > now) { //we need to update the timestamp. Be sure to update the journal too. (done further down) - old_timestamp = datastore_record->timestamp; + //old_timestamp = datastore_record->timestamp; datastore_record->timestamp = now; } // fill in the other fields datastore_record->key_size = key_size; - datastore_record->key = (uint8_t*)key; + datastore_record->key = (uint8_t*) malloc(key_size); + memcpy(datastore_record->key, key, key_size); datastore_record->value_size = data_size; - datastore_record->value = data; + datastore_record->value = (uint8_t *) malloc(data_size); + memcpy(datastore_record->value, data, data_size); // convert it into a byte array @@ -208,35 +252,38 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha repo_fsrepo_lmdb_encode_record(datastore_record, &record, &record_size); // prepare data - db_key.mv_size = key_size; - db_key.mv_data = (char*)key; + datastore_key.mv_size = key_size; + datastore_key.mv_data = (char*)key; // write - db_value.mv_size = record_size; - db_value.mv_data = record; + datastore_value.mv_size = record_size; + datastore_value.mv_data = record; - retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA); + retVal = mdb_put(datastore_txn, datastore_table, &datastore_key, &datastore_value, MDB_NODUPDATA); if (retVal == 0) { - // added the datastore record, now work with the journalstore + // Successfully added the datastore record. Now work with the journalstore. 
if (journalstore_record != NULL) { if (journalstore_record->timestamp != datastore_record->timestamp) { // we need to update journalstore_record->timestamp = datastore_record->timestamp; lmdb_journalstore_cursor_put(journalstore_cursor, journalstore_record); + lmdb_journalstore_cursor_close(journalstore_cursor); lmdb_journal_record_free(journalstore_record); } } else { // add it to the journalstore journalstore_record = lmdb_journal_record_new(); - journalstore_record->hash = (uint8_t*)key; + journalstore_record->hash = (uint8_t*) malloc(key_size); + memcpy(journalstore_record->hash, key, key_size); journalstore_record->hash_size = key_size; journalstore_record->timestamp = datastore_record->timestamp; journalstore_record->pending = 1; // TODO: Calculate this correctly journalstore_record->pin = 1; - if (!lmdb_journalstore_journal_add(mdb_txn, journalstore_record)) { + if (!lmdb_journalstore_journal_add(journalstore_cursor, journalstore_record)) { libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. Continuing.\n"); } + lmdb_journalstore_cursor_close(journalstore_cursor); lmdb_journal_record_free(journalstore_record); retVal = 1; } @@ -247,8 +294,10 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha } // cleanup + mdb_txn_commit(datastore_txn); free(record); - mdb_txn_commit(mdb_txn); + lmdb_trans_cursor_free(journalstore_cursor); + libp2p_datastore_record_free(datastore_record); return retVal; } @@ -279,7 +328,7 @@ int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) { return 0; } - datastore->handle = (void*)mdb_env; + datastore->datastore_handle = (void*)mdb_env; return 1; } @@ -291,99 +340,11 @@ int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) { * @param datastore the datastore struct that contains information about the opened database */ int repo_fsrepo_lmdb_close(struct Datastore* datastore) { - struct MDB_env* mdb_env = (struct MDB_env*)datastore->handle; + struct MDB_env* mdb_env = (struct MDB_env*)datastore->datastore_handle; mdb_env_close(mdb_env); return 1; } -/*** - * Create a new cursor on the datastore database - * @param datastore the place to store the cursor - * @returns true(1) on success, false(0) otherwise - */ -int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) { - if (datastore->handle != NULL) { - MDB_env* mdb_env = (MDB_env*)datastore->handle; - MDB_dbi mdb_dbi; - if (datastore->cursor == NULL ) { - datastore->cursor = malloc(sizeof(struct lmdb_trans_cursor)); - struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)datastore->cursor; - // open transaction - if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) - return 0; - MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction; - if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { - mdb_txn_commit(mdb_txn); - return 0; - } - // open cursor - if (mdb_cursor_open(mdb_txn, mdb_dbi, &cursor->cursor) != 0) { - mdb_txn_commit(mdb_txn); - return 0; - } - return 1; - } - } - return 0; -} - -/*** - * Get a record using a cursor - * @param key the key from the record - * @param key_length the length of the key - * @param value the value of the record - * @param value_length the length of the value - * @param CURSOR_FIRST or CURSOR_NEXT - * @param datastore holds the reference to the opened cursor - * @returns true(1) on success, false(0) otherwise - */ -int repo_fsrepo_lmdb_cursor_get(unsigned char** key, int* key_length, - 
unsigned char** value, int* value_length, - enum DatastoreCursorOp op, struct Datastore* datastore) -{ - if (datastore->cursor != NULL) { - struct lmdb_trans_cursor* tc = (struct lmdb_trans_cursor*)datastore->cursor; - MDB_val mdb_key; - MDB_val mdb_value; - MDB_cursor_op co = MDB_FIRST; - if (op == CURSOR_FIRST) - co = MDB_FIRST; - else if (op == CURSOR_NEXT) - co = MDB_NEXT; - if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) { - return 0; - } - *key = (unsigned char*)malloc(mdb_key.mv_size); - memcpy(*key, mdb_key.mv_data, mdb_key.mv_size); - *key_length = mdb_key.mv_size; - if (value != NULL) { // don't do this if a null is passed in, time saver - *value = (unsigned char*)malloc(mdb_value.mv_size); - memcpy(*value, mdb_value.mv_data, mdb_value.mv_size); - *value_length = mdb_value.mv_size; - } - return 1; - } - return 0; -} -/** - * Close an existing cursor - * @param datastore the context - * @returns true(1) on success - */ -int repo_fsrepo_lmdb_cursor_close(struct Datastore* datastore) { - if (datastore->cursor != NULL) { - struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)datastore->cursor; - if (cursor->cursor != NULL) { - mdb_cursor_close(cursor->cursor); - mdb_txn_commit(cursor->transaction); - free(cursor); - return 1; - } - free(cursor); - } - return 0; -} - /*** * Places the LMDB methods into the datastore's function pointers * @param datastore the datastore to fill @@ -394,10 +355,6 @@ int repo_fsrepo_lmdb_cast(struct Datastore* datastore) { datastore->datastore_close = &repo_fsrepo_lmdb_close; datastore->datastore_put = &repo_fsrepo_lmdb_put; datastore->datastore_get = &repo_fsrepo_lmdb_get; - datastore->datastore_cursor_open = &repo_fsrepo_lmdb_cursor_open; - datastore->datastore_cursor_get = &repo_fsrepo_lmdb_cursor_get; - datastore->datastore_cursor_close = &repo_fsrepo_lmdb_cursor_close; - datastore->cursor = NULL; return 1; } diff --git a/repo/fsrepo/lmdb_journalstore.c b/repo/fsrepo/lmdb_journalstore.c index 826c207..18c63de 100644 --- a/repo/fsrepo/lmdb_journalstore.c +++ b/repo/fsrepo/lmdb_journalstore.c @@ -39,6 +39,7 @@ int lmdb_journalstore_generate_key(const struct JournalRecord* journal_record, s db_key->mv_data = time_varint; return 1; } + /*** * Convert the JournalRec struct into a lmdb key and lmdb value * @param journal_record the record to convert @@ -113,23 +114,31 @@ int lmdb_journalstore_build_record(const struct MDB_val* db_key, const struct MD * @param hash_size the size of the hash * @returns true(1) on success, false(0) otherwise */ -int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* journal_record) { - MDB_dbi mdb_dbi; - struct MDB_val db_key; - struct MDB_val db_value; +int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord *journalstore_record) { - if (!lmdb_journalstore_build_key_value_pair(journal_record, &db_key, &db_value)) { - libp2p_logger_error("lmdb_journalstore", "Unable to generate key value pair for journal_add.\n"); + MDB_val journalstore_key; + MDB_val journalstore_value; + + if (!lmdb_journalstore_build_key_value_pair(journalstore_record, &journalstore_key, &journalstore_value)) { + libp2p_logger_error("lmdbd_journalstore", "add: Unable to convert journalstore record to key/value.\n"); return 0; } - // open the journal table - if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { - libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n"); - return 0; + // create transaction if 
necessary + if (journalstore_cursor->transaction == NULL) { + mdb_txn_begin(journalstore_cursor->environment, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction); } - if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) { + if (journalstore_cursor->database == NULL) { + // open the journal table + journalstore_cursor->database = (MDB_dbi*) malloc(sizeof(MDB_dbi)); + if (mdb_dbi_open(journalstore_cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_cursor->database) != 0) { + libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n"); + return 0; + } + } + + if (mdb_put(journalstore_cursor->transaction, *journalstore_cursor->database, &journalstore_key, &journalstore_value, 0) != 0) { libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n"); return 0; } @@ -146,10 +155,34 @@ int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, struct JournalRecord* journa */ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journalstore_cursor, struct JournalRecord **journalstore_record) { - if (journalstore_cursor == NULL || journalstore_cursor->transaction == NULL) { - libp2p_logger_error("lmdb_journalstore", "get_record: journalstore cursor not initialized properly.\n"); + + if (handle == NULL) { + libp2p_logger_error("lmdb_journalstore", "get_record: database environment not set up.\n"); return 0; } + struct MDB_env *mdb_env = (struct MDB_env*)handle; + + // create a new transaction if necessary + if (journalstore_cursor->transaction == NULL) { + if (mdb_txn_begin(mdb_env, NULL, 0, &journalstore_cursor->transaction) != 0) { + libp2p_logger_error("lmdb_journanstore", "get_record: Attempt to begin transaction failed.\n"); + return 0; + } + } + + if (journalstore_cursor->database == NULL) { + // open the journal table + journalstore_cursor->database = (MDB_dbi*) malloc(sizeof(MDB_dbi)); + if (journalstore_cursor->database == NULL) { + libp2p_logger_error("lmdb_journalstore", "get_record: Unable to allocate memory for journalstore database.\n"); + return 0; + } + if (mdb_dbi_open(journalstore_cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_cursor->database) != 0) { + libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n"); + return 0; + } + } + if (journalstore_cursor->cursor == NULL) { if (!lmdb_journalstore_cursor_open(handle, &journalstore_cursor)) { libp2p_logger_error("lmdb_journalstore", "Unable to open cursor in get_record.\n"); @@ -158,12 +191,11 @@ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journal } // search for the timestamp if (!lmdb_journalstore_cursor_get(journalstore_cursor, CURSOR_FIRST, journalstore_record)) { - libp2p_logger_error("lmdb_journalstore", "Unable to find any records in table.\n"); + libp2p_logger_debug("lmdb_journalstore", "Unable to find any records in table.\n"); return 0; } - // now look for the hash key - return 0; + return 1; } /** @@ -332,17 +364,21 @@ int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalR } /** - * Close the cursor + * Close the cursor, but does not free the struct. It simply closes the cursor + * and commits the transaction. 
* @param crsr a lmdb_trans_cursor pointer + * @returns true(1) */ int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor) { - if (cursor->cursor != NULL) { - mdb_cursor_close(cursor->cursor); - mdb_txn_commit(cursor->transaction); - free(cursor); - return 1; - } else { - free(cursor); + if (cursor != NULL) { + if (cursor->cursor != NULL) { + mdb_cursor_close(cursor->cursor); + cursor->cursor = NULL; + } + if (cursor->transaction != NULL) { + mdb_txn_commit(cursor->transaction); + cursor->transaction = NULL; + } } - return 0; + return 1; } diff --git a/routing/offline.c b/routing/offline.c index bbf642a..4b79930 100644 --- a/routing/offline.c +++ b/routing/offline.c @@ -115,3 +115,8 @@ struct IpfsRouting* ipfs_routing_new_offline (struct IpfsNode* local_node, struc return offlineRouting; } + +int ipfs_routing_offline_free(ipfs_routing* incoming) { + free(incoming); + return 1; +} diff --git a/test/node/test_importer.h b/test/node/test_importer.h index cfe7b69..ca5a650 100644 --- a/test/node/test_importer.h +++ b/test/node/test_importer.h @@ -152,7 +152,8 @@ int test_import_small_file() { // get the repo drop_and_build_repository(repo_path, 4001, NULL, NULL); - ipfs_node_online_new(repo_path, &local_node); + + ipfs_node_offline_new(repo_path, &local_node); // write to ipfs struct HashtableNode* write_node; diff --git a/test/storage/test_datastore.h b/test/storage/test_datastore.h index e591be5..dfe449a 100644 --- a/test/storage/test_datastore.h +++ b/test/storage/test_datastore.h @@ -85,7 +85,7 @@ int test_datastore_list_journal() { } // open cursor struct lmdb_trans_cursor *crsr = NULL; - if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->handle, &crsr)) { + if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->datastore_handle, &crsr)) { ipfs_repo_fsrepo_free(fs_repo); return 0; } diff --git a/test/test_helper.c b/test/test_helper.c index ec283f3..1e24b82 100644 --- a/test/test_helper.c +++ b/test/test_helper.c @@ -205,7 +205,7 @@ int drop_build_open_repo(const char* path, struct FSRepo** fs_repo, const char* if (config_filename_to_copy != NULL) { // attach config filename to path - char *config = (char*) malloc(strlen(path) + 7); + char *config = (char*) malloc(strlen(path) + 8); strcpy(config, path); // erase slash if there is one if (config[strlen(path)-1] == '/') From 78904ff1b6312fea309b6d5cd721387e0f62c277 Mon Sep 17 00:00:00 2001 From: John Jones Date: Thu, 7 Sep 2017 14:58:02 -0500 Subject: [PATCH 2/4] refactored datastore/journalstore for readability --- repo/fsrepo/fs_repo.c | 1 + repo/fsrepo/lmdb_cursor.c | 3 -- repo/fsrepo/lmdb_datastore.c | 34 +++++++++++-- repo/fsrepo/lmdb_journalstore.c | 18 +++++-- test/journal/test_journal.h | 85 +++++++++++++++++++++++++++++++++ test/testit.c | 2 + 6 files changed, 132 insertions(+), 11 deletions(-) diff --git a/repo/fsrepo/fs_repo.c b/repo/fsrepo/fs_repo.c index 0abf04d..b2748c5 100644 --- a/repo/fsrepo/fs_repo.c +++ b/repo/fsrepo/fs_repo.c @@ -502,6 +502,7 @@ int fs_repo_open_config(struct FSRepo* repo) { continue; // make multiaddress a peer struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(cur); + multiaddress_free(cur); struct ReplicationPeer* rp = repo_config_replication_peer_new(); rp->peer = peer; libp2p_logger_debug("fs_repo", "Adding %s to replication_peers.\n", libp2p_peer_id_to_string(rp->peer)); diff --git a/repo/fsrepo/lmdb_cursor.c b/repo/fsrepo/lmdb_cursor.c index ee0f4db..c411547 100644 --- a/repo/fsrepo/lmdb_cursor.c +++ b/repo/fsrepo/lmdb_cursor.c @@ -25,9 +25,6 @@ struct 
lmdb_trans_cursor* lmdb_trans_cursor_new() { */ int lmdb_trans_cursor_free(struct lmdb_trans_cursor* in) { if (in != NULL) { - if (in->database != NULL) { - free(in->database); - } free(in); } return 1; diff --git a/repo/fsrepo/lmdb_datastore.c b/repo/fsrepo/lmdb_datastore.c index d27ef35..3034599 100644 --- a/repo/fsrepo/lmdb_datastore.c +++ b/repo/fsrepo/lmdb_datastore.c @@ -158,12 +158,28 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas * @param mdb_txn the transaction to be created * @returns true(1) on success, false(0) otherwise */ -int lmdb_datastore_create_transaction(MDB_env *mdb_env, MDB_dbi *mdb_dbi, MDB_txn **mdb_txn) { +int lmdb_datastore_create_transaction(MDB_env *mdb_env, MDB_txn **mdb_txn) { // open transaction if (mdb_txn_begin(mdb_env, NULL, 0, mdb_txn) != 0) return 0; - if (mdb_dbi_open(*mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, mdb_dbi) != 0) { - mdb_txn_commit(*mdb_txn); + return 1; +} + +int lmdb_datastore_open_databases(MDB_env *mdb_env, MDB_txn *mdb_txn, MDB_dbi *datastore_table, MDB_dbi* journalstore_table) { + if (mdb_env == NULL) { + libp2p_logger_error("lmdb_datastore", "open_databases: environment not set.\n"); + return 0; + } + if (mdb_txn == NULL) { + libp2p_logger_error("lmdb_datastore", "open_database: transaction does not exist.\n"); + return 0; + } + if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, datastore_table) != 0) { + libp2p_logger_error("lmdb_datastore", "open_database: Unable to open datastore.\n"); + return 0; + } + if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_table) != 0) { + libp2p_logger_error("lmdb_datastore", "open_database: Unable to open journalstore.\n"); return 0; } return 1; @@ -182,6 +198,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha int retVal; MDB_txn *datastore_txn; MDB_dbi datastore_table; + MDB_dbi journalstore_table; struct MDB_val datastore_key; struct MDB_val datastore_value; struct DatastoreRecord *datastore_record = NULL; @@ -195,7 +212,13 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha } // open a transaction to the databases - if (!lmdb_datastore_create_transaction(mdb_env, &datastore_table, &datastore_txn)) { + if (!lmdb_datastore_create_transaction(mdb_env, &datastore_txn)) { + libp2p_logger_error("lmdb_datastore", "put: Unable to create db transaction.\n"); + return 0; + } + + if (!lmdb_datastore_open_databases(mdb_env, datastore_txn, &datastore_table, &journalstore_table)) { + libp2p_logger_error("lmdb_datastore", "put: Unable to open database tables.\n"); return 0; } @@ -207,6 +230,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha } journalstore_cursor->environment = mdb_env; journalstore_cursor->parent_transaction = datastore_txn; + journalstore_cursor->database = &journalstore_table; // see if what we want is already in the datastore repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, datastore_txn, &datastore_table); @@ -283,7 +307,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha if (!lmdb_journalstore_journal_add(journalstore_cursor, journalstore_record)) { libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. 
Continuing.\n"); } - lmdb_journalstore_cursor_close(journalstore_cursor); + //lmdb_journalstore_cursor_close(journalstore_cursor); lmdb_journal_record_free(journalstore_record); retVal = 1; } diff --git a/repo/fsrepo/lmdb_journalstore.c b/repo/fsrepo/lmdb_journalstore.c index 18c63de..e369dd5 100644 --- a/repo/fsrepo/lmdb_journalstore.c +++ b/repo/fsrepo/lmdb_journalstore.c @@ -118,6 +118,7 @@ int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor, MDB_val journalstore_key; MDB_val journalstore_value; + int createdTransaction = 0; if (!lmdb_journalstore_build_key_value_pair(journalstore_record, &journalstore_key, &journalstore_value)) { libp2p_logger_error("lmdbd_journalstore", "add: Unable to convert journalstore record to key/value.\n"); @@ -127,6 +128,7 @@ int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor, // create transaction if necessary if (journalstore_cursor->transaction == NULL) { mdb_txn_begin(journalstore_cursor->environment, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction); + createdTransaction = 1; } if (journalstore_cursor->database == NULL) { @@ -138,11 +140,21 @@ int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor, } } + // for debuggin + MDB_txn *tx = journalstore_cursor->transaction; + MDB_dbi dbi = *journalstore_cursor->database; if (mdb_put(journalstore_cursor->transaction, *journalstore_cursor->database, &journalstore_key, &journalstore_value, 0) != 0) { libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n"); return 0; } + if (createdTransaction) { + if (mdb_txn_commit(journalstore_cursor->transaction) != 0) { + libp2p_logger_error("lmdb_journalstore", "Unable to commit JOURNALSTORE transaction.\n"); + return 0; + } + } + return 1; } @@ -164,7 +176,7 @@ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journal // create a new transaction if necessary if (journalstore_cursor->transaction == NULL) { - if (mdb_txn_begin(mdb_env, NULL, 0, &journalstore_cursor->transaction) != 0) { + if (mdb_txn_begin(mdb_env, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction) != 0) { libp2p_logger_error("lmdb_journanstore", "get_record: Attempt to begin transaction failed.\n"); return 0; } @@ -373,12 +385,12 @@ int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor) { if (cursor != NULL) { if (cursor->cursor != NULL) { mdb_cursor_close(cursor->cursor); - cursor->cursor = NULL; } if (cursor->transaction != NULL) { mdb_txn_commit(cursor->transaction); - cursor->transaction = NULL; } + cursor->cursor = NULL; + cursor->transaction = NULL; } return 1; } diff --git a/test/journal/test_journal.h b/test/journal/test_journal.h index 91b86dd..119ac6f 100644 --- a/test/journal/test_journal.h +++ b/test/journal/test_journal.h @@ -120,6 +120,7 @@ int test_journal_server_1() { ipfs_node_offline_new(ipfs_path, &local_node); ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0); ipfs_node_free(local_node); + ipfs_hashtable_node_free(node); libp2p_logger_debug("test_journal", "*** Firing up daemon for server 1 ***\n"); @@ -191,3 +192,87 @@ int test_journal_server_2() { pthread_join(daemon_thread, NULL); return retVal; } + +#include "lmdb.h" + +// test the lightning db process +int test_journal_db() { + + MDB_env *mdb_env = NULL; + MDB_txn *mdb_txn = NULL; + MDB_dbi datastore_db; + MDB_dbi journalstore_db; + MDB_val datastore_key; + MDB_val datastore_value; + MDB_val *journalstore_key; 
+ MDB_val *journalstore_value; + MDB_val returned_value; + + // set up records + char* key = "ABC123"; + char* value = "Hello, world!"; + datastore_key.mv_size = strlen(key); + datastore_key.mv_data = (void*)key; + datastore_value.mv_size = strlen(value); + datastore_value.mv_data = (void*)value; + journalstore_key = (MDB_val*) malloc(sizeof(MDB_val)); + journalstore_key->mv_size = strlen(key); + journalstore_key->mv_data = (void*)key; + journalstore_value = (MDB_val*) malloc(sizeof(MDB_val)); + journalstore_value->mv_size = strlen(value); + journalstore_value->mv_data = (void*)value; + + // clean up the old stuff + unlink ("/tmp/lock.mdb"); + unlink ("/tmp/data.mdb"); + + // create environment + if (mdb_env_create(&mdb_env) != 0) + return 0; + + if (mdb_env_set_maxdbs(mdb_env, (MDB_dbi)2) != 0) + return 0; + + if (mdb_env_open(mdb_env, "/tmp", 0, S_IRWXU) != 0) { + fprintf(stderr, "Unable to open environment.\n"); + return 0; + } + + // create a transaction + if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0) { + fprintf(stderr, "Unable to open transaction.\n"); + return 0; + } + + // open databases + if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &datastore_db) != 0) + return 0; + if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &journalstore_db) != 0) + return 0; + + // search for a record in the datastore + if (mdb_get(mdb_txn, datastore_db, &datastore_key, &returned_value) != MDB_NOTFOUND) { + return 0; + } + + // add record to datastore + if (mdb_put(mdb_txn, datastore_db, &datastore_key, &datastore_value, 0) != 0) + return 0; + + // add record to journalstore + if (mdb_put(mdb_txn, journalstore_db, journalstore_key, journalstore_value, 0) != 0) + return 0; + + // get rid of MDB_val values from journalstore to see if commit still works + free(journalstore_key); + free(journalstore_value); + journalstore_key = NULL; + journalstore_value = NULL; + + // close everything up + if (mdb_txn_commit(mdb_txn) != 0) + return 0; + mdb_env_close(mdb_env); + + return 1; +} diff --git a/test/testit.c b/test/testit.c index 38560ac..b353905 100644 --- a/test/testit.c +++ b/test/testit.c @@ -47,6 +47,7 @@ const char* names[] = { "test_cid_protobuf_encode_decode", "test_daemon_startup_shutdown", "test_datastore_list_journal", + "test_journal_db", "test_journal_encode_decode", "test_journal_server_1", "test_journal_server_2", @@ -107,6 +108,7 @@ int (*funcs[])(void) = { test_cid_protobuf_encode_decode, test_daemon_startup_shutdown, test_datastore_list_journal, + test_journal_db, test_journal_encode_decode, test_journal_server_1, test_journal_server_2, From a9481631df560add15c2d23f75bd262df09f20cc Mon Sep 17 00:00:00 2001 From: John Jones Date: Thu, 7 Sep 2017 18:45:09 -0500 Subject: [PATCH 3/4] Several memory leak fixes for journal code --- exchange/bitswap/bitswap.c | 5 +- include/ipfs/repo/fsrepo/journalstore.h | 4 +- include/ipfs/repo/fsrepo/lmdb_cursor.h | 7 ++ journal/journal.c | 23 ++-- repo/fsrepo/lmdb_datastore.c | 137 +++++++++++++----------- repo/fsrepo/lmdb_journalstore.c | 84 +++++++-------- test/journal/test_journal.h | 8 +- test/node/test_importer.h | 19 +++- test/storage/test_datastore.h | 11 +- 9 files changed, 163 insertions(+), 135 deletions(-) diff --git a/exchange/bitswap/bitswap.c b/exchange/bitswap/bitswap.c index 1973678..39bb667 100644 --- a/exchange/bitswap/bitswap.c +++ b/exchange/bitswap/bitswap.c @@ -3,6 +3,7 @@ */ #include #include // for sleep() +#include "libp2p/os/utils.h" #include "libp2p/utils/logger.h" #include 
"ipfs/core/ipfs_node.h" #include "ipfs/exchange/exchange.h" @@ -13,9 +14,7 @@ #include "ipfs/exchange/bitswap/want_manager.h" int ipfs_bitswap_can_handle(const uint8_t* incoming, size_t incoming_size) { - if (incoming_size < 8) - return 0; - char* result = strstr((char*)incoming, "/ipfs/bitswap"); + char* result = strnstr((char*)incoming, "/ipfs/bitswap", incoming_size); if(result == NULL || result != (char*)incoming) return 0; return 1; diff --git a/include/ipfs/repo/fsrepo/journalstore.h b/include/ipfs/repo/fsrepo/journalstore.h index 231c704..eb1fcc0 100644 --- a/include/ipfs/repo/fsrepo/journalstore.h +++ b/include/ipfs/repo/fsrepo/journalstore.h @@ -28,7 +28,7 @@ int lmdb_journal_record_free(struct JournalRecord* rec); * @param cursor where to place the results * @returns true(1) on success, false(0) otherwise */ -int lmdb_journalstore_cursor_open(void* db_handle, struct lmdb_trans_cursor **cursor); +int lmdb_journalstore_cursor_open(void* db_handle, struct lmdb_trans_cursor **cursor, struct MDB_txn *trans_to_use); /** * Read a record from the cursor @@ -46,7 +46,7 @@ int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalR /** * Close the cursor */ -int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor); +int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor, int commitTransaction); int journal_record_free(struct JournalRecord* rec); diff --git a/include/ipfs/repo/fsrepo/lmdb_cursor.h b/include/ipfs/repo/fsrepo/lmdb_cursor.h index 5ff0a46..7e7b147 100644 --- a/include/ipfs/repo/fsrepo/lmdb_cursor.h +++ b/include/ipfs/repo/fsrepo/lmdb_cursor.h @@ -2,6 +2,13 @@ #include "lmdb.h" +struct lmdb_context { + MDB_env *db_environment; + MDB_txn *current_transaction; + MDB_dbi *datastore_db; + MDB_dbi *journal_db; +}; + struct lmdb_trans_cursor { MDB_env* environment; MDB_txn* parent_transaction; diff --git a/journal/journal.c b/journal/journal.c index 7f18b33..487ab25 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -17,9 +17,10 @@ * @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise. 
*/ int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) { - if (incoming_size < 8) + const char* protocol = "/ipfs/journalio/1.0.0"; + if (incoming_size < 21) return 0; - char* result = strstr((char*)incoming, "/ipfs/journalio/1.0.0"); + char* result = strnstr((char*)incoming, protocol, incoming_size); if(result == NULL || result != (char*)incoming) return 0; libp2p_logger_debug("journal", "Handling incoming message.\n"); @@ -61,7 +62,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) { struct Libp2pVector* vector = libp2p_utils_vector_new(1); if (vector != NULL) { struct lmdb_trans_cursor *cursor = NULL; - if (!lmdb_journalstore_cursor_open(database->datastore_handle, &cursor)) { + if (!lmdb_journalstore_cursor_open(database->datastore_context, &cursor, NULL)) { libp2p_logger_error("journal", "Unable to open a cursor for the journalstore.\n"); return NULL; } @@ -69,7 +70,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) { if (!lmdb_journalstore_cursor_get(cursor, CURSOR_LAST, &rec)) { libp2p_logger_error("journal", "Unable to find last record from the journalstore.\n"); libp2p_utils_vector_free(vector); - lmdb_journalstore_cursor_close(cursor); + lmdb_journalstore_cursor_close(cursor, 1); return NULL; } // we've got one, now start the loop @@ -83,7 +84,7 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) { i++; } while(i < n); libp2p_logger_debug("journal", "Closing journalstore cursor.\n"); - lmdb_journalstore_cursor_close(cursor); + lmdb_journalstore_cursor_close(cursor, 1); } else { libp2p_logger_error("journal", "Unable to allocate vector for ipfs_journal_get_last.\n"); } @@ -147,6 +148,7 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli if (journal_records == NULL || journal_records->total == 0) { // nothing to do libp2p_logger_debug("journal", "There are no journal records to process.\n"); + replication_peer->lastConnect = os_utils_gmtime(); return 1; } // build the message @@ -169,17 +171,11 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli return 0; } memcpy(entry->hash, rec->hash, entry->hash_size); - // debugging - size_t b58size = 100; - uint8_t *b58key = (uint8_t*) malloc(b58size); - libp2p_crypto_encoding_base58_encode(entry->hash, entry->hash_size, &b58key, &b58size); - free(b58key); - libp2p_logger_debug("journal", "Adding hash %s to JournalMessage.\n", b58key); libp2p_utils_vector_add(message->journal_entries, entry); } // send the message message->current_epoch = os_utils_gmtime(); - libp2p_logger_debug("journal", "Sending message to %s.\n", peer->id); + libp2p_logger_debug("journal", "Sending message to %s.\n", libp2p_peer_id_to_string(peer)); int retVal = ipfs_journal_send_message(local_node, peer, message); if (retVal) { replication_peer->lastConnect = message->current_epoch; @@ -308,6 +304,7 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s if (second_read) { free(incoming_pos); } + ipfs_journal_message_free(message); return -1; } struct Libp2pVector* todo_vector = NULL; @@ -345,6 +342,8 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s } //TODO: set new values in their ReplicationPeer struct + ipfs_journal_message_free(message); + if (second_read) free(incoming_pos); return 1; diff --git a/repo/fsrepo/lmdb_datastore.c b/repo/fsrepo/lmdb_datastore.c index 3034599..9a41a01 100644 --- a/repo/fsrepo/lmdb_datastore.c +++ 
b/repo/fsrepo/lmdb_datastore.c @@ -129,22 +129,22 @@ int repo_fsrepo_lmdb_get_with_transaction(const unsigned char* key, size_t key_s */ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct DatastoreRecord **record, const struct Datastore* datastore) { MDB_txn* mdb_txn; - MDB_dbi mdb_dbi; - MDB_env* mdb_env = (MDB_env*)datastore->datastore_handle; - if (mdb_env == NULL) + if (datastore == NULL || datastore->datastore_context == NULL) { + libp2p_logger_error("lmdb_datastore", "get: datastore not initialized.\n"); return 0; - - // open transaction - if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0) - return 0; - - if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { - mdb_txn_commit(mdb_txn); + } + struct lmdb_context *db_context = (struct lmdb_context*) datastore->datastore_context; + if (db_context->db_environment == NULL) { + libp2p_logger_error("lmdb_datastore", "get: datastore environment not initialized.\n"); return 0; } - int retVal = repo_fsrepo_lmdb_get_with_transaction(key, key_size, record, mdb_txn, &mdb_dbi); + // open transaction + if (mdb_txn_begin(db_context->db_environment, db_context->current_transaction, 0, &mdb_txn) != 0) + return 0; + + int retVal = repo_fsrepo_lmdb_get_with_transaction(key, key_size, record, mdb_txn, db_context->datastore_db); mdb_txn_commit(mdb_txn); @@ -158,33 +158,13 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas * @param mdb_txn the transaction to be created * @returns true(1) on success, false(0) otherwise */ -int lmdb_datastore_create_transaction(MDB_env *mdb_env, MDB_txn **mdb_txn) { +int lmdb_datastore_create_transaction(struct lmdb_context *db_context, MDB_txn **mdb_txn) { // open transaction - if (mdb_txn_begin(mdb_env, NULL, 0, mdb_txn) != 0) + if (mdb_txn_begin(db_context->db_environment, db_context->current_transaction, 0, mdb_txn) != 0) return 0; return 1; } -int lmdb_datastore_open_databases(MDB_env *mdb_env, MDB_txn *mdb_txn, MDB_dbi *datastore_table, MDB_dbi* journalstore_table) { - if (mdb_env == NULL) { - libp2p_logger_error("lmdb_datastore", "open_databases: environment not set.\n"); - return 0; - } - if (mdb_txn == NULL) { - libp2p_logger_error("lmdb_datastore", "open_database: transaction does not exist.\n"); - return 0; - } - if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, datastore_table) != 0) { - libp2p_logger_error("lmdb_datastore", "open_database: Unable to open datastore.\n"); - return 0; - } - if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_table) != 0) { - libp2p_logger_error("lmdb_datastore", "open_database: Unable to open journalstore.\n"); - return 0; - } - return 1; -} - /** * Write (or update) data in the datastore with the specified key * @param key the key @@ -196,44 +176,38 @@ int lmdb_datastore_open_databases(MDB_env *mdb_env, MDB_txn *mdb_txn, MDB_dbi *d */ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned char* data, size_t data_size, const struct Datastore* datastore) { int retVal; - MDB_txn *datastore_txn; - MDB_dbi datastore_table; - MDB_dbi journalstore_table; + struct MDB_txn *child_transaction; struct MDB_val datastore_key; struct MDB_val datastore_value; struct DatastoreRecord *datastore_record = NULL; struct JournalRecord *journalstore_record = NULL; struct lmdb_trans_cursor *journalstore_cursor = NULL; - MDB_env* mdb_env = (MDB_env*)datastore->datastore_handle; - if (mdb_env == NULL) { + if (datastore == NULL || 
datastore->datastore_context == NULL) + return 0; + + struct lmdb_context *db_context = (struct lmdb_context*)datastore->datastore_context; + + if (db_context->db_environment == NULL) { libp2p_logger_error("lmdb_datastore", "put: invalid datastore handle.\n"); return 0; } // open a transaction to the databases - if (!lmdb_datastore_create_transaction(mdb_env, &datastore_txn)) { + if (!lmdb_datastore_create_transaction(db_context, &child_transaction)) { libp2p_logger_error("lmdb_datastore", "put: Unable to create db transaction.\n"); return 0; } - if (!lmdb_datastore_open_databases(mdb_env, datastore_txn, &datastore_table, &journalstore_table)) { - libp2p_logger_error("lmdb_datastore", "put: Unable to open database tables.\n"); - return 0; - } - // build the journalstore connectivity stuff - journalstore_cursor = lmdb_trans_cursor_new(); + lmdb_journalstore_cursor_open(datastore->datastore_context, &journalstore_cursor, child_transaction); if (journalstore_cursor == NULL) { libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for journalstore cursor.\n"); return 0; } - journalstore_cursor->environment = mdb_env; - journalstore_cursor->parent_transaction = datastore_txn; - journalstore_cursor->database = &journalstore_table; // see if what we want is already in the datastore - repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, datastore_txn, &datastore_table); + repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, child_transaction, db_context->datastore_db); if (datastore_record != NULL) { // build the journalstore_record with the search criteria journalstore_record = lmdb_journal_record_new(); @@ -242,14 +216,13 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha memcpy(journalstore_record->hash, key, key_size); journalstore_record->timestamp = datastore_record->timestamp; // look up the corresponding journalstore record for possible updating - lmdb_journalstore_get_record(datastore->datastore_handle, journalstore_cursor, &journalstore_record); - lmdb_journalstore_cursor_close(journalstore_cursor); + lmdb_journalstore_get_record(db_context, journalstore_cursor, &journalstore_record); } else { // it wasn't previously in the database datastore_record = libp2p_datastore_record_new(); if (datastore_record == NULL) { libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for DatastoreRecord.\n"); lmdb_trans_cursor_free(journalstore_cursor); - mdb_txn_commit(datastore_txn); + mdb_txn_commit(child_transaction); return 0; } } @@ -283,7 +256,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha datastore_value.mv_size = record_size; datastore_value.mv_data = record; - retVal = mdb_put(datastore_txn, datastore_table, &datastore_key, &datastore_value, MDB_NODUPDATA); + retVal = mdb_put(child_transaction, *db_context->datastore_db, &datastore_key, &datastore_value, MDB_NODUPDATA); if (retVal == 0) { // Successfully added the datastore record. Now work with the journalstore. 
@@ -292,7 +265,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha // we need to update journalstore_record->timestamp = datastore_record->timestamp; lmdb_journalstore_cursor_put(journalstore_cursor, journalstore_record); - lmdb_journalstore_cursor_close(journalstore_cursor); + lmdb_journalstore_cursor_close(journalstore_cursor, 0); lmdb_journal_record_free(journalstore_record); } } else { @@ -307,7 +280,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha if (!lmdb_journalstore_journal_add(journalstore_cursor, journalstore_record)) { libp2p_logger_error("lmdb_datastore", "Datastore record was added, but problem adding Journalstore record. Continuing.\n"); } - //lmdb_journalstore_cursor_close(journalstore_cursor); + lmdb_journalstore_cursor_close(journalstore_cursor, 0); lmdb_journal_record_free(journalstore_record); retVal = 1; } @@ -318,9 +291,10 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha } // cleanup - mdb_txn_commit(datastore_txn); + if (mdb_txn_commit(child_transaction) != 0) { + libp2p_logger_error("lmdb_datastore", "lmdb_put: transaction commit failed.\n"); + } free(record); - lmdb_trans_cursor_free(journalstore_cursor); libp2p_datastore_record_free(datastore_record); return retVal; } @@ -330,6 +304,8 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha * Note: for now, the parameters are not used * @param argc number of parameters in the following array * @param argv an array of parameters + * @param datastore the datastore struct + * @returns true(1) on success, false(0) otherwise */ int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) { // create environment @@ -352,20 +328,55 @@ int repo_fsrepro_lmdb_open(int argc, char** argv, struct Datastore* datastore) { return 0; } - datastore->datastore_handle = (void*)mdb_env; + struct lmdb_context *db_context = (struct lmdb_context *) malloc(sizeof(struct lmdb_context)); + datastore->datastore_context = (void*) db_context; + db_context->db_environment = (void*)mdb_env; + db_context->datastore_db = (MDB_dbi*) malloc(sizeof(MDB_dbi)); + db_context->journal_db = (MDB_dbi*) malloc(sizeof(MDB_dbi)); + + // open the 2 databases + if (mdb_txn_begin(mdb_env, NULL, 0, &db_context->current_transaction) != 0) { + mdb_env_close(mdb_env); + db_context->db_environment = NULL; + return 0; + } + if (mdb_dbi_open(db_context->current_transaction, "DATASTORE", MDB_DUPSORT | MDB_CREATE, db_context->datastore_db ) != 0) { + mdb_txn_abort(db_context->current_transaction); + mdb_env_close(mdb_env); + db_context->db_environment = NULL; + return 0; + } + if (mdb_dbi_open(db_context->current_transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, db_context->journal_db) != 0) { + mdb_txn_abort(db_context->current_transaction); + mdb_env_close(mdb_env); + db_context->db_environment = NULL; + return 0; + } return 1; } /*** * Close an LMDB database - * NOTE: for now, argc and argv are not used - * @param argc number of parameters in the argv array - * @param argv parameters to be passed in * @param datastore the datastore struct that contains information about the opened database + * @returns true(1) on success, otherwise false(0) */ int repo_fsrepo_lmdb_close(struct Datastore* datastore) { - struct MDB_env* mdb_env = (struct MDB_env*)datastore->datastore_handle; - mdb_env_close(mdb_env); + // check parameters + if (datastore == NULL || datastore->datastore_context == NULL) + return 0; + + // close the db 
environment + struct lmdb_context *db_context = (struct lmdb_context*) datastore->datastore_context; + if (db_context->current_transaction != NULL) { + mdb_txn_commit(db_context->current_transaction); + } + mdb_env_close(db_context->db_environment); + + free(db_context->datastore_db); + free(db_context->journal_db); + + free(db_context); + return 1; } diff --git a/repo/fsrepo/lmdb_journalstore.c b/repo/fsrepo/lmdb_journalstore.c index e369dd5..eaa29fa 100644 --- a/repo/fsrepo/lmdb_journalstore.c +++ b/repo/fsrepo/lmdb_journalstore.c @@ -1,4 +1,5 @@ #include +#include #include "varint.h" #include "lmdb.h" @@ -131,18 +132,6 @@ int lmdb_journalstore_journal_add(struct lmdb_trans_cursor *journalstore_cursor, createdTransaction = 1; } - if (journalstore_cursor->database == NULL) { - // open the journal table - journalstore_cursor->database = (MDB_dbi*) malloc(sizeof(MDB_dbi)); - if (mdb_dbi_open(journalstore_cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_cursor->database) != 0) { - libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n"); - return 0; - } - } - - // for debuggin - MDB_txn *tx = journalstore_cursor->transaction; - MDB_dbi dbi = *journalstore_cursor->database; if (mdb_put(journalstore_cursor->transaction, *journalstore_cursor->database, &journalstore_key, &journalstore_value, 0) != 0) { libp2p_logger_error("lmdb_journalstore", "Unable to add to JOURNALSTORE database.\n"); return 0; @@ -172,31 +161,18 @@ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journal libp2p_logger_error("lmdb_journalstore", "get_record: database environment not set up.\n"); return 0; } - struct MDB_env *mdb_env = (struct MDB_env*)handle; + struct lmdb_context *db_context = (struct lmdb_context*)handle; // create a new transaction if necessary if (journalstore_cursor->transaction == NULL) { - if (mdb_txn_begin(mdb_env, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction) != 0) { + if (mdb_txn_begin(db_context->db_environment, journalstore_cursor->parent_transaction, 0, &journalstore_cursor->transaction) != 0) { libp2p_logger_error("lmdb_journanstore", "get_record: Attempt to begin transaction failed.\n"); return 0; } } - if (journalstore_cursor->database == NULL) { - // open the journal table - journalstore_cursor->database = (MDB_dbi*) malloc(sizeof(MDB_dbi)); - if (journalstore_cursor->database == NULL) { - libp2p_logger_error("lmdb_journalstore", "get_record: Unable to allocate memory for journalstore database.\n"); - return 0; - } - if (mdb_dbi_open(journalstore_cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, journalstore_cursor->database) != 0) { - libp2p_logger_error("lmdb_journalstore", "Unable to open JOURNALSTORE database.\n"); - return 0; - } - } - if (journalstore_cursor->cursor == NULL) { - if (!lmdb_journalstore_cursor_open(handle, &journalstore_cursor)) { + if (!lmdb_journalstore_cursor_open(handle, &journalstore_cursor, NULL)) { libp2p_logger_error("lmdb_journalstore", "Unable to open cursor in get_record.\n"); return 0; } @@ -216,9 +192,9 @@ int lmdb_journalstore_get_record(void* handle, struct lmdb_trans_cursor *journal * @param cursor where to place the results * @returns true(1) on success, false(0) otherwise */ -int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr) { +int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr, struct MDB_txn* trans_to_use) { if (handle != NULL) { - MDB_env* mdb_env = (MDB_env*)handle; + 
struct lmdb_context *db_context = (struct lmdb_context*)handle; struct lmdb_trans_cursor *cursor = *crsr; if (cursor == NULL ) { cursor = lmdb_trans_cursor_new(); @@ -226,18 +202,19 @@ int lmdb_journalstore_cursor_open(void *handle, struct lmdb_trans_cursor **crsr) return 0; *crsr = cursor; } + cursor->database = db_context->journal_db; + cursor->environment = db_context->db_environment; + cursor->parent_transaction = db_context->current_transaction; + if (cursor->transaction == NULL) { - // open transaction - if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) { - libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to begin a transaction.\n"); - return 0; - } - } - if (cursor->database == NULL) { - if (mdb_dbi_open(cursor->transaction, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, cursor->database) != 0) { - libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to open the dbi to the journalstore"); - mdb_txn_commit(cursor->transaction); - return 0; + if (trans_to_use != NULL) + cursor->transaction = trans_to_use; + else { + // open transaction + if (mdb_txn_begin(db_context->db_environment, db_context->current_transaction, 0, &cursor->transaction) != 0) { + libp2p_logger_error("lmdb_journalstore", "cursor_open: Unable to begin a transaction.\n"); + return 0; + } } } if (cursor->cursor == NULL) { @@ -310,10 +287,23 @@ int lmdb_journalstore_cursor_get(struct lmdb_trans_cursor *tc, enum DatastoreCur lmdb_journalstore_generate_key(*record, &mdb_key); } - if (mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co) != 0) { + int retVal = mdb_cursor_get(tc->cursor, &mdb_key, &mdb_value, co); + if (retVal != 0) { + if (retVal == MDB_NOTFOUND) { + libp2p_logger_debug("lmdb_journalstore", "cursor_get: No records found in db.\n"); + } else if (retVal == EINVAL) { + libp2p_logger_debug("lmdb_journalstore", "cursor_get: Invalid parameter specified.\n"); + } return 0; } + if (*record == NULL) { + // make a new record and pass it back + if (!lmdb_journalstore_build_record(&mdb_key, &mdb_value, record)) + return 0; + return 1; + } + // see if the passed in record has a specific record in mind (take care of duplicate keys) if ( (*record)->hash_size > 0) { struct JournalRecord* curr_record = NULL; @@ -376,21 +366,21 @@ int lmdb_journalstore_cursor_put(struct lmdb_trans_cursor *crsr, struct JournalR } /** - * Close the cursor, but does not free the struct. It simply closes the cursor - * and commits the transaction. + * Close the cursor and commits the transaction. 
* @param crsr a lmdb_trans_cursor pointer * @returns true(1) */ -int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor) { +int lmdb_journalstore_cursor_close(struct lmdb_trans_cursor *cursor, int commitTransaction) { if (cursor != NULL) { if (cursor->cursor != NULL) { - mdb_cursor_close(cursor->cursor); + //mdb_cursor_close(cursor->cursor); } - if (cursor->transaction != NULL) { + if (cursor->transaction != NULL && commitTransaction) { mdb_txn_commit(cursor->transaction); } cursor->cursor = NULL; cursor->transaction = NULL; + lmdb_trans_cursor_free(cursor); } return 1; } diff --git a/test/journal/test_journal.h b/test/journal/test_journal.h index 119ac6f..18d6cb1 100644 --- a/test/journal/test_journal.h +++ b/test/journal/test_journal.h @@ -2,6 +2,7 @@ #include "ipfs/journal/journal_entry.h" #include "ipfs/journal/journal_message.h" +#include "ipfs/repo/fsrepo/journalstore.h" int test_journal_encode_decode() { int retVal = 0; @@ -129,6 +130,8 @@ int test_journal_server_1() { sleep(45); + libp2p_logger_error("test_journal", "Sleep is over. Shutting down.\n"); + retVal = 1; exit: ipfs_daemon_stop(); @@ -183,7 +186,9 @@ int test_journal_server_2() { pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path); thread_started = 1; - sleep(120); + sleep(45); + + libp2p_logger_error("test_journal", "Sleep is over. Shutting down.\n"); retVal = 1; exit: @@ -272,6 +277,7 @@ int test_journal_db() { // close everything up if (mdb_txn_commit(mdb_txn) != 0) return 0; + mdb_env_close(mdb_env); return 1; diff --git a/test/node/test_importer.h b/test/node/test_importer.h index ca5a650..15e7a6e 100644 --- a/test/node/test_importer.h +++ b/test/node/test_importer.h @@ -8,6 +8,7 @@ #include "mh/multihash.h" #include "libp2p/crypto/encoding/base58.h" #include "ipfs/core/ipfs_node.h" +#include "ipfs/repo/fsrepo/lmdb_cursor.h" int test_import_large_file() { size_t bytes_size = 1000000; //1mb @@ -166,11 +167,6 @@ int test_import_small_file() { // cid should be the same each time unsigned char cid_test[10] = { 0x1e, 0xcf, 0x04, 0xce, 0x6a, 0xe8, 0xbf, 0xc0, 0xeb, 0xe4 }; - /* - for (int i = 0; i < 10; i++) { - printf("%02x\n", write_node->hash[i]); - } - */ for(int i = 0; i < 10; i++) { if (write_node->hash[i] != cid_test[i]) { @@ -208,6 +204,19 @@ int test_import_small_file() { } } + // attempt to look in the journal for the entry + struct lmdb_context *context = (struct lmdb_context*)local_node->repo->config->datastore->datastore_context; + struct JournalRecord* record = NULL; + struct lmdb_trans_cursor *cursor = lmdb_trans_cursor_new(); + cursor->environment = context->db_environment; + cursor->database = context->journal_db; + cursor->parent_transaction = context->current_transaction; + if (mdb_cursor_open(context->current_transaction, *cursor->database, &cursor->cursor) != 0) { + fprintf(stderr, "Unable to open cursor.\n"); + } else if (!lmdb_journalstore_cursor_get(cursor, CURSOR_FIRST, &record)) { + fprintf(stderr, "Unable to find any records in the database.\n"); + } + ipfs_node_free(local_node); ipfs_hashtable_node_free(write_node); ipfs_hashtable_node_free(read_node); diff --git a/test/storage/test_datastore.h b/test/storage/test_datastore.h index dfe449a..234139d 100644 --- a/test/storage/test_datastore.h +++ b/test/storage/test_datastore.h @@ -73,11 +73,15 @@ int test_ipfs_datastore_put() { * List what is in the journal */ int test_datastore_list_journal() { + int recCount = 0; libp2p_logger_add_class("test_datastore"); libp2p_logger_add_class("lmdb_datastore"); + + // need 
to run test_import_small_file first + // open database struct FSRepo* fs_repo; - if (ipfs_repo_fsrepo_new(NULL, NULL, &fs_repo) == 0) { + if (ipfs_repo_fsrepo_new("/tmp/.ipfs", NULL, &fs_repo) == 0) { return 0; } if (ipfs_repo_fsrepo_open(fs_repo) == 0) { @@ -85,7 +89,7 @@ int test_datastore_list_journal() { } // open cursor struct lmdb_trans_cursor *crsr = NULL; - if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->datastore_handle, &crsr)) { + if (!lmdb_journalstore_cursor_open(fs_repo->config->datastore->datastore_context, &crsr, NULL)) { ipfs_repo_fsrepo_free(fs_repo); return 0; } @@ -96,7 +100,9 @@ int test_datastore_list_journal() { if (lmdb_journalstore_cursor_get(crsr, op, &record) == 0) { lmdb_journal_record_free(record); record = NULL; + break; } + recCount++; // display record libp2p_logger_debug("test_datastore", "Timestamp: %llu.\n", record->timestamp); libp2p_logger_debug("test_datastore", "Pin: %s.\n", record->pin == 1 ? "Y" : "N"); @@ -105,5 +111,6 @@ int test_datastore_list_journal() { record = NULL; op = CURSOR_NEXT; } while (record != NULL); + libp2p_logger_error("test_datastore", "Found %d records.\n", recCount); return 1; } From 0b113cb95d6cd2eb24ac4605ec40424b3dd49e86 Mon Sep 17 00:00:00 2001 From: Jose Marcial Vieira Bisneto Date: Thu, 7 Sep 2017 23:59:57 -0300 Subject: [PATCH 4/4] Initial implementation of multipart in API. --- core/api.c | 177 +++++++++++++++++++++++++++++++++------- include/ipfs/core/api.h | 2 + 2 files changed, 151 insertions(+), 28 deletions(-) diff --git a/core/api.c b/core/api.c index e296276..dd45ac2 100644 --- a/core/api.c +++ b/core/api.c @@ -1,6 +1,8 @@ /** * Methods for lightweight/specific HTTP for API communication. */ +#define _GNU_SOURCE +#define __USE_GNU #include #include #include @@ -178,6 +180,94 @@ int read_all(int fd, struct s_request *req, char *already, size_t alread_size) return 1; } +/** + * Find a token in a string array. + * @param string array and token string. + * @returns the pointer after where the token was found or NULL if it fails. + */ +char *str_tok(char *str, char *tok) +{ + char *p = strstr(str, tok); + if (p) { + p += strlen(tok); + while(*p == ' ') p++; + } + return p; +} + +/** + * Find a token in a binary array. + * @param array, size of array, token and size of token. + * @returns the pointer after where the token was found or NULL if it fails. + */ +char *bin_tok(char *bin, size_t limit, char *tok, size_t tok_size) +{ + char *p = memmem(bin, limit, tok, tok_size); + if (p) { + p += tok_size; + } + return p; +} + +/** + * Check if header contain a especific value. + * @param request structure, header name and value to check. + * @returns the pointer where the value was found or NULL if it fails. + */ +char *header_value_cmp(struct s_request *req, char *header, char *value) +{ + char *p = str_tok(req->buf + req->header, header); + if (p) { + if (strstart(p, value)) { + return p; + } + } + return NULL; +} + +/** + * Lookup for boundary at buffer string. + * @param body buffer string, boundary id, filename and content-type string. + * @returns the pointer where the multipart start. + */ +char *boundary_find(char *str, char *boundary, char **filename, char **contenttype) +{ + char *p = str_tok(str, "--"); + while (p) { + if (strstart(p, boundary)) { + // skip to the beginning, ignoring the header for now, if there is. 
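For reference, the reworked journalstore cursor API exercised by test_datastore_list_journal comes together in the sketch below. It opens a cursor from the datastore context (the handle that now carries the LMDB environment, the current transaction and the two DBI handles), walks the JOURNALSTORE table, and closes the cursor with the new commit flag. This is a minimal sketch, not part of the patch: the function name and count variable are invented, error handling is deliberately thin, and the headers are the ones the updated tests already pull in; the calls and their signatures follow the ones introduced in this series, and a test caller would pass fs_repo->config->datastore->datastore_context as the context argument.

    /* Sketch only (not part of the patch): count the journal records reachable
     * through the reworked cursor API. */
    #include "ipfs/repo/fsrepo/journalstore.h"
    #include "ipfs/repo/fsrepo/lmdb_cursor.h"

    int count_journal_records(void* datastore_context) {
        int count = 0;
        struct lmdb_trans_cursor* cursor = NULL;
        struct JournalRecord* record = NULL;
        /* NULL third argument: let cursor_open begin its own transaction,
         * parented to the context's current transaction. */
        if (!lmdb_journalstore_cursor_open(datastore_context, &cursor, NULL))
            return 0;
        if (lmdb_journalstore_cursor_get(cursor, CURSOR_FIRST, &record)) {
            do {
                count++;
                lmdb_journal_record_free(record);
                record = NULL;
            } while (lmdb_journalstore_cursor_get(cursor, CURSOR_NEXT, &record));
        }
        /* commitTransaction = 1: commit the cursor's transaction; the
         * cursor struct itself is also freed by cursor_close now. */
        lmdb_journalstore_cursor_close(cursor, 1);
        return count;
    }
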
+ // TODO: return filename and content-type + p = strstr(p, "\r\n\r\n"); + if (p) { + return p + 4; // ignore 4 bytes CRLF 2x + } + break; + } + p = str_tok(str, "--"); + } + return NULL; +} + +/** + * Return the size of boundary. + * @param boundary buffer, boundary id. + * @returns the size of boundary or 0 if fails. + */ +size_t boundary_size(char *str, char *boundary, size_t limit) +{ + char *p = bin_tok(str, limit, "\r\n--", 4); + while (p) { + if (strstart(p, boundary)) { + if (cstrstart(p + strlen(boundary), "--\r\n")) { + p -= 4; + return (size_t)(p - str); + } + } + p = bin_tok(p, limit, "\r\n--", 4); + } + return 0; +} + /** * Pthread to take care of each client connection. * @param ptr is the connection index in api_list, integer not pointer, cast required. @@ -255,13 +345,8 @@ void *api_connection_thread (void *ptr) req.body = req.size; req.body_size = 0; - p = strstr(req.buf + req.header, "Transfer-Encoding:"); - if (p) { - p += strlen("Transfer-Encoding:"); - while(*p == ' ') p++; - if (cstrstart(p, "chunked\r\n") || strcmp(p, "chunked")==0) { - read_func = read_chunked; - } + if (header_value_cmp(&req, "Transfer-Encoding:", "chunked")) { + read_func = read_chunked; } if (!read_func(s, &req, body, r - (body - buf))) { @@ -270,29 +355,65 @@ void *api_connection_thread (void *ptr) goto quit; } - p = strstr(req.buf + req.header, "Accept-Encoding:"); - libp2p_logger_error("api", "method = '%s'\n" - "path = '%s'\n" - "http_ver = '%s'\n" - "header {\n%s\n}\n" - "body_size = %d\n", - req.buf+req.method, req.buf+req.path, req.buf+req.http_ver, - req.buf+req.header, req.body_size); - - snprintf(resp, sizeof(resp), "%s 200 OK\r\n" \ - "Content-Type: application/json\r\n" - "Server: c-ipfs/0.0.0-dev\r\n" - "X-Chunked-Output: 1\r\n" - "Connection: close\r\n" - "Transfer-Encoding: chunked\r\n\r\n", req.buf + req.http_ver); - write_str (s, resp); - libp2p_logger_error("api", "resp = {\n%s\n}\n", resp); - if (strcmp(req.buf + req.method, "GET")==0) { - // just an error message, because it's not used. + // just an error message, because it's not used yet. + // TODO: implement gateway requests and GUI (javascript) for API. write_dual (s, req.buf + req.http_ver, strchr (HTTP_404, ' ')); - //} else if (cstrstart(buf, "POST ")) { - // TODO: Handle chunked/gzip/form-data/json POST requests. + } else if (cstrstart(buf, "POST ")) { + // TODO: Handle gzip/json POST requests. + + p = header_value_cmp(&req, "Content-Type:", "multipart/form-data;"); + if (p) { + p = str_tok(p, "boundary="); + if (p) { + char *boundary, *l; + int len; + if (*p == '"') { + p++; + l = strchr(p, '"'); + } else { + l = p; + while (*l != '\r' && *l != '\0') l++; + } + len = l - p; + boundary = malloc (len+1); + if (boundary) { + memcpy(boundary, p, len); + boundary[len] = '\0'; + + p = boundary_find(req.buf + req.body, boundary, NULL, NULL); + if (p) { + req.boundary_size = boundary_size(p, boundary, req.size - (p - buf)); + if (req.boundary_size > 0) { + req.boundary = p - req.buf; + } + } + + free (boundary); + } + } + } + // TODO: Parse the path var and decide what to do with the received data. 
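To make the new multipart helpers concrete, the sketch below re-derives by hand what boundary_find() and boundary_size() are meant to compute on a typical form-data body: the payload starts after the blank line that follows the "--boundary" marker and ends just before the closing "\r\n--boundary--\r\n". This is an illustration only: the boundary id and payload are invented, and it uses plain strstr()/memmem() instead of calling the file-local helpers in core/api.c, so it only demonstrates the offset arithmetic that ends up in req.boundary and req.boundary_size (error checks are omitted because the inputs are fixed).

    /* Sketch only: mirrors the offsets stored in req.boundary / req.boundary_size. */
    #define _GNU_SOURCE          /* memmem(), as in core/api.c */
    #include <stdio.h>
    #include <string.h>

    int main(void) {
        const char *boundary = "XyZ123";   /* as if "Content-Type: multipart/form-data; boundary=XyZ123" */
        const char body[] =
            "--XyZ123\r\n"
            "Content-Disposition: form-data; name=\"file\"; filename=\"hello.txt\"\r\n"
            "Content-Type: text/plain\r\n"
            "\r\n"
            "hello multipart\r\n"
            "--XyZ123--\r\n";

        /* what boundary_find() returns: first byte after the part headers */
        const char *start = strstr(body, "\r\n\r\n") + 4;

        /* what boundary_size() returns: bytes up to the closing "\r\n--boundary--" */
        char closing[64];
        snprintf(closing, sizeof(closing), "\r\n--%s--\r\n", boundary);
        const char *end = memmem(start, sizeof(body) - (start - body), closing, strlen(closing));
        size_t payload_size = (size_t)(end - start);

        printf("offset=%ld size=%zu payload=\"%.*s\"\n",
               (long)(start - body), payload_size, (int)payload_size, start);
        return 0;
    }
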
+ if (req.boundary > 0) { + libp2p_logger_error("api", "boundary index = %d, size = %d\n", req.boundary, req.boundary_size); + } + + libp2p_logger_error("api", "method = '%s'\n" + "path = '%s'\n" + "http_ver = '%s'\n" + "header {\n%s\n}\n" + "body_size = %d\n", + req.buf+req.method, req.buf+req.path, req.buf+req.http_ver, + req.buf+req.header, req.body_size); + + snprintf(resp, sizeof(resp), "%s 200 OK\r\n" \ + "Content-Type: application/json\r\n" + "Server: c-ipfs/0.0.0-dev\r\n" + "X-Chunked-Output: 1\r\n" + "Connection: close\r\n" + "Transfer-Encoding: chunked\r\n\r\n", req.buf + req.http_ver); + write_str (s, resp); + libp2p_logger_error("api", "resp = {\n%s\n}\n", resp); } } else { libp2p_logger_error("api", "fail looking for body.\n"); diff --git a/include/ipfs/core/api.h b/include/ipfs/core/api.h index b5597c4..2d7c5b7 100644 --- a/include/ipfs/core/api.h +++ b/include/ipfs/core/api.h @@ -32,6 +32,8 @@ struct s_request { int header; int body; size_t body_size; + int boundary; + size_t boundary_size; }; #define HTTP_400 "HTTP/1.1 400 Bad Request\r\n" \