journaling protocol

This commit is contained in:
jmjatlanta 2017-08-24 10:08:27 -05:00
parent d13e4b4318
commit 0bc975dfcf
17 changed files with 694 additions and 50 deletions

View file

@ -11,6 +11,7 @@ all:
cd core; make all; cd core; make all;
cd exchange; make all; cd exchange; make all;
cd importer; make all; cd importer; make all;
cd journal; make all;
cd merkledag; make all; cd merkledag; make all;
cd multibase; make all; cd multibase; make all;
cd pin; make all; cd pin; make all;
@ -35,6 +36,7 @@ clean:
cd core; make clean; cd core; make clean;
cd exchange; make clean; cd exchange; make clean;
cd importer; make clean; cd importer; make clean;
cd journal; make clean;
cd merkledag; make clean; cd merkledag; make clean;
cd multibase; make clean; cd multibase; make clean;
cd pin; make clean; cd pin; make clean;

View file

@ -5,14 +5,15 @@
#include "libp2p/routing/dht_protocol.h" #include "libp2p/routing/dht_protocol.h"
#include "ipfs/core/ipfs_node.h" #include "ipfs/core/ipfs_node.h"
#include "ipfs/exchange/bitswap/bitswap.h" #include "ipfs/exchange/bitswap/bitswap.h"
#include "ipfs/journal/journal.h"
struct Libp2pVector* ipfs_node_online_build_protocol_handlers(struct IpfsNode* node) { struct Libp2pVector* ipfs_node_online_build_protocol_handlers(struct IpfsNode* node) {
struct Libp2pVector* retVal = libp2p_utils_vector_new(1); struct Libp2pVector* retVal = libp2p_utils_vector_new(1);
if (retVal != NULL) { if (retVal != NULL) {
// secio // secio
libp2p_utils_vector_add(retVal, libp2p_secio_build_protocol_handler(&node->identity->private_key, node->peerstore)); libp2p_utils_vector_add(retVal, libp2p_secio_build_protocol_handler(&node->identity->private_key, node->peerstore));
// nodeio // journal
//libp2p_utils_vector_add(retVal, libp2p_nodeio_build_protocol_handler()); libp2p_utils_vector_add(retVal, ipfs_journal_build_protocol_handler(node));
// kademlia // kademlia
libp2p_utils_vector_add(retVal, libp2p_routing_dht_build_protocol_handler(node->peerstore, node->providerstore)); libp2p_utils_vector_add(retVal, libp2p_routing_dht_build_protocol_handler(node->peerstore, node->providerstore));
// bitswap // bitswap

View file

@ -0,0 +1,50 @@
#pragma once
#include <stdio.h>
#include <stdint.h>
#include "libp2p/conn/session.h"
#include "ipfs/core/ipfs_node.h"
#include "libp2p/net/protocol.h"
/**
* The journal protocol attempts to keep a journal in sync with other (approved) nodes
*/
/***
* See if we can handle this message
* @param incoming the incoming message
* @param incoming_size the size of the incoming message
* @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise.
*/
int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size);
/**
* Clean up resources used by this handler
* @param context the context to clean up
* @returns true(1)
*/
int ipfs_journal_shutdown_handler(void* context);
/***
* Handles a message
* @param incoming the message
* @param incoming_size the size of the message
* @param session_context details of the remote peer
* @param protocol_context in this case, an IpfsNode
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/
int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) ;
/***
* Build the protocol handler struct for the Journal protocol
* @param local_node what to stuff in the context
* @returns the protocol handler
*/
struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct IpfsNode* local_node);
/***
* Send a journal message to a remote peer
* @param peer the peer to send it to
* @returns true(1) on success, false(0) otherwise.
*/
int ipfs_journal_sync(struct Libp2pPeer* peer);

View file

@ -0,0 +1,43 @@
#pragma once
/**
* A journal entry protobuf
*/
#include <stdlib.h>
#include <stdint.h>
struct JournalEntry {
unsigned long long timestamp;
uint8_t pin;
uint8_t *hash;
size_t hash_size;
};
struct JournalEntry* ipfs_journal_entry_new();
int ipfs_journal_entry_free(struct JournalEntry* entry);
/**
* Determine the maximum size of a protobuf'd JournalEntry
*/
int ipfs_journal_entry_encode_size(struct JournalEntry* entry);
/***
* Protobuf the journal entry
* @param entry the JournalEntry to protobuf
* @param buffer where to place the results
* @param max_buffer_size the amount of memory allocated for the buffer
* @param bytes_used the amount of the buffer used
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_journal_entry_encode(struct JournalEntry* entry, uint8_t *buffer, size_t max_buffer_size, size_t *bytes_used);
/***
* Turn a protobuf'd JournalEntry and turn it into a real JournalEntry
* @param incoming the incoming bytes
* @param incoming_size the size of the incoming buffer
* @param results where to put the new JournalEntry
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_journal_entry_decode(uint8_t *incoming, size_t incoming_size, struct JournalEntry **results);

View file

@ -0,0 +1,42 @@
#pragma once
#include <stdlib.h>
#include <stdint.h>
#include "libp2p/utils/vector.h"
struct JournalMessage {
unsigned long long current_epoch;
unsigned long long start_epoch;
unsigned long long end_epoch;
struct Libp2pVector* journal_entries;
};
struct JournalMessage* ipfs_journal_message_new();
int ipfs_journal_message_free(struct JournalMessage* message);
/**
* Determine the maximum size of a protobuf'd JournalMessage
* @param message the JournalMessage
* @returns the maximum size of this message in bytes if it were protobuf'd
*/
int ipfs_journal_message_encode_size(struct JournalMessage* message);
/***
* Protobuf the journal message
* @param message the JournalMessage to protobuf
* @param buffer where to place the results
* @param max_buffer_size the amount of memory allocated for the buffer
* @param bytes_used the amount of the buffer used
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_journal_message_encode(struct JournalMessage* entry, uint8_t *buffer, size_t max_buffer_size, size_t *bytes_used);
/***
* Turn a protobuf'd JournalMessage and turn it into a real JournalMessage
* @param incoming the incoming bytes
* @param incoming_size the size of the incoming buffer
* @param results where to put the new JournalMessage
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_journal_message_decode(uint8_t *incoming, size_t incoming_size, struct JournalMessage **results);

View file

@ -6,6 +6,7 @@
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include "lmdb.h"
#include "libp2p/db/datastore.h" #include "libp2p/db/datastore.h"
struct JournalRecord { struct JournalRecord {
@ -31,3 +32,5 @@ int repo_journalstore_cursor_get(struct Datastore* datastore, void* cursor, enum
int repo_cournalstore_cursor_close(struct Datastore* datastore, void* cursor); int repo_cournalstore_cursor_close(struct Datastore* datastore, void* cursor);
int journal_record_free(struct JournalRecord* rec); int journal_record_free(struct JournalRecord* rec);
int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp, const uint8_t *hash, size_t hash_size);

18
journal/Makefile Normal file
View file

@ -0,0 +1,18 @@
CC = gcc
CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multiaddr/include -I../../c-multihash/include -I../../c-protobuf -Wall -std=c99
ifdef DEBUG
CFLAGS += -g3
endif
LFLAGS =
DEPS =
OBJS = journal.o journal_entry.o journal_message.o
%.o: %.c $(DEPS)
$(CC) -c -o $@ $< $(CFLAGS)
all: $(OBJS)
clean:
rm -f *.o

89
journal/journal.c Normal file
View file

@ -0,0 +1,89 @@
/**
* The journal protocol attempts to keep a journal in sync with other (approved) nodes
*/
#include "ipfs/journal/journal.h"
/***
* See if we can handle this message
* @param incoming the incoming message
* @param incoming_size the size of the incoming message
* @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise.
*/
int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) {
if (incoming_size < 8)
return 0;
char* result = strstr((char*)incoming, "/ipfs/journal/1.0.0");
if(result == NULL || result != (char*)incoming)
return 0;
return 1;
}
/**
* Clean up resources used by this handler
* @param context the context to clean up
* @returns true(1)
*/
int ipfs_journal_shutdown_handler(void* context) {
return 1;
}
/***
* Handles a message
* @param incoming the message
* @param incoming_size the size of the message
* @param session_context details of the remote peer
* @param protocol_context in this case, an IpfsNode
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/
int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) {
//struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
//TODO: handle the message
return -1;
}
/***
* Build the protocol handler struct for the Journal protocol
* @param local_node what to stuff in the context
* @returns the protocol handler
*/
struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct IpfsNode* local_node) {
struct Libp2pProtocolHandler* handler = (struct Libp2pProtocolHandler*) malloc(sizeof(struct Libp2pProtocolHandler));
if (handler != NULL) {
handler->context = (void*)local_node;
handler->CanHandle = ipfs_journal_can_handle;
handler->HandleMessage = ipfs_journal_handle_message;
handler->Shutdown = ipfs_journal_shutdown_handler;
}
return handler;
}
/***
* Send a journal message to a remote peer
* @param peer the peer to send it to
* @returns true(1) on success, false(0) otherwise.
*/
int ipfs_journal_sync(struct Libp2pPeer* peer) {
// make sure we're connected securely
if (peer->is_local)
return 0;
if (peer->sessionContext->secure_stream == NULL)
return 0;
/*
// grab the last 10 files
struct Libp2pVector* vector = libp2p_utils_vector_new(1);
if (vector == NULL) {
return 0;
}
ipfs_journal_get_last(10, &vector);
struct JournalMessage* message = NULL;
// build the message
if (!ipfs_journal_build_message(message))
return 0;
// protobuf the message
// send the protocol header
// send the message
*/
return 0;
}

141
journal/journal_entry.c Normal file
View file

@ -0,0 +1,141 @@
/**
* A journal entry protobuf
*/
#include "libp2p/utils/logger.h"
#include "ipfs/journal/journal_entry.h"
#include "protobuf.h"
struct JournalEntry* ipfs_journal_entry_new() {
struct JournalEntry* journal_entry = (struct JournalEntry*) malloc(sizeof(struct JournalEntry));
if (journal_entry != NULL) {
journal_entry->hash = NULL;
journal_entry->hash_size = 0;
journal_entry->pin = 0;
journal_entry->timestamp = 0;
}
return journal_entry;
}
int ipfs_journal_entry_free(struct JournalEntry* entry) {
if (entry != NULL) {
if (entry->hash != NULL) {
free(entry->hash);
entry->hash = NULL;
entry->hash_size = 0;
}
free(entry);
}
return 1;
}
/**
* Determine the maximum size of a protobuf'd JournalEntry
*/
int ipfs_journal_entry_encode_size(struct JournalEntry* entry) {
// hash
int retVal = entry->hash_size;
// hash size
retVal += 11;
// pin
retVal += 1;
// timestamp
retVal += 11;
return retVal;
}
/***
* Protobuf the journal entry
* @param entry the JournalEntry to protobuf
* @param buffer where to place the results
* @param max_buffer_size the amount of memory allocated for the buffer
* @param bytes_used the amount of the buffer used
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_journal_entry_encode(struct JournalEntry* entry, uint8_t *buffer, size_t max_buffer_size, size_t *bytes_written) {
/*
message JournalEntry {
int32 timestamp = 1;
string hash = 2;
bool pin = 3;
}
*/
// sanity checks
if (buffer == NULL)
return 0;
if (max_buffer_size <= 0)
return 0;
*bytes_written = 0;
size_t bytes_used;
// timestamp
if (!protobuf_encode_varint(1, WIRETYPE_VARINT, entry->timestamp, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used))
return 0;
*bytes_written += bytes_used;
// hash
if (!protobuf_encode_length_delimited(2, WIRETYPE_LENGTH_DELIMITED, (char*)entry->hash, entry->hash_size, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used))
return 0;
*bytes_written += bytes_used;
// pin
if (!protobuf_encode_varint(3, WIRETYPE_VARINT, entry->pin, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used))
return 0;
*bytes_written += bytes_used;
return 1;
}
/***
* Turn a protobuf'd JournalEntry and turn it into a real JournalEntry
* @param incoming the incoming bytes
* @param incoming_size the size of the incoming buffer
* @param results where to put the new JournalEntry
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_journal_entry_decode(uint8_t *incoming, size_t incoming_size, struct JournalEntry **out) {
size_t pos = 0;
int retVal = 0, got_something = 0;;
if ( (*out = ipfs_journal_entry_new()) == NULL)
goto exit;
while(pos < incoming_size) {
size_t bytes_read = 0;
int field_no;
enum WireType field_type;
if (protobuf_decode_field_and_type(&incoming[pos], incoming_size, &field_no, &field_type, &bytes_read) == 0) {
goto exit;
}
if (field_no < 1 || field_no > 5) {
libp2p_logger_error("journal_entry", "Invalid character in journal_entry protobuf at position %lu. Value: %02x\n", pos, incoming[pos]);
}
pos += bytes_read;
switch(field_no) {
case (1): // timestamp
if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &(*out)->timestamp, &bytes_read) == 0)
goto exit;
pos += bytes_read;
got_something = 1;
break;
case (2): // hash
if (protobuf_decode_length_delimited(&incoming[pos], incoming_size - pos, (char**)&(*out)->hash, &(*out)->hash_size, &bytes_read) == 0)
goto exit;
pos += bytes_read;
got_something = 1;
break;
case (3): { // pin
unsigned long long temp;
if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &temp, &bytes_read) == 0)
goto exit;
(*out)->pin = (temp == 1);
pos += bytes_read;
got_something = 1;
break;
}
}
}
retVal = got_something;
exit:
if (retVal == 0) {
ipfs_journal_entry_free(*out);
}
return retVal;
}

171
journal/journal_message.c Normal file
View file

@ -0,0 +1,171 @@
#include "ipfs/journal/journal_message.h"
#include "ipfs/journal/journal_entry.h"
#include "libp2p/utils/logger.h"
#include "protobuf.h"
struct JournalMessage* ipfs_journal_message_new() {
struct JournalMessage *message = (struct JournalMessage*) malloc(sizeof(struct JournalMessage));
if (message != NULL) {
message->current_epoch = 0;
message->end_epoch = 0;
message->start_epoch = 0;
message->journal_entries = libp2p_utils_vector_new(1);
}
return message;
}
int ipfs_journal_message_free(struct JournalMessage* message) {
if (message != NULL) {
if (message->journal_entries != NULL) {
for(int i = 0; i < message->journal_entries->total; i++) {
struct JournalEntry* entry = (struct JournalEntry*) libp2p_utils_vector_get(message->journal_entries, i);
ipfs_journal_entry_free(entry);
}
libp2p_utils_vector_free(message->journal_entries);
message->journal_entries = NULL;
}
free(message);
}
return 1;
}
/**
* Determine the maximum size of a protobuf'd JournalMessage
* @param message the JournalMessage
* @returns the maximum size of this message in bytes if it were protobuf'd
*/
int ipfs_journal_message_encode_size(struct JournalMessage* message) {
// 3 epochs
int sz = 33;
// journal entries
for (int i = 0; i < message->journal_entries->total; i++) {
struct JournalEntry* entry = (struct JournalEntry*) libp2p_utils_vector_get(message->journal_entries, i);
sz += ipfs_journal_entry_encode_size(entry);
}
return sz;
}
/***
* Protobuf the journal message
* @param message the JournalMessage to protobuf
* @param buffer where to place the results
* @param max_buffer_size the amount of memory allocated for the buffer
* @param bytes_used the amount of the buffer used
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_journal_message_encode(struct JournalMessage* message, uint8_t *buffer, size_t max_buffer_size, size_t *bytes_written) {
/*
message JournalMessage {
int32 current_epoch = 1;
int32 start_epoch = 2;
int32 end_epoch = 3;
repeated JournalEntry journal_entries = 4;
}
*/
// sanity checks
if (buffer == NULL)
return 0;
if (max_buffer_size <= 0)
return 0;
*bytes_written = 0;
size_t bytes_used;
// current_epoch
if (!protobuf_encode_varint(1, WIRETYPE_VARINT, message->current_epoch, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used))
return 0;
*bytes_written += bytes_used;
// start_epoch
if (!protobuf_encode_varint(2, WIRETYPE_VARINT, message->start_epoch, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used))
return 0;
*bytes_written += bytes_used;
// end_epoch
if (!protobuf_encode_varint(3, WIRETYPE_VARINT, message->end_epoch, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used))
return 0;
*bytes_written += bytes_used;
// journal_entries
for (int i = 0; i < message->journal_entries->total; i++) {
struct JournalEntry* entry = (struct JournalEntry*) libp2p_utils_vector_get(message->journal_entries, i);
// encode the journal entry
size_t temp_size = ipfs_journal_entry_encode_size(entry);
uint8_t temp[temp_size];
if (!ipfs_journal_entry_encode(entry, &temp[0], temp_size, &temp_size))
return 0;
if (!protobuf_encode_length_delimited(4, WIRETYPE_LENGTH_DELIMITED, (char*)&temp[0], temp_size, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used)) {
return 0;
}
*bytes_written += bytes_used;
}
return 1;
}
/***
* Turn a protobuf'd JournalMessage and turn it into a real JournalMessage
* @param incoming the incoming bytes
* @param incoming_size the size of the incoming buffer
* @param results where to put the new JournalMessage
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_journal_message_decode(uint8_t *incoming, size_t incoming_size, struct JournalMessage **out) {
size_t pos = 0;
int retVal = 0, got_something = 0;;
if ( (*out = ipfs_journal_message_new()) == NULL)
goto exit;
while(pos < incoming_size) {
size_t bytes_read = 0;
int field_no;
enum WireType field_type;
if (protobuf_decode_field_and_type(&incoming[pos], incoming_size, &field_no, &field_type, &bytes_read) == 0) {
goto exit;
}
if (field_no < 1 || field_no > 5) {
libp2p_logger_error("journal_message", "Invalid character in journal_message protobuf at position %lu. Value: %02x\n", pos, incoming[pos]);
}
pos += bytes_read;
switch(field_no) {
case (1): // current_epoch
if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &(*out)->current_epoch, &bytes_read) == 0)
goto exit;
pos += bytes_read;
got_something = 1;
break;
case (2): // start_epoch
if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &(*out)->start_epoch, &bytes_read) == 0)
goto exit;
pos += bytes_read;
got_something = 1;
break;
case (3): // end_epoch
if (protobuf_decode_varint(&incoming[pos], incoming_size - pos, &(*out)->end_epoch, &bytes_read) == 0)
goto exit;
pos += bytes_read;
got_something = 1;
break;
case (4): { // journal entry
uint8_t *temp;
size_t temp_length;
protobuf_decode_length_delimited(&incoming[pos], incoming_size - pos, (char**)&temp, &temp_length, &bytes_read);
pos += bytes_read;
struct JournalEntry* entry = NULL;
if (ipfs_journal_entry_decode(&temp[0], temp_length, &entry)) {
libp2p_utils_vector_add((*out)->journal_entries, (void*)entry);
free(temp);
} else {
free(temp);
goto exit;
}
got_something = 1;
break;
}
}
}
retVal = got_something;
exit:
if (retVal == 0) {
ipfs_journal_message_free(*out);
}
return retVal;
}

View file

@ -0,0 +1,12 @@
message JournalEntry {
int32 timestamp = 1;
string hash = 2;
bool pin = 3;
}
message JournalMessage {
int32 current_epoch = 1;
int32 start_epoch = 2;
int32 end_epoch = 3;
repeated JournalEntry journal_entries = 4;
}

View file

@ -14,6 +14,7 @@ OBJS = main.o \
../exchange/bitswap/*.o \ ../exchange/bitswap/*.o \
../flatfs/flatfs.o \ ../flatfs/flatfs.o \
../importer/importer.o ../importer/exporter.o ../importer/resolver.o \ ../importer/importer.o ../importer/exporter.o ../importer/resolver.o \
../journal/*.o \
../path/path.o \ ../path/path.o \
../merkledag/merkledag.o ../merkledag/node.o \ ../merkledag/merkledag.o ../merkledag/node.o \
../multibase/multibase.o \ ../multibase/multibase.o \

View file

@ -13,6 +13,7 @@
#include "lmdb.h" #include "lmdb.h"
#include "libp2p/utils/logger.h" #include "libp2p/utils/logger.h"
#include "ipfs/repo/fsrepo/lmdb_datastore.h" #include "ipfs/repo/fsrepo/lmdb_datastore.h"
#include "ipfs/repo/fsrepo/journalstore.h"
#include "varint.h" #include "varint.h"
/** /**
@ -121,52 +122,6 @@ int repo_fsrepo_lmdb_get(const char* key, size_t key_size, unsigned char* data,
return 1; return 1;
} }
/***
* Write a journal record
* @param mbd_txn the transaction
* @param timestamp the timestamp
* @param hash the hash
* @param hash_size the size of the hash
* @returns true(1) on success, false(0) otherwise
*/
int repo_fsrepo_lmdb_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp, const uint8_t *hash, size_t hash_size) {
MDB_dbi mdb_dbi;
struct MDB_val db_key;
struct MDB_val db_value;
libp2p_logger_debug("lmdb_datastore", "journal add timestamp: %llu.\n", timestamp);
// build the record, which is a timestamp as a key, a byte that is the pin flag, and the hash as the value
uint8_t time_varint[8];
size_t time_varint_size = 0;
varint_encode(timestamp, &time_varint[0], 8, &time_varint_size);
libp2p_logger_debug("lmdb_datastore", "journal add varint size: %lu.\n", (unsigned long)time_varint_size);
size_t record_size = hash_size + 1;
uint8_t record[record_size];
record[0] = 1;
memcpy(&record[1], hash, hash_size);
// open the journal table
if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
return 0;
}
// write the record
db_key.mv_size = time_varint_size;
db_key.mv_data = time_varint;
db_value.mv_size = record_size;
db_value.mv_data = record;
if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) {
return 0;
}
return 1;
}
/** /**
* Get the current time UTC * Get the current time UTC
@ -223,7 +178,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha
retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA | MDB_NOOVERWRITE); retVal = mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, MDB_NODUPDATA | MDB_NOOVERWRITE);
if (retVal == 0) { if (retVal == 0) {
// the normal case // the normal case
repo_fsrepo_lmdb_journal_add(mdb_txn, timestamp, key, key_size); lmdb_journalstore_journal_add(mdb_txn, timestamp, key, key_size);
retVal = 1; retVal = 1;
} else { } else {
if (retVal == MDB_KEYEXIST) // We tried to add a key that already exists. Skip. if (retVal == MDB_KEYEXIST) // We tried to add a key that already exists. Skip.

View file

@ -15,6 +15,49 @@ int journal_record_free(struct JournalRecord* rec) {
return 1; return 1;
} }
/***
* Write a journal record
* @param mbd_txn the transaction
* @param timestamp the timestamp
* @param hash the hash
* @param hash_size the size of the hash
* @returns true(1) on success, false(0) otherwise
*/
int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp, const uint8_t *hash, size_t hash_size) {
MDB_dbi mdb_dbi;
struct MDB_val db_key;
struct MDB_val db_value;
// build the record, which is a timestamp as a key, a byte that is the pin flag, and the hash as the value
uint8_t time_varint[8];
size_t time_varint_size = 0;
varint_encode(timestamp, &time_varint[0], 8, &time_varint_size);
size_t record_size = hash_size + 1;
uint8_t record[record_size];
record[0] = 1;
memcpy(&record[1], hash, hash_size);
// open the journal table
if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) {
return 0;
}
// write the record
db_key.mv_size = time_varint_size;
db_key.mv_data = time_varint;
db_value.mv_size = record_size;
db_value.mv_data = record;
if (mdb_put(mdb_txn, mdb_dbi, &db_key, &db_value, 0) == 0) {
return 0;
}
return 1;
}
/** /**
* Open a cursor to the journalstore table * Open a cursor to the journalstore table
* @param datastore the data connection * @param datastore the data connection

View file

@ -1,5 +1,5 @@
CC = gcc CC = gcc
CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multihash/include -I../../c-multiaddr/include -I../../c-protobuf -g3 -Wall -std=c99 CFLAGS = -O0 -I../include -I../../c-libp2p/include -I../../c-multihash/include -I../../c-multiaddr/include -I../../c-protobuf -I../../lmdb/libraries/liblmdb -g3 -Wall -std=c99
LFLAGS = -L../../c-libp2p -L../../c-multihash -L../../c-multiaddr -lp2p -lm -lmultihash -lmultiaddr -lpthread LFLAGS = -L../../c-libp2p -L../../c-multihash -L../../c-multiaddr -lp2p -lm -lmultihash -lmultiaddr -lpthread
DEPS = cmd/ipfs/test_init.h repo/test_repo_bootstrap_peers.h repo/test_repo_config.h repo/test_repo_identity.h cid/test_cid.h DEPS = cmd/ipfs/test_init.h repo/test_repo_bootstrap_peers.h repo/test_repo_config.h repo/test_repo_identity.h cid/test_cid.h
OBJS = testit.o test_helper.o \ OBJS = testit.o test_helper.o \
@ -17,6 +17,7 @@ OBJS = testit.o test_helper.o \
../exchange/bitswap/*.o \ ../exchange/bitswap/*.o \
../flatfs/flatfs.o \ ../flatfs/flatfs.o \
../importer/importer.o ../importer/exporter.o ../importer/resolver.o \ ../importer/importer.o ../importer/exporter.o ../importer/resolver.o \
../journal/*.o \
../merkledag/merkledag.o ../merkledag/node.o \ ../merkledag/merkledag.o ../merkledag/node.o \
../multibase/multibase.o \ ../multibase/multibase.o \
../repo/init.o \ ../repo/init.o \

View file

@ -0,0 +1,69 @@
#include <stdlib.h>
#include "ipfs/journal/journal_entry.h"
#include "ipfs/journal/journal_message.h"
int test_journal_encode_decode() {
int retVal = 0;
struct JournalEntry* entry = ipfs_journal_entry_new();
struct JournalMessage* message = ipfs_journal_message_new();
struct JournalMessage* result_message = NULL;
struct JournalEntry* result_entry = NULL;
uint8_t *buffer;
size_t buffer_size;
// build entry
entry->hash = malloc(1);
entry->hash[0] = 1;
entry->hash_size = 1;
entry->pin = 1;
entry->timestamp = 1;
// build message
message->current_epoch = 2;
message->start_epoch = 3;
message->end_epoch = 4;
libp2p_utils_vector_add(message->journal_entries, entry);
// protobuf it
buffer_size = ipfs_journal_message_encode_size(message);
buffer = malloc(buffer_size);
if (!ipfs_journal_message_encode(message, buffer, buffer_size, &buffer_size))
goto exit;
// unprotobuf it
if (!ipfs_journal_message_decode(buffer, buffer_size, &result_message))
goto exit;
// compare
if (result_message->current_epoch != message->current_epoch)
goto exit;
if (result_message->start_epoch != message->start_epoch)
goto exit;
if (result_message->end_epoch != message->end_epoch)
goto exit;
if (result_message->journal_entries->total != message->journal_entries->total)
goto exit;
result_entry = (struct JournalEntry*) libp2p_utils_vector_get(message->journal_entries, 0);
if (result_entry->timestamp != entry->timestamp)
goto exit;
if (result_entry->pin != entry->pin)
goto exit;
if (result_entry->hash_size != entry->hash_size)
goto exit;
for (int i = 0; i < result_entry->hash_size; i++) {
if (result_entry->hash[i] != entry->hash[i])
goto exit;
}
// cleanup
retVal = 1;
exit:
if (buffer != NULL)
free(buffer);
ipfs_journal_message_free(message);
ipfs_journal_message_free(result_message);
// the above lines take care of these
//ipfs_journal_entry_free(entry);
//ipfs_journal_entry_free(result_entry);
return retVal;
}

View file

@ -3,6 +3,7 @@
#include "exchange/test_bitswap.h" #include "exchange/test_bitswap.h"
#include "exchange/test_bitswap_request_queue.h" #include "exchange/test_bitswap_request_queue.h"
#include "flatfs/test_flatfs.h" #include "flatfs/test_flatfs.h"
#include "journal/test_journal.h"
#include "merkledag/test_merkledag.h" #include "merkledag/test_merkledag.h"
#include "node/test_node.h" #include "node/test_node.h"
#include "node/test_importer.h" #include "node/test_importer.h"
@ -46,6 +47,7 @@ const char* names[] = {
"test_cid_protobuf_encode_decode", "test_cid_protobuf_encode_decode",
"test_daemon_startup_shutdown", "test_daemon_startup_shutdown",
"test_datastore_list_journal", "test_datastore_list_journal",
"test_journal_encode_decode",
"test_repo_config_new", "test_repo_config_new",
"test_repo_config_init", "test_repo_config_init",
"test_repo_config_write", "test_repo_config_write",
@ -103,6 +105,7 @@ int (*funcs[])(void) = {
test_cid_protobuf_encode_decode, test_cid_protobuf_encode_decode,
test_daemon_startup_shutdown, test_daemon_startup_shutdown,
test_datastore_list_journal, test_datastore_list_journal,
test_journal_encode_decode,
test_repo_config_new, test_repo_config_new,
test_repo_config_init, test_repo_config_init,
test_repo_config_write, test_repo_config_write,