2017-08-24 15:08:27 +00:00
/**
* The journal protocol attempts to keep a journal in sync with other ( approved ) nodes
*/
2017-08-31 11:41:54 +00:00
# include "libp2p/crypto/encoding/base58.h"
2017-08-24 18:30:44 +00:00
# include "libp2p/os/utils.h"
2017-08-28 15:55:44 +00:00
# include "libp2p/utils/logger.h"
2017-08-24 15:08:27 +00:00
# include "ipfs/journal/journal.h"
2017-08-24 18:30:44 +00:00
# include "ipfs/journal/journal_message.h"
# include "ipfs/journal/journal_entry.h"
# include "ipfs/repo/fsrepo/journalstore.h"
2017-08-28 11:56:22 +00:00
# include "ipfs/repo/config/replication.h"
2017-08-24 15:08:27 +00:00
/***
* See if we can handle this message
* @ param incoming the incoming message
* @ param incoming_size the size of the incoming message
* @ returns true ( 1 ) if the protocol in incoming is something we can handle . False ( 0 ) otherwise .
*/
int ipfs_journal_can_handle ( const uint8_t * incoming , size_t incoming_size ) {
2017-09-07 23:45:09 +00:00
const char * protocol = " /ipfs/journalio/1.0.0 " ;
if ( incoming_size < 21 )
2017-08-24 15:08:27 +00:00
return 0 ;
2017-09-07 23:45:09 +00:00
char * result = strnstr ( ( char * ) incoming , protocol , incoming_size ) ;
2017-08-24 15:08:27 +00:00
if ( result = = NULL | | result ! = ( char * ) incoming )
return 0 ;
2017-08-30 16:10:14 +00:00
libp2p_logger_debug ( " journal " , " Handling incoming message. \n " ) ;
2017-08-24 15:08:27 +00:00
return 1 ;
}
/**
* Clean up resources used by this handler
* @ param context the context to clean up
* @ returns true ( 1 )
*/
int ipfs_journal_shutdown_handler ( void * context ) {
return 1 ;
}
/***
* Build the protocol handler struct for the Journal protocol
* @ param local_node what to stuff in the context
* @ returns the protocol handler
*/
struct Libp2pProtocolHandler * ipfs_journal_build_protocol_handler ( const struct IpfsNode * local_node ) {
struct Libp2pProtocolHandler * handler = ( struct Libp2pProtocolHandler * ) malloc ( sizeof ( struct Libp2pProtocolHandler ) ) ;
if ( handler ! = NULL ) {
handler - > context = ( void * ) local_node ;
handler - > CanHandle = ipfs_journal_can_handle ;
handler - > HandleMessage = ipfs_journal_handle_message ;
handler - > Shutdown = ipfs_journal_shutdown_handler ;
}
return handler ;
}
2017-08-24 18:30:44 +00:00
/***
* Retrieve the last n records from the journalstore
* @ param database the reference to the opened db
* @ param n the number of records to retrieve
* @ returns a vector of struct JournalRecord
*/
struct Libp2pVector * ipfs_journal_get_last ( struct Datastore * database , int n ) {
struct Libp2pVector * vector = libp2p_utils_vector_new ( 1 ) ;
if ( vector ! = NULL ) {
2017-09-04 22:10:57 +00:00
struct lmdb_trans_cursor * cursor = NULL ;
2017-09-07 23:45:09 +00:00
if ( ! lmdb_journalstore_cursor_open ( database - > datastore_context , & cursor , NULL ) ) {
2017-08-30 16:10:14 +00:00
libp2p_logger_error ( " journal " , " Unable to open a cursor for the journalstore. \n " ) ;
2017-08-24 18:30:44 +00:00
return NULL ;
2017-08-30 16:10:14 +00:00
}
2017-08-24 18:30:44 +00:00
struct JournalRecord * rec = NULL ;
2017-09-04 22:10:57 +00:00
if ( ! lmdb_journalstore_cursor_get ( cursor , CURSOR_LAST , & rec ) ) {
2017-08-30 16:10:14 +00:00
libp2p_logger_error ( " journal " , " Unable to find last record from the journalstore. \n " ) ;
2017-08-24 18:30:44 +00:00
libp2p_utils_vector_free ( vector ) ;
2017-09-07 23:45:09 +00:00
lmdb_journalstore_cursor_close ( cursor , 1 ) ;
2017-08-24 18:30:44 +00:00
return NULL ;
}
// we've got one, now start the loop
int i = 0 ;
do {
2017-08-30 16:10:14 +00:00
libp2p_logger_debug ( " journal " , " Adding record to the vector. \n " ) ;
2017-08-24 18:30:44 +00:00
libp2p_utils_vector_add ( vector , rec ) ;
2017-09-04 22:10:57 +00:00
if ( ! lmdb_journalstore_cursor_get ( cursor , CURSOR_PREVIOUS , & rec ) ) {
2017-08-24 18:30:44 +00:00
break ;
}
i + + ;
} while ( i < n ) ;
2017-08-30 16:10:14 +00:00
libp2p_logger_debug ( " journal " , " Closing journalstore cursor. \n " ) ;
2017-09-07 23:45:09 +00:00
lmdb_journalstore_cursor_close ( cursor , 1 ) ;
2017-08-30 16:10:14 +00:00
} else {
libp2p_logger_error ( " journal " , " Unable to allocate vector for ipfs_journal_get_last. \n " ) ;
2017-08-24 18:30:44 +00:00
}
return vector ;
}
int ipfs_journal_free_records ( struct Libp2pVector * records ) {
if ( records ! = NULL ) {
for ( int i = 0 ; i < records - > total ; i + + ) {
struct JournalRecord * rec = ( struct JournalRecord * ) libp2p_utils_vector_get ( records , i ) ;
2017-09-04 18:33:56 +00:00
lmdb_journal_record_free ( rec ) ;
2017-08-24 18:30:44 +00:00
}
libp2p_utils_vector_free ( records ) ;
}
return 1 ;
}
int ipfs_journal_send_message ( struct IpfsNode * node , struct Libp2pPeer * peer , struct JournalMessage * message ) {
if ( peer - > connection_type ! = CONNECTION_TYPE_CONNECTED )
2017-09-04 16:02:48 +00:00
libp2p_peer_connect ( & node - > identity - > private_key , peer , node - > peerstore , node - > repo - > config - > datastore , 10 ) ;
2017-08-24 18:30:44 +00:00
if ( peer - > connection_type ! = CONNECTION_TYPE_CONNECTED )
return 0 ;
// protobuf the message
size_t msg_size = ipfs_journal_message_encode_size ( message ) ;
uint8_t msg [ msg_size ] ;
if ( ! ipfs_journal_message_encode ( message , & msg [ 0 ] , msg_size , & msg_size ) )
return 0 ;
// send the header
2017-08-30 16:10:14 +00:00
char * header = " /ipfs/journalio/1.0.0 \n " ;
if ( ! peer - > sessionContext - > default_stream - > write ( peer - > sessionContext , ( unsigned char * ) header , strlen ( header ) ) )
2017-08-24 18:30:44 +00:00
return 0 ;
// send the message
2017-08-30 16:10:14 +00:00
return peer - > sessionContext - > default_stream - > write ( peer - > sessionContext , msg , msg_size ) ;
2017-08-24 18:30:44 +00:00
}
2017-08-24 15:08:27 +00:00
/***
* Send a journal message to a remote peer
2017-08-28 11:56:22 +00:00
* @ param replication_peer the peer to send it to
2017-08-24 15:08:27 +00:00
* @ returns true ( 1 ) on success , false ( 0 ) otherwise .
*/
2017-08-28 11:56:22 +00:00
int ipfs_journal_sync ( struct IpfsNode * local_node , struct ReplicationPeer * replication_peer ) {
2017-08-30 16:10:14 +00:00
libp2p_logger_debug ( " journal " , " Attempting replication for peer %s. \n " , libp2p_peer_id_to_string ( replication_peer - > peer ) ) ;
// get the real peer object from the peersstore
struct Libp2pPeer * peer = libp2p_peerstore_get_peer ( local_node - > peerstore , ( unsigned char * ) replication_peer - > peer - > id , replication_peer - > peer - > id_size ) ;
if ( peer = = NULL ) {
libp2p_logger_error ( " journal " , " Unable to find peer %s in peerstore. \n " , libp2p_peer_id_to_string ( replication_peer - > peer ) ) ;
return 0 ;
}
2017-08-24 15:08:27 +00:00
// make sure we're connected securely
2017-08-30 16:10:14 +00:00
if ( peer - > is_local ) {
libp2p_logger_debug ( " journal " , " Cannot replicate a local peer. \n " ) ;
2017-08-24 15:08:27 +00:00
return 0 ;
2017-08-30 16:10:14 +00:00
}
if ( peer - > sessionContext = = NULL | | peer - > sessionContext - > secure_stream = = NULL ) {
libp2p_logger_debug ( " journal " , " Cannot replicate over an insecure stream. \n " ) ;
2017-08-24 15:08:27 +00:00
return 0 ;
2017-08-30 16:10:14 +00:00
}
2017-08-24 18:30:44 +00:00
// grab the last 10? files
struct Libp2pVector * journal_records = ipfs_journal_get_last ( local_node - > repo - > config - > datastore , 10 ) ;
if ( journal_records = = NULL | | journal_records - > total = = 0 ) {
// nothing to do
2017-08-30 16:10:14 +00:00
libp2p_logger_debug ( " journal " , " There are no journal records to process. \n " ) ;
2017-09-07 23:45:09 +00:00
replication_peer - > lastConnect = os_utils_gmtime ( ) ;
2017-08-24 18:30:44 +00:00
return 1 ;
2017-08-24 15:08:27 +00:00
}
// build the message
2017-08-24 18:30:44 +00:00
struct JournalMessage * message = ipfs_journal_message_new ( ) ;
for ( int i = 0 ; i < journal_records - > total ; i + + ) {
struct JournalRecord * rec = ( struct JournalRecord * ) libp2p_utils_vector_get ( journal_records , i ) ;
if ( rec - > timestamp > message - > end_epoch )
message - > end_epoch = rec - > timestamp ;
if ( message - > start_epoch = = 0 | | rec - > timestamp < message - > start_epoch )
message - > start_epoch = rec - > timestamp ;
struct JournalEntry * entry = ipfs_journal_entry_new ( ) ;
entry - > timestamp = rec - > timestamp ;
entry - > pin = 1 ;
entry - > hash_size = rec - > hash_size ;
entry - > hash = ( uint8_t * ) malloc ( entry - > hash_size ) ;
if ( entry - > hash = = NULL ) {
// out of memory
ipfs_journal_message_free ( message ) ;
ipfs_journal_free_records ( journal_records ) ;
return 0 ;
}
memcpy ( entry - > hash , rec - > hash , entry - > hash_size ) ;
libp2p_utils_vector_add ( message - > journal_entries , entry ) ;
}
2017-08-24 15:08:27 +00:00
// send the message
2017-08-24 18:30:44 +00:00
message - > current_epoch = os_utils_gmtime ( ) ;
2017-09-07 23:45:09 +00:00
libp2p_logger_debug ( " journal " , " Sending message to %s. \n " , libp2p_peer_id_to_string ( peer ) ) ;
2017-08-30 16:10:14 +00:00
int retVal = ipfs_journal_send_message ( local_node , peer , message ) ;
2017-08-28 11:56:22 +00:00
if ( retVal ) {
replication_peer - > lastConnect = message - > current_epoch ;
replication_peer - > lastJournalTime = message - > end_epoch ;
}
2017-08-24 18:30:44 +00:00
// clean up
ipfs_journal_message_free ( message ) ;
ipfs_journal_free_records ( journal_records ) ;
return retVal ;
2017-08-24 15:08:27 +00:00
}
2017-08-28 15:55:44 +00:00
enum JournalAction { JOURNAL_ENTRY_NEEDED , JOURNAL_TIME_ADJUST , JOURNAL_REMOTE_NEEDS } ;
struct JournalToDo {
enum JournalAction action ; // what needs to be done
unsigned long long local_timestamp ; // what we have in our journal
unsigned long long remote_timestamp ; // what they have in their journal
uint8_t * hash ; // the hash
size_t hash_size ; // the size of the hash
} ;
struct JournalToDo * ipfs_journal_todo_new ( ) {
struct JournalToDo * j = ( struct JournalToDo * ) malloc ( sizeof ( struct JournalToDo ) ) ;
if ( j ! = NULL ) {
j - > action = JOURNAL_ENTRY_NEEDED ;
j - > hash = NULL ;
j - > hash_size = 0 ;
j - > local_timestamp = 0 ;
j - > remote_timestamp = 0 ;
}
return j ;
}
int ipfs_journal_todo_free ( struct JournalToDo * in ) {
if ( in ! = NULL ) {
free ( in ) ;
}
return 1 ;
}
2017-08-30 16:10:14 +00:00
/***
* Loop through the incoming message , looking for what may need to change
* @ param local_node the context
* @ param incoming the incoming JournalMessage
* @ param todo_vector a Libp2pVector that gets allocated and filled with JournalToDo structs
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
2017-08-28 15:55:44 +00:00
int ipfs_journal_build_todo ( struct IpfsNode * local_node , struct JournalMessage * incoming , struct Libp2pVector * * todo_vector ) {
* todo_vector = libp2p_utils_vector_new ( 1 ) ;
if ( * todo_vector = = NULL )
return - 1 ;
struct Libp2pVector * todos = * todo_vector ;
// for every file in message
for ( int i = 0 ; i < incoming - > journal_entries - > total ; i + + ) {
struct JournalEntry * entry = ( struct JournalEntry * ) libp2p_utils_vector_get ( incoming - > journal_entries , i ) ;
// do we have the file?
struct DatastoreRecord * datastore_record = NULL ;
if ( ! local_node - > repo - > config - > datastore - > datastore_get ( entry - > hash , entry - > hash_size , & datastore_record , local_node - > repo - > config - > datastore ) ) {
struct JournalToDo * td = ipfs_journal_todo_new ( ) ;
td - > action = JOURNAL_ENTRY_NEEDED ;
td - > hash = entry - > hash ;
td - > hash_size = entry - > hash_size ;
td - > remote_timestamp = entry - > timestamp ;
libp2p_utils_vector_add ( todos , td ) ;
} else {
// do we need to adjust the time?
2017-09-13 17:40:48 +00:00
if ( ( datastore_record - > timestamp = = 0 & & entry - > timestamp ! = 0 ) | |
( entry - > timestamp ! = 0 & & entry - > timestamp < datastore_record - > timestamp ) ) {
2017-08-28 15:55:44 +00:00
struct JournalToDo * td = ipfs_journal_todo_new ( ) ;
td - > action = JOURNAL_TIME_ADJUST ;
td - > hash = entry - > hash ;
td - > hash_size = entry - > hash_size ;
td - > local_timestamp = datastore_record - > timestamp ;
td - > remote_timestamp = entry - > timestamp ;
libp2p_utils_vector_add ( todos , td ) ;
}
}
libp2p_datastore_record_free ( datastore_record ) ;
}
// TODO: get all files of same second
// are they perhaps missing something?
//struct Libp2pVector* local_records_for_second;
return 0 ;
}
2017-09-13 17:40:48 +00:00
/***
* Adjust the time in the journal
* @ param todo the JournalToDo struct that contains the new time
* @ param local_node the context
* @ returns true ( 1 ) if success , or if no change was needed , false ( 0 ) if there was an error
*/
int ipfs_journal_adjust_time ( struct JournalToDo * todo , struct IpfsNode * local_node ) {
// grab the datastore record
struct DatastoreRecord * datastore_record = NULL ;
if ( ! local_node - > repo - > config - > datastore - > datastore_get ( todo - > hash , todo - > hash_size , & datastore_record , local_node - > repo - > config - > datastore ) ) {
// did not find the record
libp2p_logger_error ( " journal " , " Attempted time_adjust, but could not find the hash. \n " ) ;
return 0 ;
}
// record found
if ( todo - > remote_timestamp ! = 0 ) {
if ( datastore_record = = 0 | | datastore_record - > timestamp > todo - > remote_timestamp ) {
datastore_record - > timestamp = todo - > remote_timestamp ;
} else {
// we don't need to change the time
return 1 ;
}
}
if ( ! local_node - > repo - > config - > datastore - > datastore_put ( datastore_record , local_node - > repo - > config - > datastore ) ) {
libp2p_logger_error ( " journal " , " Attempted time_adjust put, but failed. \n " ) ;
return 0 ;
}
return 1 ;
}
2017-08-28 15:55:44 +00:00
/***
* Handles a message
* @ param incoming the message
* @ param incoming_size the size of the message
* @ param session_context details of the remote peer
* @ param protocol_context in this case , an IpfsNode
* @ returns 0 if the caller should not continue looping , < 0 on error , > 0 on success
*/
int ipfs_journal_handle_message ( const uint8_t * incoming , size_t incoming_size , struct SessionContext * session_context , void * protocol_context ) {
2017-08-30 16:10:14 +00:00
// remove protocol
uint8_t * incoming_pos = ( uint8_t * ) incoming ;
size_t pos_size = incoming_size ;
int second_read = 0 ;
for ( int i = 0 ; i < incoming_size ; i + + ) {
if ( incoming [ i ] = = ' \n ' ) {
if ( incoming_size > i + 1 ) {
incoming_pos = ( uint8_t * ) & incoming [ i + 1 ] ;
pos_size = incoming_size - i ;
break ;
} else {
// read next segment from network
if ( ! session_context - > default_stream - > read ( session_context , & incoming_pos , & pos_size , 10 ) )
return - 1 ;
second_read = 1 ;
}
}
}
libp2p_logger_debug ( " journal " , " Handling incoming message from %s. \n " , session_context - > remote_peer_id ) ;
2017-08-28 15:55:44 +00:00
struct IpfsNode * local_node = ( struct IpfsNode * ) protocol_context ;
// un-protobuf the message
struct JournalMessage * message = NULL ;
2017-08-30 16:10:14 +00:00
if ( ! ipfs_journal_message_decode ( incoming_pos , pos_size , & message ) )
2017-08-28 15:55:44 +00:00
return - 1 ;
// see if the remote's time is within 5 minutes of now
unsigned long long start_time = os_utils_gmtime ( ) ;
long long our_time_diff = start_time - message - > current_epoch ;
// NOTE: If our_time_diff is negative, the remote's clock is faster than ours.
// if it is positive, our clock is faster than theirs.
if ( llabs ( our_time_diff ) > 300 ) {
libp2p_logger_error ( " journal " , " The clock of peer %s is out of 5 minute range. Seconds difference: %llu " , session_context - > remote_peer_id , our_time_diff ) ;
2017-08-30 16:10:14 +00:00
if ( second_read ) {
free ( incoming_pos ) ;
}
2017-09-07 23:45:09 +00:00
ipfs_journal_message_free ( message ) ;
2017-08-28 15:55:44 +00:00
return - 1 ;
}
struct Libp2pVector * todo_vector = NULL ;
ipfs_journal_build_todo ( local_node , message , & todo_vector ) ;
2017-08-30 16:10:14 +00:00
// loop through todo items, and do the right thing
for ( int i = 0 ; i < todo_vector - > total ; i + + ) {
struct JournalToDo * curr = ( struct JournalToDo * ) libp2p_utils_vector_get ( todo_vector , i ) ;
switch ( curr - > action ) {
case ( JOURNAL_ENTRY_NEEDED ) : {
// go get a file
struct Block * block = NULL ;
2017-09-04 18:33:56 +00:00
struct Cid * cid = ipfs_cid_new ( 0 , curr - > hash , curr - > hash_size , CID_DAG_PROTOBUF ) ;
2017-08-31 11:41:54 +00:00
if ( local_node - > exchange - > GetBlockAsync ( local_node - > exchange , cid , & block ) ) {
2017-09-13 17:40:48 +00:00
// set timestamp (if we got the block already, but we probably didn't)
if ( block ! = NULL )
ipfs_journal_adjust_time ( curr , local_node ) ;
2017-08-30 16:10:14 +00:00
}
ipfs_cid_free ( cid ) ;
ipfs_block_free ( block ) ;
}
break ;
case ( JOURNAL_TIME_ADJUST ) : {
2017-09-13 17:40:48 +00:00
ipfs_journal_adjust_time ( curr , local_node ) ;
2017-08-30 16:10:14 +00:00
}
break ;
case ( JOURNAL_REMOTE_NEEDS ) : {
}
break ;
}
}
//TODO: set new values in their ReplicationPeer struct
2017-09-07 23:45:09 +00:00
ipfs_journal_message_free ( message ) ;
2017-08-30 16:10:14 +00:00
if ( second_read )
free ( incoming_pos ) ;
2017-08-28 15:55:44 +00:00
return 1 ;
}