From f9bec0ac206ace9c318f539f39342f891b36e498 Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Mon, 23 Oct 2017 15:22:12 -0500 Subject: [PATCH] New way of swarm connection --- Makefile | 2 +- core/http_request.c | 2 +- core/null.c | 34 ++++++++++++++------------- exchange/bitswap/bitswap.c | 11 +++++---- exchange/bitswap/engine.c | 2 +- exchange/bitswap/network.c | 2 +- exchange/bitswap/peer_request_queue.c | 2 +- include/ipfs/core/ipfs_node.h | 1 + include/ipfs/journal/journal.h | 4 ++-- journal/journal.c | 29 +++++++++++------------ namesys/publisher.c | 2 +- routing/offline.c | 2 +- routing/online.c | 6 ++--- test/routing/test_routing.h | 2 +- 14 files changed, 52 insertions(+), 49 deletions(-) diff --git a/Makefile b/Makefile index 1507920..c08e474 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ DEBUG = true export DEBUG all: - cd ../c-libp2p; make all; + #cd ../c-libp2p; make all; cd blocks; make all; cd cid; make all; cd cmd; make all; diff --git a/core/http_request.c b/core/http_request.c index 5d81eff..dd5cfd3 100644 --- a/core/http_request.c +++ b/core/http_request.c @@ -306,7 +306,7 @@ int ipfs_core_http_process_swarm_connect(struct IpfsNode* local_node, struct Htt return 0; } struct Libp2pPeer* new_peer = libp2p_peer_new_from_multiaddress(ma); - if (!libp2p_peer_connect(&local_node->identity->private_key, new_peer, local_node->peerstore, local_node->repo->config->datastore, 30)) { + if (!libp2p_peer_connect(local_node->dialer, new_peer, local_node->peerstore, local_node->repo->config->datastore, 30)) { libp2p_logger_error("http_request", "swarm_connect: Unable to connect to peer %s.\n", libp2p_peer_id_to_string(new_peer)); libp2p_peer_free(new_peer); multiaddress_free(ma); diff --git a/core/null.c b/core/null.c index b92a314..06c77a9 100644 --- a/core/null.c +++ b/core/null.c @@ -8,6 +8,7 @@ #include #include "libp2p/conn/session.h" +#include "libp2p/net/connectionstream.h" #include "libp2p/net/multistream.h" #include "libp2p/net/p2pnet.h" #include "libp2p/net/protocol.h" @@ -48,7 +49,7 @@ void ipfs_null_connection (void *ptr) { return; } - session->insecure_stream = libp2p_net_multistream_stream_new(connection_param->file_descriptor, connection_param->ip, connection_param->port); + session->insecure_stream = libp2p_net_connection_new(connection_param->file_descriptor, connection_param->ip, connection_param->port); session->default_stream = session->insecure_stream; session->datastore = connection_param->local_node->repo->config->datastore; session->filestore = connection_param->local_node->repo->config->filestore; @@ -59,20 +60,21 @@ void ipfs_null_connection (void *ptr) { // try to read from the network struct StreamMessage *results = 0; - // handle the call - for(;;) { - // immediately attempt to negotiate multistream - if (!libp2p_net_multistream_send_protocol(session)) - break; - if (!session->default_stream->read(session, &results, DEFAULT_NETWORK_TIMEOUT)) { - // problem reading; - break; - } - retVal = libp2p_protocol_marshal(results->data, results->data_size, session, connection_param->local_node->protocol_handlers); - libp2p_stream_message_free(results); - // exit the loop on error (or if they ask us to no longer loop by returning 0) - if (retVal <= 0) - break; + // immediately attempt to negotiate multistream + if (libp2p_net_multistream_send_protocol(session)) { + // handle the call + for(;;) { + // Read from the network + if (!session->default_stream->read(session, &results, DEFAULT_NETWORK_TIMEOUT)) { + // problem reading; + break; + } + retVal = libp2p_protocol_marshal(results, session, connection_param->local_node->protocol_handlers); + libp2p_stream_message_free(results); + // exit the loop on error (or if they ask us to no longer loop by returning 0) + if (retVal <= 0) + break; + } } (*(connection_param->count))--; // update counter. @@ -101,7 +103,7 @@ int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* pee if (replication_peer != NULL && local_node->repo->config->replication->announce && announce_secs < 0) { // try to connect if we aren't already if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { - if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, local_node->repo->config->datastore, 2)) { + if (!libp2p_peer_connect(local_node->dialer, peer, local_node->peerstore, local_node->repo->config->datastore, 2)) { return 0; } } diff --git a/exchange/bitswap/bitswap.c b/exchange/bitswap/bitswap.c index cb2e522..5bfbbb0 100644 --- a/exchange/bitswap/bitswap.c +++ b/exchange/bitswap/bitswap.c @@ -6,6 +6,7 @@ #include #include "libp2p/os/utils.h" #include "libp2p/utils/logger.h" +#include "libp2p/net/stream.h" #include "ipfs/core/ipfs_node.h" #include "ipfs/datastore/ds_helper.h" #include "ipfs/exchange/exchange.h" @@ -15,9 +16,9 @@ #include "ipfs/exchange/bitswap/peer_request_queue.h" #include "ipfs/exchange/bitswap/want_manager.h" -int ipfs_bitswap_can_handle(const uint8_t* incoming, size_t incoming_size) { - char* result = strnstr((char*)incoming, "/ipfs/bitswap", incoming_size); - if(result == NULL || result != (char*)incoming) +int ipfs_bitswap_can_handle(const struct StreamMessage* msg) { + char* result = strnstr((char*)msg->data, "/ipfs/bitswap", msg->data_size); + if(result == NULL || result != (char*)msg->data) return 0; return 1; } @@ -34,9 +35,9 @@ int ipfs_bitswap_shutdown_handler(void* context) { * @param protocol_context the protocol-dependent context * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ -int ipfs_bitswap_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { +int ipfs_bitswap_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { struct IpfsNode* local_node = (struct IpfsNode*)protocol_context; - int retVal = ipfs_bitswap_network_handle_message(local_node, session_context, incoming, incoming_size); + int retVal = ipfs_bitswap_network_handle_message(local_node, session_context, msg->data, msg->data_size); if (retVal == 0) return -1; return retVal; diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c index e995eb8..acc6d75 100644 --- a/exchange/bitswap/engine.c +++ b/exchange/bitswap/engine.c @@ -108,7 +108,7 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, 1)) { // handle it libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer->data_size, buffer->data); - int retVal = libp2p_protocol_marshal(buffer->data, buffer->data_size, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers); + int retVal = libp2p_protocol_marshal(buffer, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers); libp2p_stream_message_free(buffer); did_some_processing = 1; if (retVal == -1) { diff --git a/exchange/bitswap/network.c b/exchange/bitswap/network.c index 2904fd6..9aae113 100644 --- a/exchange/bitswap/network.c +++ b/exchange/bitswap/network.c @@ -21,7 +21,7 @@ int ipfs_bitswap_network_send_message(const struct BitswapContext* context, stru libp2p_logger_debug("bitswap_network", "Sending bitswap message to %s.\n", libp2p_peer_id_to_string(peer)); // get a connection to the peer if (peer->connection_type != CONNECTION_TYPE_CONNECTED || peer->sessionContext == NULL) { - libp2p_peer_connect(&context->ipfsNode->identity->private_key, peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 10); + libp2p_peer_connect(context->ipfsNode->dialer, peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 10); if(peer->connection_type != CONNECTION_TYPE_CONNECTED) return 0; } diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index e2a381a..ae621c9 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -384,7 +384,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context if (need_to_connect) { if (!connected) { // connect - connected = libp2p_peer_connect(&context->ipfsNode->identity->private_key, request->peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 0); + connected = libp2p_peer_connect(context->ipfsNode->dialer, request->peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 0); } if (connected) { // build a message diff --git a/include/ipfs/core/ipfs_node.h b/include/ipfs/core/ipfs_node.h index 90cf9e8..619f897 100644 --- a/include/ipfs/core/ipfs_node.h +++ b/include/ipfs/core/ipfs_node.h @@ -38,6 +38,7 @@ struct IpfsNode { struct Exchange* exchange; struct Libp2pVector* protocol_handlers; struct ApiContext* api_context; + struct Dialer* dialer; //struct Pinner pinning; // an interface //struct Mount** mounts; // TODO: Add more here diff --git a/include/ipfs/journal/journal.h b/include/ipfs/journal/journal.h index f08150f..f40f574 100644 --- a/include/ipfs/journal/journal.h +++ b/include/ipfs/journal/journal.h @@ -16,7 +16,7 @@ * @param incoming_size the size of the incoming message * @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise. */ -int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size); +int ipfs_journal_can_handle(const struct StreamMessage* msg); /** * Clean up resources used by this handler @@ -33,7 +33,7 @@ int ipfs_journal_shutdown_handler(void* context); * @param protocol_context in this case, an IpfsNode * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ -int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) ; +int ipfs_journal_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) ; /*** * Build the protocol handler struct for the Journal protocol diff --git a/journal/journal.c b/journal/journal.c index 0bcbb7a..388ba47 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -18,12 +18,12 @@ * @param incoming_size the size of the incoming message * @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise. */ -int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) { +int ipfs_journal_can_handle(const struct StreamMessage* msg) { const char* protocol = "/ipfs/journalio/1.0.0"; - if (incoming_size < 21) + if (msg->data_size < 21) return 0; - char* result = strnstr((char*)incoming, protocol, incoming_size); - if(result == NULL || result != (char*)incoming) + char* result = strnstr((char*)msg->data, protocol, msg->data_size); + if(result == NULL || result != (char*)msg->data) return 0; libp2p_logger_debug("journal", "Handling incoming message.\n"); return 1; @@ -106,7 +106,7 @@ int ipfs_journal_free_records(struct Libp2pVector* records) { int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, struct JournalMessage* message) { if (peer->connection_type != CONNECTION_TYPE_CONNECTED) - libp2p_peer_connect(&node->identity->private_key, peer, node->peerstore, node->repo->config->datastore, 10); + libp2p_peer_connect(node->dialer, peer, node->peerstore, node->repo->config->datastore, 10); if (peer->connection_type != CONNECTION_TYPE_CONNECTED) return 0; // protobuf the message @@ -301,23 +301,22 @@ int ipfs_journal_adjust_time(struct JournalToDo* todo, struct IpfsNode* local_no /*** * Handles a message - * @param incoming the message - * @param incoming_size the size of the message + * @param incoming_msg the message * @param session_context details of the remote peer * @param protocol_context in this case, an IpfsNode * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ -int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { +int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct SessionContext* session_context, void* protocol_context) { struct StreamMessage* msg = NULL; // remove protocol - uint8_t *incoming_pos = (uint8_t*) incoming; - size_t pos_size = incoming_size; + uint8_t *incoming_pos = (uint8_t*) incoming_msg->data; + size_t pos_size = incoming_msg->data_size; int second_read = 0; - for(int i = 0; i < incoming_size; i++) { - if (incoming[i] == '\n') { - if (incoming_size > i + 1) { - incoming_pos = (uint8_t *)&incoming[i+1]; - pos_size = incoming_size - i; + for(int i = 0; i < incoming_msg->data_size; i++) { + if (incoming_msg->data[i] == '\n') { + if (incoming_msg->data_size > i + 1) { + incoming_pos = (uint8_t *)&incoming_msg->data[i+1]; + pos_size = incoming_msg->data_size - i; break; } else { // read next segment from network diff --git a/namesys/publisher.c b/namesys/publisher.c index b2cfdd9..83959ab 100644 --- a/namesys/publisher.c +++ b/namesys/publisher.c @@ -252,7 +252,7 @@ int ipfs_namesys_publisher_publish(struct IpfsNode* local_node, char* path) { return 0; } - libp2p_routing_dht_send_message_nearest_x(&local_node->identity->private_key, local_node->peerstore, local_node->repo->config->datastore, msg, 10); + libp2p_routing_dht_send_message_nearest_x(local_node->dialer, local_node->peerstore, local_node->repo->config->datastore, msg, 10); libp2p_message_free(msg); ipfs_cid_free(local_peer); diff --git a/routing/offline.c b/routing/offline.c index 3744f0c..e26938f 100644 --- a/routing/offline.c +++ b/routing/offline.c @@ -250,7 +250,7 @@ int ipfs_routing_offline_bootstrap (ipfs_routing* routing) return -1; // this should never happen } if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier) - if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) { + if (!libp2p_peer_connect(routing->local_node->dialer, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) { libp2p_logger_debug("online", "Attempted to bootstrap and connect to %s but failed. Continuing.\n", libp2p_peer_id_to_string(peer)); } } diff --git a/routing/online.c b/routing/online.c index c4a1a60..2dfd8f3 100644 --- a/routing/online.c +++ b/routing/online.c @@ -299,7 +299,7 @@ int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* pee int retVal = 0; if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { - if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5)) + if (!libp2p_peer_connect(routing->local_node->dialer, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5)) goto exit; } if (peer->connection_type == CONNECTION_TYPE_CONNECTED) { @@ -430,7 +430,7 @@ int ipfs_routing_online_get_value (ipfs_routing* routing, const unsigned char *k if (!libp2p_peer_is_connected(current_peer)) { // attempt to connect. If unsuccessful, continue in the loop. libp2p_logger_debug("online", "Attempting to connect to peer to retrieve file\n"); - if (libp2p_peer_connect(&routing->local_node->identity->private_key, current_peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5)) { + if (libp2p_peer_connect(routing->local_node->dialer, current_peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5)) { libp2p_logger_debug("online", "Peer connected\n"); if (ipfs_routing_online_get_peer_value(routing, current_peer, key, key_size, buffer, buffer_size)) { libp2p_logger_debug("online", "Retrieved a value\n"); @@ -506,7 +506,7 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) { return -1; // this should never happen } if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier) - if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) { + if (!libp2p_peer_connect(routing->local_node->dialer, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) { libp2p_logger_debug("online", "Attempted to bootstrap and connect to %s but failed. Continuing.\n", libp2p_peer_id_to_string(peer)); } } diff --git a/test/routing/test_routing.h b/test/routing/test_routing.h index 4434ca4..2c1db41 100644 --- a/test/routing/test_routing.h +++ b/test/routing/test_routing.h @@ -365,7 +365,7 @@ int test_routing_find_providers() { for(int i = 0; i < result->total; i++) { remote_peer = (struct Libp2pPeer*)libp2p_utils_vector_get(result, i); if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED - || libp2p_peer_connect(&local_node3->identity->private_key, remote_peer, local_node3->peerstore, local_node3->repo->config->datastore, 5)) { + || libp2p_peer_connect(local_node3->dialer, remote_peer, local_node3->peerstore, local_node3->repo->config->datastore, 5)) { break; } remote_peer = NULL;