diff --git a/core/ipfs_node.c b/core/ipfs_node.c index b157adb..dc262b1 100644 --- a/core/ipfs_node.c +++ b/core/ipfs_node.c @@ -80,6 +80,54 @@ int ipfs_node_online_new(const char* repo_path, struct IpfsNode** node) { return 1; } +/*** + * build an offline IpfsNode + * @param repo_path where the IPFS repository directory is + * @param node the completed IpfsNode struct + * @returns true(1) on success + */ +int ipfs_node_offline_new(const char* repo_path, struct IpfsNode** node) { + struct FSRepo* fs_repo = NULL; + + *node = (struct IpfsNode*)malloc(sizeof(struct IpfsNode)); + if(*node == NULL) + return 0; + + struct IpfsNode* local_node = *node; + local_node->identity = NULL; + local_node->peerstore = NULL; + local_node->providerstore = NULL; + local_node->repo = NULL; + local_node->routing = NULL; + local_node->exchange = NULL; + + // build the struct + if (!ipfs_repo_fsrepo_new(repo_path, NULL, &fs_repo)) { + ipfs_node_free(local_node); + *node = NULL; + return 0; + } + // open the repo + if (!ipfs_repo_fsrepo_open(fs_repo)) { + ipfs_node_free(local_node); + *node = NULL; + return 0; + } + + // fill in the node + local_node->repo = fs_repo; + local_node->identity = fs_repo->config->identity; + local_node->peerstore = libp2p_peerstore_new(local_node->identity->peer); + local_node->providerstore = libp2p_providerstore_new(fs_repo->config->datastore, local_node->identity->peer); + local_node->blockstore = ipfs_blockstore_new(fs_repo); + local_node->protocol_handlers = ipfs_node_online_build_protocol_handlers(local_node); + local_node->mode = MODE_OFFLINE; + local_node->routing = ipfs_routing_new_offline(local_node, &fs_repo->config->identity->private_key); + local_node->exchange = ipfs_bitswap_new(local_node); + + return 1; +} + /*** * Free resources from the creation of an IpfsNode * @param node the node to free diff --git a/core/null.c b/core/null.c index 8b4e553..c70c6bd 100644 --- a/core/null.c +++ b/core/null.c @@ -130,30 +130,39 @@ void ipfs_null_connection (void *ptr) { } int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* peer) { - if (peer == NULL) + libp2p_logger_debug("null", "Attempting maintenance on peer %s.\n", libp2p_peer_id_to_string(peer)); + if (peer == NULL) { + libp2p_logger_debug("null", "No maintenance ran on NULL node.\n"); return 0; - if (peer->is_local) + } + if (peer->is_local) { + libp2p_logger_debug("null", "No maintenance ran on local node.\n"); return 1; + } // Is this peer one of our backup partners? struct ReplicationPeer* replication_peer = repo_config_get_replication_peer(local_node->repo->config->replication, peer); long long announce_secs = local_node->repo->config->replication->announce_minutes * 60; // If so, has there been enough time since the last attempt a backup? if (replication_peer != NULL) { - announce_secs -= os_utils_gmtime() - repo_config_replication_last_attempt(local_node->repo->config->replication, peer); + announce_secs -= os_utils_gmtime() - replication_peer->lastConnect; + libp2p_logger_debug("null", "Found replication peer. Announce secs are %lld.\n", announce_secs); } // should we attempt to connect if we're not already? if (replication_peer != NULL && announce_secs < 0) { // try to connect if we aren't already if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { - if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, 10)) { + if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, 2)) { return 0; } } // attempt a backup, don't forget to reset timer + libp2p_logger_debug("null", "Attempting a sync of node %s.\n", peer->id); ipfs_journal_sync(local_node, replication_peer); } else { // try a ping, but only if we're connected + libp2p_logger_debug("null", "Not replicating, attempt ping of %s.\n", peer->id); if (peer->connection_type == CONNECTION_TYPE_CONNECTED && !local_node->routing->Ping(local_node->routing, peer)) { + libp2p_logger_debug("null", "Attempted ping of %s failed.\n", peer->id); peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED; } } diff --git a/importer/exporter.c b/importer/exporter.c index 51fe6d3..178cb3b 100644 --- a/importer/exporter.c +++ b/importer/exporter.c @@ -26,7 +26,7 @@ int ipfs_exporter_get_node(struct IpfsNode* local_node, const unsigned char* has unsigned char *buffer = NULL; size_t buffer_size = 0; int retVal = 0; - struct Libp2pMessage* msg = NULL; + struct KademliaMessage* msg = NULL; if (local_node->routing->GetValue(local_node->routing, hash, hash_size, (void**)&buffer, &buffer_size)) { libp2p_logger_debug("exporter", "get_node got a value. Converting it to a HashtableNode\n"); diff --git a/importer/resolver.c b/importer/resolver.c index 0beec66..6049a92 100644 --- a/importer/resolver.c +++ b/importer/resolver.c @@ -137,7 +137,7 @@ struct HashtableNode* ipfs_resolver_remote_get(const char* path, struct Hashtabl struct Stream* stream = libp2p_net_multistream_connect(ip, port); free(ip); // build the request - struct Libp2pMessage* message = libp2p_message_new(); + struct KademliaMessage* message = libp2p_message_new(); message->message_type = MESSAGE_TYPE_GET_VALUE; message->key = key; message->key_size = strlen(key); diff --git a/include/ipfs/core/ipfs_node.h b/include/ipfs/core/ipfs_node.h index 3a2cff6..83c45d6 100644 --- a/include/ipfs/core/ipfs_node.h +++ b/include/ipfs/core/ipfs_node.h @@ -32,6 +32,15 @@ struct IpfsNode { * @returns true(1) on success */ int ipfs_node_online_new(const char* repo_path, struct IpfsNode** node); + +/*** + * build an offline IpfsNode + * @param repo_path where the IPFS repository directory is + * @param node the completed IpfsNode struct + * @returns true(1) on success + */ +int ipfs_node_offline_new(const char* repo_path, struct IpfsNode** node); + /*** * Free resources from the creation of an IpfsNode * @param node the node to free diff --git a/include/ipfs/repo/fsrepo/lmdb_datastore.h b/include/ipfs/repo/fsrepo/lmdb_datastore.h index 66b467f..88d31c0 100644 --- a/include/ipfs/repo/fsrepo/lmdb_datastore.h +++ b/include/ipfs/repo/fsrepo/lmdb_datastore.h @@ -3,9 +3,6 @@ #include "lmdb.h" #include "libp2p/db/datastore.h" -static const char* DATASTORE_DB = "DATASTORE"; -static const char* JOURNAL_DB = "JOURNAL"; - struct lmdb_trans_cursor { MDB_txn* transaction; MDB_cursor* cursor; diff --git a/journal/journal.c b/journal/journal.c index 1aeb5ce..c85c697 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -18,9 +18,10 @@ int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) { if (incoming_size < 8) return 0; - char* result = strstr((char*)incoming, "/ipfs/journal/1.0.0"); + char* result = strstr((char*)incoming, "/ipfs/journalio/1.0.0"); if(result == NULL || result != (char*)incoming) return 0; + libp2p_logger_debug("journal", "Handling incoming message.\n"); return 1; } @@ -58,11 +59,14 @@ struct Libp2pProtocolHandler* ipfs_journal_build_protocol_handler(const struct I struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) { struct Libp2pVector* vector = libp2p_utils_vector_new(1); if (vector != NULL) { - void* cursor; - if (!repo_journalstore_cursor_open(database, &cursor)) + void* cursor = NULL; + if (!repo_journalstore_cursor_open(database, &cursor)) { + libp2p_logger_error("journal", "Unable to open a cursor for the journalstore.\n"); return NULL; + } struct JournalRecord* rec = NULL; if (!repo_journalstore_cursor_get(database, cursor, CURSOR_LAST, &rec)) { + libp2p_logger_error("journal", "Unable to find last record from the journalstore.\n"); libp2p_utils_vector_free(vector); repo_journalstore_cursor_close(database, cursor); return NULL; @@ -70,13 +74,17 @@ struct Libp2pVector* ipfs_journal_get_last(struct Datastore* database, int n) { // we've got one, now start the loop int i = 0; do { + libp2p_logger_debug("journal", "Adding record to the vector.\n"); libp2p_utils_vector_add(vector, rec); if (!repo_journalstore_cursor_get(database, cursor, CURSOR_PREVIOUS, &rec)) { break; } i++; } while(i < n); + libp2p_logger_debug("journal", "Closing journalstore cursor.\n"); repo_journalstore_cursor_close(database, cursor); + } else { + libp2p_logger_error("journal", "Unable to allocate vector for ipfs_journal_get_last.\n"); } return vector; } @@ -103,11 +111,11 @@ int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, st if (!ipfs_journal_message_encode(message, &msg[0], msg_size, &msg_size)) return 0; // send the header - char* header = "/ipfs/journalio/1.0.0/n"; - if (!peer->sessionContext->default_stream->write(peer->sessionContext->default_stream, (unsigned char*)header, strlen(header))) + char* header = "/ipfs/journalio/1.0.0\n"; + if (!peer->sessionContext->default_stream->write(peer->sessionContext, (unsigned char*)header, strlen(header))) return 0; // send the message - return peer->sessionContext->default_stream->write(peer->sessionContext->default_stream, msg, msg_size); + return peer->sessionContext->default_stream->write(peer->sessionContext, msg, msg_size); } /*** @@ -116,16 +124,28 @@ int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, st * @returns true(1) on success, false(0) otherwise. */ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* replication_peer) { + libp2p_logger_debug("journal", "Attempting replication for peer %s.\n", libp2p_peer_id_to_string(replication_peer->peer)); + // get the real peer object from the peersstore + struct Libp2pPeer *peer = libp2p_peerstore_get_peer(local_node->peerstore, (unsigned char*)replication_peer->peer->id, replication_peer->peer->id_size); + if (peer == NULL) { + libp2p_logger_error("journal", "Unable to find peer %s in peerstore.\n", libp2p_peer_id_to_string(replication_peer->peer)); + return 0; + } // make sure we're connected securely - if (replication_peer->peer->is_local) + if (peer->is_local) { + libp2p_logger_debug("journal", "Cannot replicate a local peer.\n"); return 0; - if (replication_peer->peer->sessionContext->secure_stream == NULL) + } + if (peer->sessionContext == NULL || peer->sessionContext->secure_stream == NULL) { + libp2p_logger_debug("journal", "Cannot replicate over an insecure stream.\n"); return 0; + } // grab the last 10? files struct Libp2pVector* journal_records = ipfs_journal_get_last(local_node->repo->config->datastore, 10); if (journal_records == NULL || journal_records->total == 0) { // nothing to do + libp2p_logger_debug("journal", "There are no journal records to process.\n"); return 1; } // build the message @@ -152,7 +172,8 @@ int ipfs_journal_sync(struct IpfsNode* local_node, struct ReplicationPeer* repli } // send the message message->current_epoch = os_utils_gmtime(); - int retVal = ipfs_journal_send_message(local_node, replication_peer->peer, message); + libp2p_logger_debug("journal", "Sending message to %s.\n", peer->id); + int retVal = ipfs_journal_send_message(local_node, peer, message); if (retVal) { replication_peer->lastConnect = message->current_epoch; replication_peer->lastJournalTime = message->end_epoch; @@ -193,6 +214,13 @@ int ipfs_journal_todo_free(struct JournalToDo *in) { return 1; } +/*** + * Loop through the incoming message, looking for what may need to change + * @param local_node the context + * @param incoming the incoming JournalMessage + * @param todo_vector a Libp2pVector that gets allocated and filled with JournalToDo structs + * @returns true(1) on success, false(0) otherwise + */ int ipfs_journal_build_todo(struct IpfsNode* local_node, struct JournalMessage* incoming, struct Libp2pVector** todo_vector) { *todo_vector = libp2p_utils_vector_new(1); if (*todo_vector == NULL) @@ -239,10 +267,29 @@ int ipfs_journal_build_todo(struct IpfsNode* local_node, struct JournalMessage* * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { + // remove protocol + uint8_t *incoming_pos = (uint8_t*) incoming; + size_t pos_size = incoming_size; + int second_read = 0; + for(int i = 0; i < incoming_size; i++) { + if (incoming[i] == '\n') { + if (incoming_size > i + 1) { + incoming_pos = (uint8_t *)&incoming[i+1]; + pos_size = incoming_size - i; + break; + } else { + // read next segment from network + if (!session_context->default_stream->read(session_context, &incoming_pos, &pos_size, 10)) + return -1; + second_read = 1; + } + } + } + libp2p_logger_debug("journal", "Handling incoming message from %s.\n", session_context->remote_peer_id); struct IpfsNode* local_node = (struct IpfsNode*)protocol_context; // un-protobuf the message struct JournalMessage* message = NULL; - if (!ipfs_journal_message_decode(incoming, incoming_size, &message)) + if (!ipfs_journal_message_decode(incoming_pos, pos_size, &message)) return -1; // see if the remote's time is within 5 minutes of now unsigned long long start_time = os_utils_gmtime(); @@ -251,17 +298,42 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s // if it is positive, our clock is faster than theirs. if ( llabs(our_time_diff) > 300) { libp2p_logger_error("journal", "The clock of peer %s is out of 5 minute range. Seconds difference: %llu", session_context->remote_peer_id, our_time_diff); + if (second_read) { + free(incoming_pos); + } return -1; } - // TODO: get our records for the same period - // TODO: compare the two sets of records - // we will build a list of todo items: - // ask for files - // adjust time on files - // notify remote that we have files that they probably do not have struct Libp2pVector* todo_vector = NULL; ipfs_journal_build_todo(local_node, message, &todo_vector); - // set new values in their ReplicationPeer struct + // loop through todo items, and do the right thing + for(int i = 0; i < todo_vector->total; i++) { + struct JournalToDo *curr = (struct JournalToDo*) libp2p_utils_vector_get(todo_vector, i); + switch (curr->action) { + case (JOURNAL_ENTRY_NEEDED): { + // go get a file + struct Block* block = NULL; + struct Cid* cid = ipfs_cid_new(0, curr->hash, curr->hash_size, CID_PROTOBUF); + if (local_node->exchange->GetBlock(local_node->exchange, cid, &block)) { + // set timestamp + } + ipfs_cid_free(cid); + ipfs_block_free(block); + } + break; + case (JOURNAL_TIME_ADJUST): { + + } + break; + case (JOURNAL_REMOTE_NEEDS): { + + } + break; + } + } + //TODO: set new values in their ReplicationPeer struct + + if (second_read) + free(incoming_pos); return 1; } diff --git a/repo/config/replication.c b/repo/config/replication.c index 337937d..d492b2b 100644 --- a/repo/config/replication.c +++ b/repo/config/replication.c @@ -2,6 +2,7 @@ #include "multiaddr/multiaddr.h" #include "libp2p/utils/linked_list.h" +#include "libp2p/utils/logger.h" #include "ipfs/repo/config/replication.h" /** @@ -79,8 +80,9 @@ struct ReplicationPeer* repo_config_get_replication_peer(struct Replication* rep if (replication->replication_peers != NULL) { for(int i = 0; i < replication->replication_peers->total; i++) { struct ReplicationPeer* confAddr = (struct ReplicationPeer*) libp2p_utils_vector_get(replication->replication_peers, i); - if (libp2p_peer_compare(confAddr->peer, key) == 0) + if (libp2p_peer_matches_id(confAddr->peer, (unsigned char*)key->id, key->id_size)) { return confAddr; + } } } } @@ -108,6 +110,7 @@ int repo_config_replication_approved_node(struct Replication* replication, struc * @param peer the peer to examine * @returns the time since the last replication, or the announce time if we have no record */ +/* unsigned long long repo_config_replication_last_attempt(struct Replication* replication, struct Libp2pPeer* peer) { struct ReplicationPeer* rp = repo_config_get_replication_peer(replication, peer); if (rp != NULL) { @@ -115,6 +118,7 @@ unsigned long long repo_config_replication_last_attempt(struct Replication* repl } return 0; } +*/ /*** * Determine the last journal record time that was sent to this peer diff --git a/repo/fsrepo/fs_repo.c b/repo/fsrepo/fs_repo.c index 105b6f3..aa0a25b 100644 --- a/repo/fsrepo/fs_repo.c +++ b/repo/fsrepo/fs_repo.c @@ -1,6 +1,7 @@ #include #include +#include "libp2p/utils/logger.h" #include "libp2p/crypto/encoding/base64.h" #include "libp2p/crypto/key.h" #include "libp2p/peer/peer.h" @@ -486,13 +487,14 @@ int fs_repo_open_config(struct FSRepo* repo) { // nodes list int nodes_pos = _find_token(data, tokens, num_tokens, curr_pos, "Peers"); if (nodes_pos >= 0) { + nodes_pos++; if (tokens[nodes_pos].type == JSMN_ARRAY) { int nodes_size = tokens[nodes_pos].size; repo->config->replication->replication_peers = libp2p_utils_vector_new(nodes_size); nodes_pos++; for(int i = 0; i < nodes_size; i++) { char* val = NULL; - if (!_get_json_string_value(data, tokens, num_tokens, nodes_pos + i, NULL, &val)) + if (!_get_json_string_value(data, tokens, num_tokens, nodes_pos, NULL, &val)) break; struct MultiAddress* cur = multiaddress_new_from_string(val); if (cur == NULL) @@ -501,10 +503,15 @@ int fs_repo_open_config(struct FSRepo* repo) { struct Libp2pPeer* peer = libp2p_peer_new_from_multiaddress(cur); struct ReplicationPeer* rp = repo_config_replication_peer_new(); rp->peer = peer; + libp2p_logger_debug("fs_repo", "Adding %s to replication_peers.\n", libp2p_peer_id_to_string(rp->peer)); libp2p_utils_vector_add(repo->config->replication->replication_peers, rp); free(val); } + } else { + libp2p_logger_debug("fs_repo", "Replication|Peers is not an array.\n"); } + } else { + libp2p_logger_debug("fs_repo", "No replication peers found.\n"); } } // free the memory used reading the json file diff --git a/repo/fsrepo/lmdb_datastore.c b/repo/fsrepo/lmdb_datastore.c index adc6c2b..8a474dc 100644 --- a/repo/fsrepo/lmdb_datastore.c +++ b/repo/fsrepo/lmdb_datastore.c @@ -101,7 +101,7 @@ int repo_fsrepo_lmdb_get(const unsigned char* key, size_t key_size, struct Datas if (mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn) != 0) return 0; - if (mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { + if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { mdb_txn_commit(mdb_txn); return 0; } @@ -150,7 +150,7 @@ int repo_fsrepo_lmdb_put(unsigned const char* key, size_t key_size, unsigned cha retVal = mdb_txn_begin(mdb_env, NULL, 0, &mdb_txn); if (retVal != 0) return 0; - retVal = mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi); + retVal = mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi); if (retVal != 0) return 0; @@ -247,7 +247,7 @@ int repo_fsrepo_lmdb_cursor_open(struct Datastore* datastore) { if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) return 0; MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction; - if (mdb_dbi_open(mdb_txn, DATASTORE_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { + if (mdb_dbi_open(mdb_txn, "DATASTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { mdb_txn_commit(mdb_txn); return 0; } diff --git a/repo/fsrepo/lmdb_journalstore.c b/repo/fsrepo/lmdb_journalstore.c index 57363a8..45e18d2 100644 --- a/repo/fsrepo/lmdb_journalstore.c +++ b/repo/fsrepo/lmdb_journalstore.c @@ -1,9 +1,11 @@ +#include + +#include "varint.h" +#include "lmdb.h" +#include "libp2p/utils/logger.h" #include "ipfs/repo/fsrepo/journalstore.h" #include "ipfs/repo/fsrepo/lmdb_datastore.h" -#include "lmdb.h" -#include "varint.h" -#include int journal_record_free(struct JournalRecord* rec) { if (rec != NULL) { @@ -40,7 +42,7 @@ int lmdb_journalstore_journal_add(MDB_txn* mdb_txn, unsigned long long timestamp // open the journal table - if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { + if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { return 0; } @@ -72,10 +74,13 @@ int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) { *crsr = malloc(sizeof(struct lmdb_trans_cursor)); struct lmdb_trans_cursor* cursor = (struct lmdb_trans_cursor*)*crsr; // open transaction - if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) + if (mdb_txn_begin(mdb_env, NULL, 0, &cursor->transaction) != 0) { + libp2p_logger_error("lmdb_journalstore", "Unable to start a transaction.\n"); return 0; + } MDB_txn* mdb_txn = (MDB_txn*)cursor->transaction; - if (mdb_dbi_open(mdb_txn, JOURNAL_DB, MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { + if (mdb_dbi_open(mdb_txn, "JOURNALSTORE", MDB_DUPSORT | MDB_CREATE, &mdb_dbi) != 0) { + libp2p_logger_error("lmdb_journalstore", "Unable to open the dbi to the journalstore"); mdb_txn_commit(mdb_txn); return 0; } @@ -85,7 +90,11 @@ int repo_journalstore_cursor_open(struct Datastore* datastore, void** crsr) { return 0; } return 1; + } else { + libp2p_logger_error("lmdb_journalstore", "Attempted to open a new cursor but there is something already there.\n"); } + } else { + libp2p_logger_error("lmdb_journalstore", "Unable to open cursor on null db handle.\n"); } return 0; diff --git a/routing/online.c b/routing/online.c index a187ea4..fa82d71 100644 --- a/routing/online.c +++ b/routing/online.c @@ -1,4 +1,5 @@ #include +#include #include "ipfs/routing/routing.h" #include "ipfs/core/null.h" @@ -20,10 +21,11 @@ * @param message what to send * @returns what was received */ -struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionContext* sessionContext, struct Libp2pMessage* message) { +struct KademliaMessage* ipfs_routing_online_send_receive_message(struct SessionContext* sessionContext, struct KademliaMessage* message) { size_t protobuf_size = 0, results_size = 0; unsigned char* protobuf = NULL, *results = NULL; - struct Libp2pMessage* return_message = NULL; + struct KademliaMessage* return_message = NULL; + //unsigned char* protocol = (unsigned char*)"/ipfs/kad/1.0.0\n"; protobuf_size = libp2p_message_protobuf_encode_size(message); protobuf = (unsigned char*)malloc(protobuf_size); @@ -35,6 +37,10 @@ struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionCon goto exit; } + //if (!sessionContext->default_stream->write(sessionContext, protocol, strlen( (char*)protocol))) { + // libp2p_logger_error("online", "Unable to switch to kademlia protocol.\n"); + //} + // send the message, and expect the same back if (!sessionContext->default_stream->write(sessionContext, protobuf, protobuf_size)) { libp2p_logger_error("online", "Attempted to write to Kademlia stream, but could not.\n"); @@ -45,10 +51,30 @@ struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionCon } if (results_size == 0) { - libp2p_logger_error("online", "reading kademlia response returned nothing.\n"); + libp2p_logger_error("online", "reading kademlia header response returned nothing.\n"); goto exit; } + // remove protocol + uint8_t *pos = &results[0]; + size_t pos_length = results_size; + int results_max = fmin(results_size, 30); + for (int i = 0; i < results_max; i++) { + if (results[i] == '\n') { + if (i < results_size - 1) { + // there's more left + pos = &results[i]; + pos_length = results_size - i; + } else { + // we've run out of buffer. See if we have more on the network. + if (!sessionContext->default_stream->read(sessionContext, &results, &results_size, 5)) { + // we don't have more. This is a problem. + libp2p_logger_error("online", "Reading kademlia response returned nothing.\n"); + } + } + } + } + // see if we can unprotobuf if (!libp2p_message_protobuf_decode(results, results_size, &return_message)) { libp2p_logger_error("online", "Received kademlia response, but cannot decode it.\n"); @@ -75,7 +101,7 @@ struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionCon int ipfs_routing_online_find_remote_providers(struct IpfsRouting* routing, const unsigned char* key, size_t key_size, struct Libp2pVector** peers) { int found = 0; // build the message to be transmitted - struct Libp2pMessage* message = libp2p_message_new(); + struct KademliaMessage* message = libp2p_message_new(); message->message_type = MESSAGE_TYPE_GET_PROVIDERS; message->key_size = key_size; message->key = malloc(message->key_size); @@ -87,7 +113,7 @@ int ipfs_routing_online_find_remote_providers(struct IpfsRouting* routing, const if (peer->connection_type == CONNECTION_TYPE_CONNECTED) { // Ask for hash, if it has it, break out of the loop and stop looking libp2p_logger_debug("online", "FindRemoteProviders: Asking for who can provide\n"); - struct Libp2pMessage* return_message = ipfs_routing_online_send_receive_message(peer->sessionContext, message); + struct KademliaMessage* return_message = ipfs_routing_online_send_receive_message(peer->sessionContext, message); if (return_message != NULL && return_message->provider_peer_head != NULL) { libp2p_logger_debug("online", "FindRemoteProviders: Return value is not null\n"); found = 1; @@ -163,7 +189,7 @@ int ipfs_routing_online_find_providers(struct IpfsRouting* routing, const unsign */ int ipfs_routing_online_ask_peer_for_peer(struct Libp2pPeer* whoToAsk, const unsigned char* peer_id, size_t peer_id_size, struct Libp2pPeer **result) { int retVal = 0; - struct Libp2pMessage *message = NULL, *return_message = NULL; + struct KademliaMessage *message = NULL, *return_message = NULL; if (whoToAsk->connection_type == CONNECTION_TYPE_CONNECTED) { message = libp2p_message_new(); @@ -261,7 +287,7 @@ int ipfs_routing_online_provide(struct IpfsRouting* routing, const unsigned char struct Libp2pPeer* local_peer = ipfs_routing_online_build_local_peer(routing); // create the message - struct Libp2pMessage* msg = libp2p_message_new(); + struct KademliaMessage* msg = libp2p_message_new(); msg->key_size = key_size; msg->key = malloc(msg->key_size); memcpy(msg->key, key, msg->key_size); @@ -280,7 +306,7 @@ int ipfs_routing_online_provide(struct IpfsRouting* routing, const unsigned char // notify everyone we're connected to if (current_peer->connection_type == CONNECTION_TYPE_CONNECTED) { // ignoring results is okay this time - struct Libp2pMessage* rslt = ipfs_routing_online_send_receive_message(current_peer->sessionContext, msg); + struct KademliaMessage* rslt = ipfs_routing_online_send_receive_message(current_peer->sessionContext, msg); if (rslt != NULL) libp2p_message_free(rslt); } @@ -301,7 +327,7 @@ int ipfs_routing_online_provide(struct IpfsRouting* routing, const unsigned char * @returns true(1) on success, otherwise false(0) */ int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* peer) { - struct Libp2pMessage *outMsg = NULL, *inMsg = NULL; + struct KademliaMessage *outMsg = NULL, *inMsg = NULL; int retVal = 0; if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { @@ -348,14 +374,14 @@ int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* pee */ int ipfs_routing_online_get_peer_value(ipfs_routing* routing, const struct Libp2pPeer* peer, const unsigned char* key, size_t key_size, void** buffer, size_t *buffer_size) { // build message - struct Libp2pMessage* msg = libp2p_message_new(); + struct KademliaMessage* msg = libp2p_message_new(); msg->key_size = key_size; msg->key = malloc(msg->key_size); memcpy(msg->key, key, msg->key_size); msg->message_type = MESSAGE_TYPE_GET_VALUE; // send message and receive results - struct Libp2pMessage* ret_msg = ipfs_routing_online_send_receive_message(peer->sessionContext, msg); + struct KademliaMessage* ret_msg = ipfs_routing_online_send_receive_message(peer->sessionContext, msg); libp2p_message_free(msg); if (ret_msg == NULL) @@ -500,7 +526,9 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) { return -1; // this should never happen } if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier) - libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, 5); + if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, 2)) { + libp2p_logger_debug("online", "Attempted to bootstrap and connect to %s but failed. Continuing.\n", libp2p_peer_id_to_string(peer)); + } } } } diff --git a/test/config.test1 b/test/config.test1 index e51b19d..9336e9f 100644 --- a/test/config.test1 +++ b/test/config.test1 @@ -39,6 +39,7 @@ "ResolveCacheSize": 128 }, "Bootstrap": [ + "/ip4/127.0.0.1/tcp/4002/ipfs/QmcDW1t4QQBGAs2HSig8xkUhPxFZzmewAeFdsmp6q6nyY5" ], "Tour": { "Last": "" @@ -68,9 +69,9 @@ "AddrFilters": null }, "Replication": { - "AnnounceMinutes": "60", - "Nodes": [ - "/ip4/127.0.0.1/tcp/4002/ipfs/QmcDW1t4QQBGAs2HSig8xkUhPxFZzmewAeFdsmp6q6nyY5" + "AnnounceMinutes": 60, + "Peers": [ + "/ip4/127.0.0.1/tcp/4002/ipfs/QmcDW1t4QQBGAs2HSig8xkUhPxFZzmewAeFdsmp6q6nyY5" ] } -} +} \ No newline at end of file diff --git a/test/config.test2 b/test/config.test2 index d1abdcf..254aa2e 100644 --- a/test/config.test2 +++ b/test/config.test2 @@ -39,6 +39,7 @@ "ResolveCacheSize": 128 }, "Bootstrap": [ + "/ip4/127.0.0.1/tcp/4001/ipfs/QmZVoAZGFfinB7MQQiDzB84kWaDPQ95GLuXdemJFM2r9b4" ], "Tour": { "Last": "" @@ -68,9 +69,9 @@ "AddrFilters": null }, "Replication": { - "AnnounceMinutes": "60", - "Nodes": [ - "/ip4/127.0.0.1/tcp/4001/ipfs/QmZVoAZGFfinB7MQQiDzB84kWaDPQ95GLuXdemJFM2r9b4" + "AnnounceMinutes": 60, + "Peers": [ + "/ip4/127.0.0.1/tcp/4001/ipfs/QmZVoAZGFfinB7MQQiDzB84kWaDPQ95GLuXdemJFM2r9b4" ] } } \ No newline at end of file diff --git a/test/core/test_ping.h b/test/core/test_ping.h index 6479eaf..930ef50 100644 --- a/test/core/test_ping.h +++ b/test/core/test_ping.h @@ -19,7 +19,7 @@ int test_ping() { int retVal = 0; struct FSRepo* fs_repo = NULL; - struct Libp2pMessage* message = NULL; + struct KademliaMessage* message = NULL; //struct IpfsNode local_node; struct Libp2pPeer* remote_peer = NULL; struct Dialer* dialer = NULL; diff --git a/test/journal/test_journal.h b/test/journal/test_journal.h index 3883f0a..7b59678 100644 --- a/test/journal/test_journal.h +++ b/test/journal/test_journal.h @@ -82,6 +82,16 @@ int test_journal_server_1() { struct FSRepo* fs_repo = NULL; libp2p_logger_add_class("test_journal"); + libp2p_logger_add_class("journal"); + libp2p_logger_add_class("daemon"); + libp2p_logger_add_class("online"); + libp2p_logger_add_class("peer"); + //libp2p_logger_add_class("null"); + libp2p_logger_add_class("replication"); + libp2p_logger_add_class("fs_repo"); + libp2p_logger_add_class("lmdb_journalstore"); + libp2p_logger_add_class("secio"); + libp2p_logger_add_class("socket"); if (!drop_build_open_repo(ipfs_path, &fs_repo, config_file)) { ipfs_repo_fsrepo_free(fs_repo); @@ -93,9 +103,24 @@ int test_journal_server_1() { ipfs_repo_fsrepo_free(fs_repo); + // add some files to the datastore + uint8_t *bytes = (unsigned char*)"hello, world!\n"; + char* filename = "test1.txt"; + create_file(filename, bytes, strlen((char*)bytes)); + struct HashtableNode* node; + size_t bytes_written; + struct IpfsNode *local_node = NULL; + ipfs_node_offline_new(ipfs_path, &local_node); + ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0); + ipfs_node_free(local_node); + + libp2p_logger_debug("test_journal", "*** Firing up daemon for server 2 ***\n"); + pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path); thread_started = 1; + sleep(45); + retVal = 1; exit: ipfs_daemon_stop(); @@ -118,6 +143,15 @@ int test_journal_server_2() { struct FSRepo* fs_repo = NULL; libp2p_logger_add_class("test_journal"); + libp2p_logger_add_class("journal"); + libp2p_logger_add_class("daemon"); + libp2p_logger_add_class("online"); + libp2p_logger_add_class("peer"); + //libp2p_logger_add_class("null"); + libp2p_logger_add_class("replication"); + libp2p_logger_add_class("fs_repo"); + libp2p_logger_add_class("lmdb_journalstore"); + libp2p_logger_add_class("secio"); if (!drop_build_open_repo(ipfs_path, &fs_repo, config_file)) { ipfs_repo_fsrepo_free(fs_repo); @@ -129,9 +163,13 @@ int test_journal_server_2() { ipfs_repo_fsrepo_free(fs_repo); + libp2p_logger_debug("test_journal", "*** Firing up daemon for server 2 ***\n"); + pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path); thread_started = 1; + sleep(30); + retVal = 1; exit: ipfs_daemon_stop();