Began implementing updates to journal timestamp

yamux
jmjatlanta 2017-09-13 12:40:48 -05:00
parent 478fa403fd
commit 3eec8553a6
17 changed files with 181 additions and 120 deletions

View File

@ -147,7 +147,7 @@ int ipfs_blockstore_get(const struct BlockstoreContext* context, struct Cid* cid
* @param block the block to store
* @returns true(1) on success
*/
int ipfs_blockstore_put(const struct BlockstoreContext* context, struct Block* block) {
int ipfs_blockstore_put(const struct BlockstoreContext* context, struct Block* block, size_t* bytes_written) {
// from blockstore.go line 118
int retVal = 0;
@ -177,9 +177,9 @@ int ipfs_blockstore_put(const struct BlockstoreContext* context, struct Block* b
}
FILE* file = fopen(filename, "wb");
int bytes_written = fwrite(protobuf, 1, protobuf_len, file);
*bytes_written = fwrite(protobuf, 1, protobuf_len, file);
fclose(file);
if (bytes_written != protobuf_len) {
if (*bytes_written != protobuf_len) {
free(key);
free(filename);
return 0;

View File

@ -79,6 +79,8 @@ int ipfs_cid_set_add (struct CidSet *set, struct Cid *cid, int visit)
}
set = set->next;
}
//this should never get hit
return 0;
}
int ipfs_cid_set_has (struct CidSet *set, struct Cid *cid)
@ -96,6 +98,7 @@ int ipfs_cid_set_has (struct CidSet *set, struct Cid *cid)
}
set = set->next;
}
return 0;
}
int ipfs_cid_set_remove (struct CidSet *set, struct Cid *cid)
@ -128,6 +131,7 @@ int ipfs_cid_set_remove (struct CidSet *set, struct Cid *cid)
prev = set;
set = set->next;
}
return 0;
}
int ipfs_cid_set_len (struct CidSet *set)

View File

@ -1,5 +1,5 @@
CC = gcc
CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multihash/include -I../../c-multiaddr/include -I../../lmdb/libraries/liblmdb -Wall
CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multihash/include -I../../c-multiaddr/include -I../../lmdb/libraries/liblmdb -I../../c-protobuf -Wall
ifdef DEBUG
CFLAGS += -g3

View File

@ -2,6 +2,7 @@
* Some code to help with the datastore / blockstore interface
* NOTE: the datastore stores things under a multihash key
*/
#include <stdlib.h>
#include "libp2p/crypto/encoding/base32.h"
#include "ipfs/datastore/ds_helper.h"
/**
@ -54,3 +55,41 @@ int ipfs_datastore_helper_binary_from_ds_key(const unsigned char* ds_key, size_t
}
return 1;
}
/***
* Add a record in the datastore based on a block
* @param block the block
* @param datastore the Datastore
* @reutrns true(1) on success, false(0) otherwise
*/
int ipfs_datastore_helper_add_block_to_datastore(struct Block* block, struct Datastore* datastore) {
struct DatastoreRecord* rec = libp2p_datastore_record_new();
if (rec == NULL)
return 0;
rec->key_size = block->cid->hash_length;
rec->key = (uint8_t*) malloc(rec->key_size);
if (rec->key == NULL) {
libp2p_datastore_record_free(rec);
return 0;
}
memcpy(rec->key, block->cid->hash, rec->key_size);
rec->timestamp = 0;
// convert the key to base32, and store it in the DatabaseRecord->value section
size_t fs_key_length = 100;
uint8_t fs_key[fs_key_length];
if (!ipfs_datastore_helper_ds_key_from_binary(block->cid->hash, block->cid->hash_length, fs_key, fs_key_length, &fs_key_length)) {
libp2p_datastore_record_free(rec);
return 0;
}
rec->value_size = fs_key_length;
rec->value = malloc(rec->value_size);
if (rec->value == NULL) {
libp2p_datastore_record_free(rec);
return 0;
}
memcpy(rec->value, fs_key, rec->value_size);
int retVal = datastore->datastore_put(rec, datastore);
libp2p_datastore_record_free(rec);
return retVal;
}

View File

@ -6,6 +6,7 @@
#include "libp2p/os/utils.h"
#include "libp2p/utils/logger.h"
#include "ipfs/core/ipfs_node.h"
#include "ipfs/datastore/ds_helper.h"
#include "ipfs/exchange/exchange.h"
#include "ipfs/exchange/bitswap/bitswap.h"
#include "ipfs/exchange/bitswap/message.h"
@ -140,8 +141,10 @@ int ipfs_bitswap_close(struct Exchange* exchange) {
int ipfs_bitswap_has_block(struct Exchange* exchange, struct Block* block) {
// add the block to the blockstore
struct BitswapContext* context = exchange->exchangeContext;
context->ipfsNode->blockstore->Put(context->ipfsNode->blockstore->blockstoreContext, block);
context->ipfsNode->repo->config->datastore->datastore_put(block->cid->hash, block->cid->hash_length, block->data, block->data_length, context->ipfsNode->repo->config->datastore);
size_t bytes_written;
context->ipfsNode->blockstore->Put(context->ipfsNode->blockstore->blockstoreContext, block, &bytes_written);
// add it to the datastore
ipfs_datastore_helper_add_block_to_datastore(block, context->ipfsNode->repo->config->datastore);
// update requests
struct WantListQueueEntry* queueEntry = ipfs_bitswap_wantlist_queue_find(context->localWantlist, block->cid);
if (queueEntry != NULL) {

View File

@ -20,7 +20,7 @@ struct Blockstore {
* Retrieve a block from the blockstore
*/
int (*Get)(const struct BlockstoreContext* context, struct Cid* cid, struct Block** block);
int (*Put)(const struct BlockstoreContext* context, struct Block* block);
int (*Put)(const struct BlockstoreContext* context, struct Block* block, size_t* bytes_written);
};
/***
@ -67,7 +67,7 @@ int ipfs_blockstore_get(const struct BlockstoreContext* context, struct Cid* cid
* @param block the block to store
* @returns true(1) on success
*/
int ipfs_blockstore_put(const struct BlockstoreContext* context, struct Block* block);
int ipfs_blockstore_put(const struct BlockstoreContext* context, struct Block* block, size_t* bytes_written);
/***
* Put a struct UnixFS in the blockstore

View File

@ -1,10 +1,12 @@
#pragma once
/**
* Some code to help with the datastore / blockstore interface
*/
#ifndef __IPFS_DATASTORE_DS_HELPER_H__
#define __IPFS_DATASTORE_DS_HELPER_H__
#include <string.h>
#include "ipfs/blocks/block.h"
#include "libp2p/db/datastore.h"
/**
* Generate a key based on the passed in binary_array
@ -30,4 +32,10 @@ int ipfs_datastore_helper_ds_key_from_binary(const unsigned char* binary_array,
int ipfs_datastore_helper_binary_from_ds_key(const unsigned char* ds_key, size_t key_length, unsigned char* binary_array,
size_t max_binary_array_length, size_t* completed_binary_array_length);
#endif
/***
* Add a record in the datastore based on a block
* @param block the block
* @param datastore the Datastore
* @reutrns true(1) on success, false(0) otherwise
*/
int ipfs_datastore_helper_add_block_to_datastore(struct Block* block, struct Datastore* datastore);

View File

@ -73,7 +73,7 @@ int ipfs_repo_fsrepo_init(struct FSRepo* config);
* @param fs_repo the repo to write to
* @returns true(1) on success
*/
int ipfs_repo_fsrepo_block_write(struct Block* block, const struct FSRepo* fs_repo);
int ipfs_repo_fsrepo_block_write(struct Block* block, const struct FSRepo* fs_repo, size_t* bytes_written);
int ipfs_repo_fsrepo_block_read(const unsigned char* hash, size_t hash_length, struct Block** block, const struct FSRepo* fs_repo);
/***

View File

@ -243,7 +243,8 @@ int ipfs_journal_build_todo(struct IpfsNode* local_node, struct JournalMessage*
libp2p_utils_vector_add(todos, td);
} else {
// do we need to adjust the time?
if (datastore_record->timestamp != entry->timestamp) {
if ( (datastore_record->timestamp == 0 && entry->timestamp != 0) ||
(entry->timestamp != 0 && entry->timestamp < datastore_record->timestamp) ) {
struct JournalToDo* td = ipfs_journal_todo_new();
td->action = JOURNAL_TIME_ADJUST;
td->hash = entry->hash;
@ -261,6 +262,36 @@ int ipfs_journal_build_todo(struct IpfsNode* local_node, struct JournalMessage*
return 0;
}
/***
* Adjust the time in the journal
* @param todo the JournalToDo struct that contains the new time
* @param local_node the context
* @returns true(1) if success, or if no change was needed, false(0) if there was an error
*/
int ipfs_journal_adjust_time(struct JournalToDo* todo, struct IpfsNode* local_node) {
// grab the datastore record
struct DatastoreRecord* datastore_record = NULL;
if (!local_node->repo->config->datastore->datastore_get(todo->hash, todo->hash_size, &datastore_record, local_node->repo->config->datastore)) {
// did not find the record
libp2p_logger_error("journal", "Attempted time_adjust, but could not find the hash.\n");
return 0;
}
// record found
if (todo->remote_timestamp != 0) {
if ( datastore_record == 0 || datastore_record->timestamp > todo->remote_timestamp) {
datastore_record->timestamp = todo->remote_timestamp;
} else {
// we don't need to change the time
return 1;
}
}
if (!local_node->repo->config->datastore->datastore_put(datastore_record, local_node->repo->config->datastore)) {
libp2p_logger_error("journal", "Attempted time_adjust put, but failed.\n");
return 0;
}
return 1;
}
/***
* Handles a message
* @param incoming the message
@ -317,21 +348,17 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s
// go get a file
struct Block* block = NULL;
struct Cid* cid = ipfs_cid_new(0, curr->hash, curr->hash_size, CID_DAG_PROTOBUF);
// debugging
char* str = NULL;
libp2p_logger_debug("journal", "Looking for block %s.\n", ipfs_cid_to_string(cid, &str));
if (str != NULL)
free(str);
if (local_node->exchange->GetBlockAsync(local_node->exchange, cid, &block)) {
// set timestamp
// set timestamp (if we got the block already, but we probably didn't)
if (block != NULL)
ipfs_journal_adjust_time(curr, local_node);
}
ipfs_cid_free(cid);
ipfs_block_free(block);
}
break;
case (JOURNAL_TIME_ADJUST): {
ipfs_journal_adjust_time(curr, local_node);
}
break;
case (JOURNAL_REMOTE_NEEDS): {

View File

@ -11,6 +11,37 @@
#include "ipfs/merkledag/merkledag.h"
#include "ipfs/unixfs/unixfs.h"
/***
* Convert a HashtableNode into a Block
* @param node the node to convert
* @param blockResult where to put the results
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_merkledag_convert_node_to_block(struct HashtableNode* node, struct Block** blockResult) {
*blockResult = ipfs_block_new();
if (*blockResult == NULL)
return 0;
struct Block* block = *blockResult;
block->cid = ipfs_cid_new(1, node->hash, node->hash_size, CID_DAG_PROTOBUF);
if (block->cid == NULL) {
ipfs_block_free(block);
*blockResult = NULL;
return 0;
}
block->data_length = ipfs_hashtable_node_protobuf_encode_size(node);
block->data = malloc(block->data_length);
if (block->data == NULL) {
ipfs_block_free(block);
*blockResult = NULL;
return 0;
}
if (!ipfs_hashtable_node_protobuf_encode(node, block->data, block->data_length, &block->data_length)) {
ipfs_block_free(block);
*blockResult = NULL;
return 0;
}
return 1;
}
/***
* Adds a node to the dagService and blockService
@ -42,8 +73,13 @@ int ipfs_merkledag_add(struct HashtableNode* node, struct FSRepo* fs_repo, size_
}
// write to block store & datastore
retVal = ipfs_repo_fsrepo_node_write(node, fs_repo, bytes_written);
struct Block* block = NULL;
if (!ipfs_merkledag_convert_node_to_block(node, &block)) {
return 0;
}
retVal = ipfs_repo_fsrepo_block_write(block, fs_repo, bytes_written);
if (retVal == 0) {
ipfs_block_free(block);
return 0;
}

View File

@ -707,7 +707,7 @@ int fs_repo_write_config_file(char* path, struct RepoConfig* config) {
* @param fs_repo the repo to write to
* @returns true(1) on success
*/
int ipfs_repo_fsrepo_block_write(struct Block* block, const struct FSRepo* fs_repo) {
int ipfs_repo_fsrepo_block_write(struct Block* block, const struct FSRepo* fs_repo, size_t* bytes_written) {
/**
* What is put in the blockstore is the block.
* What is put in the datastore is the multihash (the Cid) as the key,
@ -717,75 +717,11 @@ int ipfs_repo_fsrepo_block_write(struct Block* block, const struct FSRepo* fs_re
struct Blockstore* blockstore = ipfs_blockstore_new(fs_repo);
if (blockstore == NULL)
return 0;
retVal = ipfs_blockstore_put(blockstore->blockstoreContext, block);
retVal = ipfs_blockstore_put(blockstore->blockstoreContext, block, bytes_written);
ipfs_blockstore_free(blockstore);
if (retVal == 0)
return 0;
// take the cid, base32 it, and send both to the datastore
size_t fs_key_length = 100;
unsigned char fs_key[fs_key_length];
retVal = ipfs_datastore_helper_ds_key_from_binary(block->cid->hash, block->cid->hash_length, fs_key, fs_key_length, &fs_key_length);
if (retVal == 0)
return 0;
retVal = fs_repo->config->datastore->datastore_put(block->cid->hash, block->cid->hash_length, fs_key, fs_key_length, fs_repo->config->datastore);
if (retVal == 0)
return 0;
return 1;
}
/***
* Write a unixfs to the datastore and blockstore
* @param unix_fs the struct to write
* @param fs_repo the repo to write to
* @param bytes_written number of bytes written to the repo
* @returns true(1) on success
*/
int ipfs_repo_fsrepo_unixfs_write(const struct UnixFS* unix_fs, const struct FSRepo* fs_repo, size_t* bytes_written) {
/**
* What is put in the blockstore is the block.
* What is put in the datastore is the multihash (the Cid) as the key,
* and the base32 encoded multihash as the value.
*/
int retVal = 1;
retVal = ipfs_blockstore_put_unixfs(unix_fs, fs_repo, bytes_written);
if (retVal == 0)
return 0;
// take the hash, base32 it, and send both to the datastore
size_t fs_key_length = 100;
unsigned char fs_key[fs_key_length];
retVal = ipfs_datastore_helper_ds_key_from_binary(unix_fs->hash, unix_fs->hash_length, fs_key, fs_key_length, &fs_key_length);
if (retVal == 0)
return 0;
retVal = fs_repo->config->datastore->datastore_put(unix_fs->hash, unix_fs->hash_length, fs_key, fs_key_length, fs_repo->config->datastore);
if (retVal == 0)
return 0;
return 1;
}
/***
* Write a unixfs to the datastore and blockstore
* @param unix_fs the struct to write
* @param fs_repo the repo to write to
* @param bytes_written number of bytes written to the repo
* @returns true(1) on success
*/
int ipfs_repo_fsrepo_node_write(const struct HashtableNode* node, const struct FSRepo* fs_repo, size_t* bytes_written) {
/**
* What is put in the blockstore is the node.
* What is put in the datastore is the multihash as the key,
* and the base32 encoded multihash as the value.
*/
int retVal = 1;
retVal = ipfs_blockstore_put_node(node, fs_repo, bytes_written);
if (retVal == 0)
return 0;
// take the hash, base32 it, and send both to the datastore
size_t fs_key_length = 100;
unsigned char fs_key[fs_key_length];
retVal = ipfs_datastore_helper_ds_key_from_binary(node->hash, node->hash_size, fs_key, fs_key_length, &fs_key_length);
if (retVal == 0)
return 0;
retVal = fs_repo->config->datastore->datastore_put(node->hash, node->hash_size, fs_key, fs_key_length, fs_repo->config->datastore);
retVal = ipfs_datastore_helper_add_block_to_datastore(block, fs_repo->config->datastore);
if (retVal == 0)
return 0;
return 1;

View File

@ -174,12 +174,12 @@ int lmdb_datastore_create_transaction(struct lmdb_context *db_context, MDB_txn *
* @param datastore the datastore to write to
* @returns true(1) on success
*/
int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned char* data, size_t data_size, const struct Datastore* datastore) {
int repo_fsrepo_lmdb_put(struct DatastoreRecord* datastore_record, const struct Datastore* datastore) {
int retVal;
struct MDB_txn *child_transaction;
struct MDB_val datastore_key;
struct MDB_val datastore_value;
struct DatastoreRecord *datastore_record = NULL;
struct DatastoreRecord* existingRecord = NULL;
struct JournalRecord *journalstore_record = NULL;
struct lmdb_trans_cursor *journalstore_cursor = NULL;
@ -207,24 +207,20 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
}
// see if what we want is already in the datastore
repo_fsrepo_lmdb_get_with_transaction(key, key_size, &datastore_record, child_transaction, db_context->datastore_db);
if (datastore_record != NULL) {
repo_fsrepo_lmdb_get_with_transaction(datastore_record->key, datastore_record->key_size, &existingRecord, child_transaction, db_context->datastore_db);
if (existingRecord != NULL) {
// overwrite the timestamp of the incoming record if what we have is older than what is coming in
if ( existingRecord->timestamp != 0 && datastore_record->timestamp > existingRecord->timestamp) {
datastore_record->timestamp = existingRecord->timestamp;
}
// build the journalstore_record with the search criteria
journalstore_record = lmdb_journal_record_new();
journalstore_record->hash_size = key_size;
journalstore_record->hash = malloc(key_size);
memcpy(journalstore_record->hash, key, key_size);
journalstore_record->hash_size = datastore_record->key_size;
journalstore_record->hash = malloc(datastore_record->key_size);
memcpy(journalstore_record->hash, datastore_record->key, datastore_record->key_size);
journalstore_record->timestamp = datastore_record->timestamp;
// look up the corresponding journalstore record for possible updating
lmdb_journalstore_get_record(db_context, journalstore_cursor, &journalstore_record);
} else { // it wasn't previously in the database
datastore_record = libp2p_datastore_record_new();
if (datastore_record == NULL) {
libp2p_logger_error("lmdb_datastore", "put: Unable to allocate memory for DatastoreRecord.\n");
lmdb_trans_cursor_free(journalstore_cursor);
mdb_txn_commit(child_transaction);
return 0;
}
}
// Put in the timestamp if it isn't there already (or is newer)
@ -234,13 +230,6 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
//old_timestamp = datastore_record->timestamp;
datastore_record->timestamp = now;
}
// fill in the other fields
datastore_record->key_size = key_size;
datastore_record->key = (uint8_t*) malloc(key_size);
memcpy(datastore_record->key, key, key_size);
datastore_record->value_size = data_size;
datastore_record->value = (uint8_t *) malloc(data_size);
memcpy(datastore_record->value, data, data_size);
// convert it into a byte array
@ -249,8 +238,8 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
repo_fsrepo_lmdb_encode_record(datastore_record, &record, &record_size);
// prepare data
datastore_key.mv_size = key_size;
datastore_key.mv_data = (char*)key;
datastore_key.mv_size = datastore_record->key_size;
datastore_key.mv_data = (char*)datastore_record->key;
// write
datastore_value.mv_size = record_size;
@ -271,9 +260,9 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
} else {
// add it to the journalstore
journalstore_record = lmdb_journal_record_new();
journalstore_record->hash = (uint8_t*) malloc(key_size);
memcpy(journalstore_record->hash, key, key_size);
journalstore_record->hash_size = key_size;
journalstore_record->hash = (uint8_t*) malloc(datastore_record->key_size);
memcpy(journalstore_record->hash, datastore_record->key, datastore_record->key_size);
journalstore_record->hash_size = datastore_record->key_size;
journalstore_record->timestamp = datastore_record->timestamp;
journalstore_record->pending = 1; // TODO: Calculate this correctly
journalstore_record->pin = 1;
@ -295,7 +284,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
libp2p_logger_error("lmdb_datastore", "lmdb_put: transaction commit failed.\n");
}
free(record);
libp2p_datastore_record_free(datastore_record);
libp2p_datastore_record_free(existingRecord);
return retVal;
}

View File

@ -188,7 +188,13 @@ int test_journal_server_2() {
sleep(45);
libp2p_logger_error("test_journal", "Sleep is over. Shutting down.\n");
// see if we have the file that we should have...
if (!have_file_in_blockstore("/tmp/ipfs_2/.ipfs/blockstore", "2PD7A7OALR6OCEDZNKYAX363LMX3SBXZQPD3IAVTT")) {
libp2p_logger_error("test_journal", "We don't have the file that we think we should.\n");
goto exit;
} else {
libp2p_logger_debug("test_journal", "File is here. Success!\n");
}
retVal = 1;
exit:

View File

@ -58,7 +58,8 @@ int test_repo_fsrepo_write_read_block() {
return 0;
}
retVal = ipfs_repo_fsrepo_block_write(block, fs_repo);
size_t bytes_written;
retVal = ipfs_repo_fsrepo_block_write(block, fs_repo, &bytes_written);
if (retVal == 0) {
ipfs_repo_fsrepo_free(fs_repo);
ipfs_block_free(block);

View File

@ -56,7 +56,7 @@ int test_ipfs_datastore_put() {
return 0;
}
// send to Put with key
retVal = fs_repo->config->datastore->datastore_put((const unsigned char*)key, key_length, block->data, block->data_length, fs_repo->config->datastore);
retVal = ipfs_datastore_helper_add_block_to_datastore(block, fs_repo->config->datastore);
if (retVal == 0) {
ipfs_block_free(block);
return 0;

View File

@ -255,3 +255,7 @@ void* test_daemon_start(void* arg) {
ipfs_daemon_start((char*)arg);
return NULL;
}
int have_file_in_blockstore(const char* dir, const char* filename) {
return 0;
}

View File

@ -53,3 +53,11 @@ int create_bytes(unsigned char* buffer, size_t num_bytes);
* @returns NULL
*/
void* test_daemon_start(void* arg);
/**
* Determine if a file exists in the blockstore
* @param dir the directory of the blockstore
* @param filename the file name (a base32 hash)
* @returns true(1) if the file exists, false(0) otherwise
*/
int have_file_in_blockstore(const char* dir, const char* filename);