Storing large files

Files larger than about 200K are split into smaller files and stored in
the ipfs file system in blocks.
This commit is contained in:
jmjatlanta 2016-12-15 05:40:24 -05:00
parent 5168bc87e0
commit 34301c286e
9 changed files with 128 additions and 68 deletions

View file

@ -18,6 +18,7 @@
size_t ipfs_import_chunk(FILE* file, struct Node* node, struct FSRepo* fs_repo) {
unsigned char buffer[MAX_DATA_SIZE];
size_t bytes_read = fread(buffer, 1, MAX_DATA_SIZE, file);
if (node->data_size == 0) {
ipfs_node_set_data(node, buffer, bytes_read);
} else {
@ -26,7 +27,7 @@ size_t ipfs_import_chunk(FILE* file, struct Node* node, struct FSRepo* fs_repo)
ipfs_node_new_from_data(buffer, bytes_read, &new_node);
// persist
ipfs_merkledag_add(new_node, fs_repo);
// put link in node
// put link in parent node
struct NodeLink* new_link = NULL;
ipfs_node_link_new("", new_node->cached->hash, &new_link);
ipfs_node_add_link(node, new_link);

View file

@ -21,9 +21,7 @@ struct Datastore {
int (*datastore_open)(int argc, char** argv, struct Datastore* datastore);
int (*datastore_close)(struct Datastore* datastore);
int (*datastore_put)(const unsigned char* key, size_t key_size, unsigned char* data, size_t data_length, const struct Datastore* datastore);
int (*datastore_put_block)(const struct Block* block, const struct Datastore* datastore);
int (*datastore_get)(const char* key, size_t key_size, unsigned char* data, size_t max_data_length, size_t* data_length, const struct Datastore* datastore);
int (*datastore_get_block)(const struct Cid* cid, struct Block** block, const struct Datastore* datastore);
// a handle to the datastore "context" used by the datastore
void* handle;
};

View file

@ -13,6 +13,8 @@
int ipfs_merkledag_add(struct Node* node, struct FSRepo* fs_repo) {
// taken from merkledag.go line 59
struct Block* block = NULL;
// protobuf the node
size_t protobuf_len = ipfs_node_protobuf_encode_size(node);
size_t bytes_written = 0;
@ -20,7 +22,6 @@ int ipfs_merkledag_add(struct Node* node, struct FSRepo* fs_repo) {
ipfs_node_protobuf_encode(node, protobuf, protobuf_len, &bytes_written);
// turn the node into a block
struct Block* block;
ipfs_blocks_block_new(&block);
ipfs_blocks_block_add_data(protobuf, bytes_written, block);
@ -32,6 +33,8 @@ int ipfs_merkledag_add(struct Node* node, struct FSRepo* fs_repo) {
}
ipfs_node_set_cached(node, block->cid);
if (block != NULL)
ipfs_blocks_block_free(block);
// TODO: call HasBlock (unsure why as yet)

View file

@ -528,6 +528,7 @@ int ipfs_node_new_from_link(struct NodeLink * mylink, struct Node** node)
ipfs_node_add_link(*node, mylink);
(*node)->cached = NULL;
(*node)->data = NULL;
(*node)->data_size = 0;
(*node)->encoded = NULL;
return 1;
}

View file

@ -12,54 +12,6 @@
#include "lmdb.h"
#include "ipfs/repo/fsrepo/lmdb_datastore.h"
int repo_fsrepo_lmdb_get_block(const struct Cid* cid, struct Block** block, const struct Datastore* datastore) {
int retVal;
MDB_txn* mdb_txn;
MDB_dbi mdb_dbi;
struct MDB_val db_key;
struct MDB_val db_value;
MDB_env* mdb_env = (MDB_env*)datastore->handle;
if (mdb_env == NULL)
return 0;
// open transaction
retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn);
if (retVal != 0)
return 0;
retVal = mdb_dbi_open(mdb_txn, NULL, MDB_DUPSORT, &mdb_dbi);
if (retVal != 0) {
mdb_txn_commit(mdb_txn);
return 0;
}
// prepare data
db_key.mv_size = cid->hash_length;
db_key.mv_data = cid->hash;
retVal = mdb_get(mdb_txn, mdb_dbi, &db_key, &db_value);
if (retVal != 0) {
mdb_dbi_close(mdb_env, mdb_dbi);
mdb_txn_commit(mdb_txn);
return 0;
}
// now copy the data
retVal = ipfs_blocks_block_new(block);
if (retVal == 0) {
mdb_dbi_close(mdb_env, mdb_dbi);
mdb_txn_commit(mdb_txn);
return 0;
}
retVal = ipfs_blocks_block_add_data(db_value.mv_data, db_value.mv_size, *block);
// clean up
mdb_dbi_close(mdb_env, mdb_dbi);
mdb_txn_commit(mdb_txn);
return 1;
}
/***
* retrieve a record from the database and put in a pre-sized buffer
* @param key the key to look for
@ -171,16 +123,6 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
return retVal;
}
/**
* Write a block to the datastore with the specified key
* @param block the block to be written
* @param datastore the datastore to write to
* @returns true(1) on success
*/
int repo_fsrepo_lmdb_put_block(const struct Block* block, const struct Datastore* datastore) {
return repo_fsrepo_lmdb_put(block->cid->hash, block->cid->hash_length, block->data, block->data_length, datastore);
}
/**
* Open an lmdb database with the given parameters.
* Note: for now, the parameters are not used
@ -229,9 +171,7 @@ int repo_fsrepo_lmdb_cast(struct Datastore* datastore) {
datastore->datastore_open = &repo_fsrepro_lmdb_open;
datastore->datastore_close = &repo_fsrepo_lmdb_close;
datastore->datastore_put = &repo_fsrepo_lmdb_put;
datastore->datastore_put_block = &repo_fsrepo_lmdb_put_block;
datastore->datastore_get = &repo_fsrepo_lmdb_get;
datastore->datastore_get_block = &repo_fsrepo_lmdb_get_block;
return 1;
}

View file

@ -190,6 +190,37 @@ int test_merkledag_add_data() {
return 1;
}
int test_merkledag_add_node() {
int retVal = 0;
struct Node* node1 = NULL;
struct FSRepo* fs_repo = createAndOpenRepo("/tmp/.ipfs");
if (fs_repo == NULL) {
printf("Unable to create repo\n");
return 0;
}
retVal = ipfs_node_new(&node1);
if (retVal == 0) {
printf("Unable to make node\n");
ipfs_repo_fsrepo_free(fs_repo);
return 0;
}
retVal = ipfs_merkledag_add(node1, fs_repo);
if (retVal == 0) {
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(node1);
printf("Unable to add node\n");
return 0;
}
ipfs_node_free(node1);
ipfs_repo_fsrepo_free(fs_repo);
return 1;
}
/**
* Should save links
*/
@ -197,6 +228,7 @@ int test_merkledag_add_node_with_links() {
int retVal = 0;
struct NodeLink* link = NULL;
struct Node* node1 = NULL;
struct Node* node2 = NULL;
struct FSRepo* fs_repo = createAndOpenRepo("/tmp/.ipfs");
if (fs_repo == NULL) {
@ -206,7 +238,17 @@ int test_merkledag_add_node_with_links() {
// make link
retVal = ipfs_node_link_new("", (unsigned char*)"abc123", &link);
if (retVal == 0) {
printf("Unable to make new link\n");
ipfs_repo_fsrepo_free(fs_repo);
return 0;
}
retVal = ipfs_node_new_from_link(link, &node1);
if (retVal == 0) {
printf("Unable to make node\n");
ipfs_repo_fsrepo_free(fs_repo);
return 0;
}
retVal = ipfs_merkledag_add(node1, fs_repo);
if (retVal == 0) {
@ -217,7 +259,6 @@ int test_merkledag_add_node_with_links() {
}
// now look for it
struct Node* node2 = NULL;
retVal = ipfs_merkledag_get(node1->cached, &node2, fs_repo);
if (retVal == 0) {
ipfs_repo_fsrepo_free(fs_repo);
@ -227,6 +268,14 @@ int test_merkledag_add_node_with_links() {
struct NodeLink* node1_link = node1->head_link;
struct NodeLink* node2_link = node2->head_link;
if (node1_link->cid->hash_length != node2_link->cid->hash_length) {
printf("Hashes are not of the same length. Hash1: %lu, Hash2: %lu\n", node1_link->cid->hash_length, node2_link->cid->hash_length);
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(node1);
ipfs_node_free(node2);
return 0;
}
while(node1_link != NULL) {
for(int i = 0; i < node1_link->cid->hash_length; i++) {
if(node1_link->cid->hash[i] != node2_link->cid->hash[i]) {

View file

@ -24,6 +24,67 @@ int create_bytes(unsigned char* buffer, size_t num_bytes) {
return 1;
}
int test_import_large_file() {
size_t bytes_size = 1000000; //1mb
unsigned char file_bytes[bytes_size];
const char* fileName = "/tmp/test_import_large.tmp";
// create the necessary file
create_bytes(file_bytes, bytes_size);
create_file(fileName, file_bytes, bytes_size);
// get the repo
drop_and_build_repository("/tmp/.ipfs");
struct FSRepo* fs_repo;
ipfs_repo_fsrepo_new("/tmp/.ipfs", NULL, &fs_repo);
ipfs_repo_fsrepo_open(fs_repo);
// write to ipfs
struct Node* write_node;
if (ipfs_import_file(fileName, &write_node, fs_repo) == 0) {
ipfs_repo_fsrepo_free(fs_repo);
return 0;
}
// make sure all went okay
struct Node* read_node;
if (ipfs_merkledag_get(write_node->cached, &read_node, fs_repo) == 0) {
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(write_node);
return 0;
}
// compare data
if (write_node->data_size != read_node->data_size) {
printf("Data size of nodes are not equal. Should be %lu but are %lu\n", write_node->data_size, read_node->data_size);
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(write_node);
ipfs_node_free(read_node);
return 0;
}
for(int i = 0; i < write_node->data_size; i++) {
if (write_node->data[i] != read_node->data[i]) {
printf("Data within node is different at position %d. The value should be %02x, but was %02x.\n", i, write_node->data[i], read_node->data[i]);
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(write_node);
ipfs_node_free(read_node);
return 0;
}
}
// attempt to write file
// compare original with new
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(write_node);
ipfs_node_free(read_node);
return 1;
}
int test_import_small_file() {
size_t bytes_size = 1000;
unsigned char file_bytes[bytes_size];

View file

@ -37,7 +37,7 @@ int test_repo_fsrepo_write_read_block() {
return 0;
// make some data
size_t data_size = 100;
size_t data_size = 10000;
unsigned char data[data_size];
int counter = 0;

View file

@ -35,6 +35,7 @@ const char* names[] = {
"test_repo_fsrepo_write_read_block",
"test_get_init_command",
"test_import_small_file",
"test_import_large_file",
"test_repo_fsrepo_open_config",
"test_flatfs_get_directory",
"test_flatfs_get_filename",
@ -48,6 +49,7 @@ const char* names[] = {
"test_node_encode_decode",
"test_merkledag_add_data",
"test_merkledag_get_data",
"test_merkledag_add_node",
"test_merkledag_add_node_with_links"
};
@ -64,6 +66,7 @@ int (*funcs[])(void) = {
test_repo_fsrepo_write_read_block,
test_get_init_command,
test_import_small_file,
test_import_large_file,
test_repo_fsrepo_open_config,
test_flatfs_get_directory,
test_flatfs_get_filename,
@ -77,6 +80,7 @@ int (*funcs[])(void) = {
test_node_encode_decode,
test_merkledag_add_data,
test_merkledag_get_data,
test_merkledag_add_node,
test_merkledag_add_node_with_links
};
@ -98,6 +102,10 @@ int main(int argc, char** argv) {
test_wanted = argv[1];
}
int array_length = sizeof(funcs) / sizeof(funcs[0]);
int array2_length = sizeof(names) / sizeof(names[0]);
if (array_length != array2_length) {
printf("Test arrays are not of the same length. Funcs: %d, Names: %d\n", array_length, array2_length);
}
for (int i = 0; i < array_length; i++) {
if (only_one) {
const char* currName = names[i];
@ -111,7 +119,6 @@ int main(int argc, char** argv) {
tests_ran++;
counter += testit(names[i], funcs[i]);
}
}
if (tests_ran == 0)