Persisting large files in chunks

This commit is contained in:
jmjatlanta 2016-12-15 12:38:08 -05:00
parent 34301c286e
commit 1d63cdb4a1
14 changed files with 228 additions and 30 deletions

View file

@ -9,6 +9,7 @@
#include "ipfs/cid/cid.h"
#include "libp2p/crypto/encoding/base58.h"
#include "ipfs/multibase/multibase.h"
#include "mh/hashes.h"
#include "mh/multihash.h"
#include "varint.h"
@ -103,15 +104,19 @@ int ipfs_cid_new(int version, unsigned char* hash, size_t hash_length, const cha
struct Cid* cid = *ptrToCid;
if (cid == NULL)
return 0;
cid->hash = malloc(sizeof(unsigned char) * hash_length);
if (cid->hash == NULL) {
free(cid);
return 0;
if (hash == NULL) {
cid->hash = NULL;
} else {
cid->hash = malloc(sizeof(unsigned char) * hash_length);
if (cid->hash == NULL) {
free(cid);
return 0;
}
memcpy(cid->hash, hash, hash_length);
}
// assign values
cid->version = version;
cid->codec = codec;
memcpy(cid->hash, hash, hash_length);
cid->hash_length = hash_length;
return 1;
@ -131,13 +136,13 @@ int ipfs_cid_free(struct Cid* cid) {
}
/***
* Fill a Cid struct based on a base 58 encoded string
* Fill a Cid struct based on a base 58 encoded multihash
* @param incoming the string
* @param incoming_length the size of the string
* @param cid the Cid struct to fill
* @return true(1) on success
*/
int ipfs_cid_decode_from_string(const unsigned char* incoming, size_t incoming_length, struct Cid** cid) {
int ipfs_cid_decode_hash_from_base58(const unsigned char* incoming, size_t incoming_length, struct Cid** cid) {
int retVal = 0;
if (incoming_length < 2)
@ -152,7 +157,7 @@ int ipfs_cid_decode_from_string(const unsigned char* incoming, size_t incoming_l
if (retVal == 0)
return 0;
// now we have the hash, build the object
return ipfs_cid_new(0, hash, hash_length, CID_PROTOBUF, cid);
return ipfs_cid_new(0, &hash[2], hash_length - 2, CID_PROTOBUF, cid);
}
// TODO: finish this
@ -174,15 +179,43 @@ int ipfs_cid_decode_from_string(const unsigned char* incoming, size_t incoming_l
return 0;
}
/**
 * Turn the hash of a cid into a base58 encoded multihash
 *
 * The hash is wrapped in a multihash (tagged as SHA2-256) and the
 * multihash bytes are then base58 encoded into the caller's buffer.
 *
 * @param cid the cid to work with
 * @param buffer where to put the results
 * @param max_buffer_length the maximum space reserved for the results
 * @returns true(1) on success, false(0) if an argument is missing, the
 *   multihash cannot be built, or the buffer is too small
 */
int ipfs_cid_hash_to_base58(struct Cid* cid, unsigned char* buffer, size_t max_buffer_length) {
	// ipfs_cid_new permits a Cid without a hash; there is nothing to encode then
	if (cid == NULL || cid->hash == NULL || buffer == NULL)
		return 0;

	// the multihash is the hash plus a 2 byte prefix (the same 2 bytes that
	// ipfs_cid_decode_hash_from_base58 strips off when decoding)
	size_t multihash_len = cid->hash_length + 2;
	unsigned char multihash[multihash_len];
	if (mh_new(multihash, MH_H_SHA2_256, cid->hash, cid->hash_length) < 0) {
		return 0;
	}

	// base58
	size_t b58_size = libp2p_crypto_encoding_base58_encode_size(multihash_len);
	if (b58_size > max_buffer_length) // buffer too small
		return 0;

	// the encoder takes pointers to the buffer and its size; passing the
	// addresses of our own parameter copies leaves the caller's variables untouched
	if (libp2p_crypto_encoding_base58_encode(multihash, multihash_len, &buffer, &max_buffer_length) == 0) {
		return 0;
	}

	return 1;
}
/***
* Turn a multibase decoded string of bytes into a Cid struct
* @param incoming the multibase decoded array
* @param incoming_size the size of the array
* @param cid the Cid structure to fill
*/
int ipfs_cid_cast(unsigned char* incoming, size_t incoming_size, struct Cid* cid) {
// this is a multihash
int ipfs_cid_cast(const unsigned char* incoming, size_t incoming_size, struct Cid* cid) {
if (incoming_size == 34 && incoming[0] == 18 && incoming[1] == 32) {
// this is a multihash
cid->hash_length = mh_multihash_length(incoming, incoming_size);
cid->codec = CID_PROTOBUF;
cid->version = 0;
@ -208,7 +241,7 @@ int ipfs_cid_cast(unsigned char* incoming, size_t incoming_size, struct Cid* cid
pos += num_bytes;
// now what is left
cid->hash_length = incoming_size - pos;
cid->hash = &incoming[pos];
cid->hash = (unsigned char*)(&incoming[pos]);
return 1;
}

View file

@ -7,7 +7,7 @@ endif
LFLAGS =
DEPS =
OBJS = importer.o
OBJS = importer.o exporter.o
%.o: %.c $(DEPS)
$(CC) -c -o $@ $< $(CFLAGS)

68
importer/exporter.c Normal file
View file

@ -0,0 +1,68 @@
#include <stdio.h>
#include <string.h>
#include "ipfs/cid/cid.h"
#include "ipfs/merkledag/merkledag.h"
#include "ipfs/node/node.h"
#include "ipfs/repo/fsrepo/fs_repo.h"
/**
* pull objects from ipfs
*/
/**
* get a file by its hash, and write the data to a file
* @param hash the base58 multihash of the cid
* @param file_name the file name to write to
* @returns true(1) on success
*/
int ipfs_exporter_to_file(const unsigned char* hash, const char* file_name, const struct FSRepo* fs_repo) {
// convert hash to cid
struct Cid* cid = NULL;
if ( ipfs_cid_decode_hash_from_base58(hash, strlen((char*)hash), &cid) == 0) {
return 0;
}
// find block
struct Node* read_node = NULL;
if (ipfs_merkledag_get(cid, &read_node, fs_repo) == 0) {
ipfs_cid_free(cid);
return 0;
}
// process blocks
FILE* file = fopen(file_name, "wb");
if (file == NULL) {
ipfs_node_free(read_node);
return 0;
}
size_t bytes_written = fwrite(read_node->data, 1, read_node->data_size, file);
if (bytes_written != read_node->data_size) {
fclose(file);
ipfs_node_free(read_node);
return 0;
}
struct NodeLink* link = read_node->head_link;
struct Node* link_node = NULL;
while (link != NULL) {
if ( ipfs_merkledag_get(link->cid, &link_node, fs_repo) == 0) {
fclose(file);
ipfs_node_free(read_node);
return 0;
}
bytes_written = fwrite(link_node->data, 1, link_node->data_size, file);
ipfs_node_free(link_node);
if (bytes_written != link_node->data_size) {
fclose(file);
ipfs_node_free(read_node);
return 0;
}
link = link->next;
}
fclose(file);
if (read_node != NULL)
ipfs_node_free(read_node);
return 1;
}

View file

@ -29,7 +29,7 @@ size_t ipfs_import_chunk(FILE* file, struct Node* node, struct FSRepo* fs_repo)
ipfs_merkledag_add(new_node, fs_repo);
// put link in parent node
struct NodeLink* new_link = NULL;
ipfs_node_link_new("", new_node->cached->hash, &new_link);
ipfs_node_link_new("", new_node->cached->hash, new_node->cached->hash_length, &new_link);
ipfs_node_add_link(node, new_link);
ipfs_node_free(new_node);
}

View file

@ -76,7 +76,16 @@ int ipfs_cid_free(struct Cid* cid);
* @param cid the Cid struct to fill
* @return true(1) on success
*/
int ipfs_cid_decode_from_string(const unsigned char* incoming, size_t incoming_length, struct Cid** cid);
int ipfs_cid_decode_hash_from_base58(const unsigned char* incoming, size_t incoming_length, struct Cid** cid);
/**
* Turn a cid into a base 58 of a multihash of the cid hash
* @param cid the cid to work with
* @param buffer where to put the results
* @param max_buffer_length the maximum space reserved for the results
* @returns true(1) on success
*/
int ipfs_cid_hash_to_base58(struct Cid* cid, unsigned char* buffer, size_t max_buffer_length);
/***
* Turn a multibase decoded string of bytes into a Cid struct
@ -84,6 +93,6 @@ int ipfs_cid_decode_from_string(const unsigned char* incoming, size_t incoming_l
* @param incoming_size the size of the array
* @param cid the Cid structure to fill
*/
int ipfs_cid_cast(unsigned char* incoming, size_t incoming_size, struct Cid* cid);
int ipfs_cid_cast(const unsigned char* incoming, size_t incoming_size, struct Cid* cid);
#endif

View file

@ -0,0 +1,9 @@
#pragma once

// forward declaration, so this header can be included without first
// including the header that defines struct FSRepo
struct FSRepo;

/**
 * get a file by its hash, and write the data to a file
 * @param hash the base58 multihash of the cid
 * @param file_name the file name to write to
 * @param fs_repo the repository to read from
 * @returns true(1) on success
 */
int ipfs_exporter_to_file(const unsigned char* hash, const char* file_name, const struct FSRepo* fs_repo);

View file

@ -42,10 +42,11 @@ struct Node
/* Create_Link
* @Param name: The name of the link (char *)
* @Param ahash: An Qmhash
* @param hash_size the size of the hash
* @param node_link a pointer to the new struct NodeLink
* @returns true(1) on success
*/
int ipfs_node_link_new(char * name, unsigned char * ahash, struct NodeLink** node_link);
int ipfs_node_link_new(char * name, unsigned char * ahash, size_t hash_size, struct NodeLink** node_link);
/* ipfs_node_link_free
* @param L: Free the link you have allocated.

View file

@ -65,7 +65,7 @@ int ipfs_merkledag_get(const struct Cid* cid, struct Node** node, const struct F
// now convert the block into a node
ipfs_node_protobuf_decode(block->data, block->data_length, node);
// doesn't decode do this?
// doesn't decode do this? No
ipfs_node_set_cached(*node, cid);
// free resources

View file

@ -24,7 +24,7 @@ enum WireType ipfs_node_link_message_fields[] = { WIRETYPE_LENGTH_DELIMITED, WIR
* @Param size: Size of the link (size_t)
* @Param ahash: An Qmhash
*/
int ipfs_node_link_new(char * name, unsigned char * ahash, struct NodeLink** node_link)
int ipfs_node_link_new(char * name, unsigned char * ahash, size_t hash_size, struct NodeLink** node_link)
{
*node_link = malloc(sizeof(struct NodeLink));
if (*node_link == NULL)
@ -37,8 +37,7 @@ int ipfs_node_link_new(char * name, unsigned char * ahash, struct NodeLink** nod
strcpy((*node_link)->name, name);
(*node_link)->next = NULL;
int ver = 0;
size_t lenhash = strlen((char*)ahash);
if (ipfs_cid_new(ver, ahash, lenhash, CID_PROTOBUF, &(*node_link)->cid) == 0) {
if (ipfs_cid_new(ver, ahash, hash_size, CID_PROTOBUF, &(*node_link)->cid) == 0) {
free(*node_link);
return 0;
}

View file

@ -47,16 +47,18 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data,
db_key.mv_size = key_size;
db_key.mv_data = (char*)key;
//printf("Looking for data that has a key size of %lu that starts with %02x and ends with %02x\n", db_key.mv_size, ((char*)db_key.mv_data)[0], ((char*)db_key.mv_data)[db_key.mv_size - 1]);
retVal = mdb_get(mdb_txn, mdb_dbi, &db_key, &db_value);
if (retVal != 0) {
mdb_dbi_close(mdb_env, mdb_dbi);
//mdb_dbi_close(mdb_env, mdb_dbi);
mdb_txn_commit(mdb_txn);
return 0;
}
// now copy the data
if (db_value.mv_size > max_data_size) {
mdb_dbi_close(mdb_env, mdb_dbi);
//mdb_dbi_close(mdb_env, mdb_dbi);
mdb_txn_commit(mdb_txn);
return 0;
}
@ -66,7 +68,7 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data,
(*data_size) = db_value.mv_size;
// clean up
mdb_dbi_close(mdb_env, mdb_dbi);
//mdb_dbi_close(mdb_env, mdb_dbi);
mdb_txn_commit(mdb_txn);
return 1;
@ -104,6 +106,9 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
db_key.mv_size = key_size;
db_key.mv_data = (char*)key;
// JMJ debugging
//printf("Saving key of %lu bytes that starts with %02x and ends with %02x\n", db_key.mv_size, ((char*)db_key.mv_data)[0], ((char*)db_key.mv_data)[db_key.mv_size-1]);
// write
db_value.mv_size = data_size;
db_value.mv_data = data;
@ -118,7 +123,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
}
// cleanup
mdb_dbi_close(mdb_env, mdb_dbi);
//mdb_dbi_close(mdb_env, mdb_dbi);
mdb_txn_commit(mdb_txn);
return retVal;
}

View file

@ -11,6 +11,7 @@ OBJS = testit.o test_helper.o \
../datastore/ds_helper.o \
../flatfs/flatfs.o \
../importer/importer.o \
../importer/exporter.o \
../merkledag/merkledag.o \
../multibase/multibase.o \
../node/node.o \

View file

@ -237,7 +237,7 @@ int test_merkledag_add_node_with_links() {
}
// make link
retVal = ipfs_node_link_new("", (unsigned char*)"abc123", &link);
retVal = ipfs_node_link_new("", (unsigned char*)"abc123", 6, &link);
if (retVal == 0) {
printf("Unable to make new link\n");
ipfs_repo_fsrepo_free(fs_repo);

View file

@ -1,7 +1,11 @@
#include <stdio.h>
#include "ipfs/importer/importer.h"
#include "ipfs/importer/exporter.h"
#include "ipfs/merkledag/merkledag.h"
#include "mh/hashes.h"
#include "mh/multihash.h"
#include "libp2p/crypto/encoding/base58.h"
/***
* Helper to create a test file in the OS
@ -46,6 +50,25 @@ int test_import_large_file() {
return 0;
}
// cid should be the same each time
unsigned char cid_test[10] = { 0xec ,0x79 ,0x18 ,0x4c, 0xe8, 0xb0, 0x66, 0x39, 0xaa, 0xed };
/*
for (int i = 0; i < 10; i++) {
printf(" %02x ", write_node->cached->hash[i]);
}
printf("\n");
*/
for(int i = 0; i < 10; i++) {
if (write_node->cached->hash[i] != cid_test[i]) {
printf("Hashes should be the same each time, and do not match at position %d, should be %02x but is %02x\n", i, cid_test[i], write_node->cached->hash[i]);
ipfs_repo_fsrepo_free(fs_repo);
return 0;
}
}
// make sure all went okay
struct Node* read_node;
if (ipfs_merkledag_get(write_node->cached, &read_node, fs_repo) == 0) {
@ -54,6 +77,17 @@ int test_import_large_file() {
return 0;
}
// the second block should be there
struct Node* read_node2;
if (ipfs_merkledag_get(read_node->head_link->cid, &read_node2, fs_repo) == 0) {
printf("Unable to find the linked node.\n");
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(write_node);
return 0;
}
ipfs_node_free(read_node2);
// compare data
if (write_node->data_size != read_node->data_size) {
printf("Data size of nodes are not equal. Should be %lu but are %lu\n", write_node->data_size, read_node->data_size);
@ -73,9 +107,31 @@ int test_import_large_file() {
}
}
// convert cid to multihash
size_t base58_size = 55;
unsigned char base58[base58_size];
if ( ipfs_cid_hash_to_base58(read_node->cached, base58, base58_size) == 0) {
printf("Unable to convert cid to multihash\n");
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(write_node);
ipfs_node_free(read_node);
return 0;
}
// attempt to write file
if (ipfs_exporter_to_file(base58, "/tmp/test_import_large_file.rsl", fs_repo) == 0) {
printf("Unable to write file.\n");
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(write_node);
ipfs_node_free(read_node);
return 0;
}
// compare original with new
size_t new_file_size = os_utils_file_size("/tmp/test_import_large_file.rsl");
if (new_file_size != bytes_size) {
printf("File sizes are different. Should be %lu but the new one is %lu\n", bytes_size, new_file_size);
}
ipfs_repo_fsrepo_free(fs_repo);
ipfs_node_free(write_node);
@ -107,6 +163,23 @@ int test_import_small_file() {
return 0;
}
// cid should be the same each time
unsigned char cid_test[10] = { 0x47,0x51,0x40,0x0a, 0xdf, 0x62, 0xf9, 0xcc, 0x8d, 0xbb };
/**
for (int i = 0; i < 10; i++) {
printf("%02x\n", write_node->cached->hash[i]);
}
*/
for(int i = 0; i < 10; i++) {
if (write_node->cached->hash[i] != cid_test[i]) {
printf("Hashes do not match at position %d, should be %02x but is %02x\n", i, cid_test[i], write_node->cached->hash[i]);
ipfs_repo_fsrepo_free(fs_repo);
return 0;
}
}
// make sure all went okay
struct Node* read_node;
if (ipfs_merkledag_get(write_node->cached, &read_node, fs_repo) == 0) {

View file

@ -5,7 +5,7 @@ int test_node() {
char * name = "Alex";
unsigned char * ahash = (unsigned char*)"QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnPbdG";
struct NodeLink * mylink;
int retVal = ipfs_node_link_new(name,ahash, &mylink);
int retVal = ipfs_node_link_new(name,ahash, strlen((char*)ahash), &mylink);
if (retVal == 0)
return 0;
@ -13,7 +13,7 @@ int test_node() {
char * name2 = "Simo";
unsigned char * ahash2 = (unsigned char*)"QmYwAPJzv5CZsnA625s3Xf2nemtYgPpHdWEz79ojWnSimo";
struct NodeLink * mylink2;
retVal = ipfs_node_link_new(name2, ahash2, &mylink2);
retVal = ipfs_node_link_new(name2, ahash2, strlen((char*)ahash2), &mylink2);
//Nodes
struct Node * Mynode;
@ -59,7 +59,7 @@ int test_node_link_encode_decode() {
int retVal = 0;
// make a NodeLink
if (ipfs_node_link_new("My Name", (unsigned char*)"QmMyHash", &control) == 0)
if (ipfs_node_link_new("My Name", (unsigned char*)"QmMyHash", 8, &control) == 0)
goto l_exit;
// encode it
@ -107,14 +107,14 @@ int test_node_encode_decode() {
goto ed_exit;
// first link
if (ipfs_node_link_new((char*)"Link1", (unsigned char*)"QmLink1", &link1) == 0)
if (ipfs_node_link_new((char*)"Link1", (unsigned char*)"QmLink1", 7, &link1) == 0)
goto ed_exit;
if ( ipfs_node_add_link(control, link1) == 0)
goto ed_exit;
// second link
if (ipfs_node_link_new((char*)"Link2", (unsigned char*)"QmLink2", &link2) == 0)
if (ipfs_node_link_new((char*)"Link2", (unsigned char*)"QmLink2", 7, &link2) == 0)
goto ed_exit;
if ( ipfs_node_add_link(control, link2) == 0)
goto ed_exit;