diff --git a/include/ipfs/journal/journal_message.h b/include/ipfs/journal/journal_message.h index 3289b7d..e29fb8b 100644 --- a/include/ipfs/journal/journal_message.h +++ b/include/ipfs/journal/journal_message.h @@ -39,4 +39,4 @@ int ipfs_journal_message_encode(struct JournalMessage* entry, uint8_t *buffer, s * @param results where to put the new JournalMessage * @returns true(1) on success, false(0) otherwise */ -int ipfs_journal_message_decode(uint8_t *incoming, size_t incoming_size, struct JournalMessage **results); +int ipfs_journal_message_decode(const uint8_t *incoming, size_t incoming_size, struct JournalMessage **results); diff --git a/journal/journal.c b/journal/journal.c index 20e844e..1aeb5ce 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -2,6 +2,7 @@ * The journal protocol attempts to keep a journal in sync with other (approved) nodes */ #include "libp2p/os/utils.h" +#include "libp2p/utils/logger.h" #include "ipfs/journal/journal.h" #include "ipfs/journal/journal_message.h" #include "ipfs/journal/journal_entry.h" @@ -32,20 +33,6 @@ int ipfs_journal_shutdown_handler(void* context) { return 1; } -/*** - * Handles a message - * @param incoming the message - * @param incoming_size the size of the message - * @param session_context details of the remote peer - * @param protocol_context in this case, an IpfsNode - * @returns 0 if the caller should not continue looping, <0 on error, >0 on success - */ -int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { - //struct IpfsNode* local_node = (struct IpfsNode*)protocol_context; - //TODO: handle the message - return -1; -} - /*** * Build the protocol handler struct for the Journal protocol * @param local_node what to stuff in the context @@ -177,3 +164,104 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli return retVal; } +enum JournalAction { JOURNAL_ENTRY_NEEDED, JOURNAL_TIME_ADJUST, JOURNAL_REMOTE_NEEDS }; + +struct JournalToDo { + enum JournalAction action; // what needs to be done + unsigned long long local_timestamp; // what we have in our journal + unsigned long long remote_timestamp; // what they have in their journal + uint8_t* hash; // the hash + size_t hash_size; // the size of the hash +}; + +struct JournalToDo* ipfs_journal_todo_new() { + struct JournalToDo* j = (struct JournalToDo*) malloc(sizeof(struct JournalToDo)); + if (j != NULL) { + j->action = JOURNAL_ENTRY_NEEDED; + j->hash = NULL; + j->hash_size = 0; + j->local_timestamp = 0; + j->remote_timestamp = 0; + } + return j; +} + +int ipfs_journal_todo_free(struct JournalToDo *in) { + if (in != NULL) { + free(in); + } + return 1; +} + +int ipfs_journal_build_todo(struct IpfsNode* local_node, struct JournalMessage* incoming, struct Libp2pVector** todo_vector) { + *todo_vector = libp2p_utils_vector_new(1); + if (*todo_vector == NULL) + return -1; + struct Libp2pVector *todos = *todo_vector; + // for every file in message + for(int i = 0; i < incoming->journal_entries->total; i++) { + struct JournalEntry* entry = (struct JournalEntry*) libp2p_utils_vector_get(incoming->journal_entries, i); + // do we have the file? + struct DatastoreRecord *datastore_record = NULL; + if (!local_node->repo->config->datastore->datastore_get(entry->hash, entry->hash_size, &datastore_record, local_node->repo->config->datastore)) { + struct JournalToDo* td = ipfs_journal_todo_new(); + td->action = JOURNAL_ENTRY_NEEDED; + td->hash = entry->hash; + td->hash_size = entry->hash_size; + td->remote_timestamp = entry->timestamp; + libp2p_utils_vector_add(todos, td); + } else { + // do we need to adjust the time? + if (datastore_record->timestamp != entry->timestamp) { + struct JournalToDo* td = ipfs_journal_todo_new(); + td->action = JOURNAL_TIME_ADJUST; + td->hash = entry->hash; + td->hash_size = entry->hash_size; + td->local_timestamp = datastore_record->timestamp; + td->remote_timestamp = entry->timestamp; + libp2p_utils_vector_add(todos, td); + } + } + libp2p_datastore_record_free(datastore_record); + } + // TODO: get all files of same second + // are they perhaps missing something? + //struct Libp2pVector* local_records_for_second; + return 0; +} + +/*** + * Handles a message + * @param incoming the message + * @param incoming_size the size of the message + * @param session_context details of the remote peer + * @param protocol_context in this case, an IpfsNode + * @returns 0 if the caller should not continue looping, <0 on error, >0 on success + */ +int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { + struct IpfsNode* local_node = (struct IpfsNode*)protocol_context; + // un-protobuf the message + struct JournalMessage* message = NULL; + if (!ipfs_journal_message_decode(incoming, incoming_size, &message)) + return -1; + // see if the remote's time is within 5 minutes of now + unsigned long long start_time = os_utils_gmtime(); + long long our_time_diff = start_time - message->current_epoch; + // NOTE: If our_time_diff is negative, the remote's clock is faster than ours. + // if it is positive, our clock is faster than theirs. + if ( llabs(our_time_diff) > 300) { + libp2p_logger_error("journal", "The clock of peer %s is out of 5 minute range. Seconds difference: %llu", session_context->remote_peer_id, our_time_diff); + return -1; + } + // TODO: get our records for the same period + // TODO: compare the two sets of records + // we will build a list of todo items: + // ask for files + // adjust time on files + // notify remote that we have files that they probably do not have + struct Libp2pVector* todo_vector = NULL; + ipfs_journal_build_todo(local_node, message, &todo_vector); + // set new values in their ReplicationPeer struct + return 1; +} + diff --git a/journal/journal_message.c b/journal/journal_message.c index 94c6f52..97bcf1f 100644 --- a/journal/journal_message.c +++ b/journal/journal_message.c @@ -104,7 +104,7 @@ int ipfs_journal_message_encode(struct JournalMessage* message, uint8_t *buffer, * @param results where to put the new JournalMessage * @returns true(1) on success, false(0) otherwise */ -int ipfs_journal_message_decode(uint8_t *incoming, size_t incoming_size, struct JournalMessage **out) { +int ipfs_journal_message_decode(const uint8_t *incoming, size_t incoming_size, struct JournalMessage **out) { size_t pos = 0; int retVal = 0, got_something = 0;; diff --git a/merkledag/merkledag.c b/merkledag/merkledag.c index 5a84835..0d40d30 100644 --- a/merkledag/merkledag.c +++ b/merkledag/merkledag.c @@ -61,20 +61,19 @@ int ipfs_merkledag_add(struct HashtableNode* node, struct FSRepo* fs_repo, size_ */ int ipfs_merkledag_get(const unsigned char* hash, size_t hash_size, struct HashtableNode** node, const struct FSRepo* fs_repo) { int retVal = 1; - size_t key_length = 100; - unsigned char key[key_length]; + struct DatastoreRecord* datastore_record = NULL; // look for the node in the datastore. If it is not there, it is not a node. // If it exists, it is only a block. - retVal = fs_repo->config->datastore->datastore_get((char*)hash, hash_size, key, key_length, &key_length, fs_repo->config->datastore); + retVal = fs_repo->config->datastore->datastore_get(hash, hash_size, &datastore_record, fs_repo->config->datastore); if (retVal == 0) return 0; + libp2p_datastore_record_free(datastore_record); + // we have the record from the db. Go get the node from the blockstore - retVal = ipfs_repo_fsrepo_node_read(hash, hash_size, node, fs_repo); - if (retVal == 0) { + if (!ipfs_repo_fsrepo_node_read(hash, hash_size, node, fs_repo)) return 0; - } // set the hash ipfs_hashtable_node_set_hash(*node, hash, hash_size); diff --git a/repo/fsrepo/fs_repo.c b/repo/fsrepo/fs_repo.c index 5c7d752..105b6f3 100644 --- a/repo/fsrepo/fs_repo.c +++ b/repo/fsrepo/fs_repo.c @@ -787,11 +787,10 @@ int ipfs_repo_fsrepo_node_read(const unsigned char* hash, size_t hash_length, st // get the base32 hash from the database // We do this only to see if it is in the database - size_t fs_key_length = 100; - unsigned char fs_key[fs_key_length]; - retVal = fs_repo->config->datastore->datastore_get((const char*)hash, hash_length, fs_key, fs_key_length, &fs_key_length, fs_repo->config->datastore); - if (retVal == 0) // maybe it doesn't exist? + struct DatastoreRecord *datastore_record = NULL; + if (!fs_repo->config->datastore->datastore_get(hash, hash_length, &datastore_record, fs_repo->config->datastore)) return 0; + libp2p_datastore_record_free(datastore_record); // now get the block from the blockstore retVal = ipfs_blockstore_get_node(hash, hash_length, node, fs_repo); return retVal; @@ -804,11 +803,12 @@ int ipfs_repo_fsrepo_block_read(const unsigned char* hash, size_t hash_length, s // get the base32 hash from the database // We do this only to see if it is in the database - size_t fs_key_length = 100; - unsigned char fs_key[fs_key_length]; - retVal = fs_repo->config->datastore->datastore_get((const char*)hash, hash_length, fs_key, fs_key_length, &fs_key_length, fs_repo->config->datastore); - if (retVal == 0) // maybe it doesn't exist? + struct DatastoreRecord *datastore_record = NULL; + if (!fs_repo->config->datastore->datastore_get(hash, hash_length, &datastore_record, fs_repo->config->datastore)) return 0; + + libp2p_datastore_record_free(datastore_record); + // now get the block from the blockstore struct Cid* cid = ipfs_cid_new(0, hash, hash_length, CID_PROTOBUF); if (cid == NULL) @@ -829,11 +829,10 @@ int ipfs_repo_fsrepo_unixfs_read(const unsigned char* hash, size_t hash_length, // get the base32 hash from the database // We do this only to see if it is in the database - size_t fs_key_length = 100; - unsigned char fs_key[fs_key_length]; - retVal = fs_repo->config->datastore->datastore_get((const char*)hash, hash_length, fs_key, fs_key_length, &fs_key_length, fs_repo->config->datastore); - if (retVal == 0) // maybe it doesn't exist? + struct DatastoreRecord *datastore_record = NULL; + if (!fs_repo->config->datastore->datastore_get(hash, hash_length, &datastore_record, fs_repo->config->datastore)) return 0; + libp2p_datastore_record_free(datastore_record); // now get the block from the blockstore retVal = ipfs_blockstore_get_unixfs(hash, hash_length, unix_fs, fs_repo); return retVal; diff --git a/repo/fsrepo/lmdb_datastore.c b/repo/fsrepo/lmdb_datastore.c index 873101c..adc6c2b 100644 --- a/repo/fsrepo/lmdb_datastore.c +++ b/repo/fsrepo/lmdb_datastore.c @@ -13,64 +13,81 @@ #include "lmdb.h" #include "libp2p/utils/logger.h" #include "libp2p/os/utils.h" +#include "libp2p/db/datastore.h" #include "ipfs/repo/fsrepo/lmdb_datastore.h" #include "ipfs/repo/fsrepo/journalstore.h" +#include "libp2p/db/datastore.h" #include "varint.h" /** * Build a "value" section for a datastore record - * @param timestamp the timestamp - * @param data the data (usually a base32 of the cid hash) - * @param data_length the length of data - * @param result the resultant data object + * @param record the data + * @param result the data (usually a base32 of the cid hash) + the timestamp as varint * @param result_size the size of the result * @returns true(1) on success, otherwise 0 */ -int repo_fsrepo_lmdb_build_record(const unsigned long long timestamp, const uint8_t *data, size_t data_length, uint8_t **result, size_t *result_size) { +int repo_fsrepo_lmdb_encode_record(struct DatastoreRecord* record, uint8_t **result, size_t *result_size) { // turn timestamp into varint uint8_t ts_varint[8]; size_t num_bytes; - if (varint_encode(timestamp, &ts_varint[0], 8, &num_bytes) == NULL) { + if (varint_encode(record->timestamp, &ts_varint[0], 8, &num_bytes) == NULL) { return 0; } // make new structure - *result = (uint8_t *) malloc(num_bytes + data_length); + *result = (uint8_t *) malloc(num_bytes + record->value_size); if (*result == NULL) { return 0; } memcpy(*result, ts_varint, num_bytes); - memcpy(&(*result)[num_bytes], data, data_length); - *result_size = data_length + num_bytes; + memcpy(&(*result)[num_bytes], record->value, record->value_size); + *result_size = record->value_size + num_bytes; return 1; } /** - * read a "value" section from a datastore record. - * @param data what we read from the datastore - * @param data_length the length of what we read from the datastore - * @param timestamp the timestamp that was read from the datastore - * @param record_pos where the data starts (without the timestamp) - * @param record_size the size of the data section of the record + * turn lmdb components into a DatastoreRecord structure + * @param key the key that we searched for in the database + * @param value the result of the search + * @param record the complete structure * @returns true(1) on success, false(0) otherwise */ -int repo_fsrepo_lmdb_parse_record(uint8_t *data, size_t data_length, unsigned long long *timestamp, uint8_t *record_pos, size_t *record_size) { - size_t varint_size = 0; - *timestamp = varint_decode(data, data_length, &varint_size); - record_pos = &data[varint_size]; - return 1; +int repo_fsrepo_lmdb_build_record(MDB_val *key, MDB_val *value, struct DatastoreRecord** record) { + *record = libp2p_datastore_record_new(); + if (*record != NULL) { + size_t varint_size = 0; + struct DatastoreRecord *rec = *record; + // set key + rec->key_size = key->mv_size; + rec->key = (uint8_t *) malloc(rec->key_size); + if (rec->key == NULL) { + libp2p_datastore_record_free(*record); + *record = NULL; + return 0; + } + memcpy(rec->key, key->mv_data, key->mv_size); + // set value + rec->timestamp = varint_decode(value->mv_data, value->mv_size, &varint_size); + rec->value_size = value->mv_size - varint_size; + rec->value = (uint8_t *) malloc(rec->value_size); + if (rec->value == NULL) { + libp2p_datastore_record_free(*record); + *record = NULL; + return 0; + } + memcpy(rec->value, &value->mv_data[varint_size], rec->value_size); + } + return 0; } /*** * retrieve a record from the database and put in a pre-sized buffer * @param key the key to look for * @param key_size the length of the key - * @param data the data that is retrieved - * @param max_data_size the length of the data buffer - * @param data_size the length of the data that was found in the database + * @param record where to put the results * @param datastore where to look for the data * @returns true(1) on success */ -int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data, size_t max_data_size, size_t* data_size, const struct Datastore* datastore) { +int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct DatastoreRecord **record, const struct Datastore* datastore) { MDB_txn* mdb_txn; MDB_dbi mdb_dbi; struct MDB_val db_key; @@ -98,25 +115,11 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data, return 0; } - // the data from the database includes a timestamp. We'll need to strip it off. - unsigned long long timestamp; - uint8_t *pos = NULL; - size_t size = 0; - if (!repo_fsrepo_lmdb_parse_record(db_key.mv_data, db_key.mv_size, ×tamp, pos, &size)) { + if (!repo_fsrepo_lmdb_build_record(&db_key, &db_value, record)) { mdb_txn_commit(mdb_txn); return 0; } - // Was it too big to fit in the buffer they sent? - if (size > max_data_size) { - mdb_txn_commit(mdb_txn); - return 0; - } - - // set return values - memcpy(data, pos, size); - (*data_size) = size; - // clean up mdb_txn_commit(mdb_txn); @@ -153,9 +156,10 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha // add the timestamp unsigned long long timestamp = os_utils_gmtime(); + struct DatastoreRecord *datastore_record = libp2p_datastore_record_new(); + size_t record_size = 0; uint8_t *record; - size_t record_size; - repo_fsrepo_lmdb_build_record(timestamp, data, data_size, &record, &record_size); + repo_fsrepo_lmdb_encode_record(datastore_record, &record, &record_size); // prepare data db_key.mv_size = key_size;