From 6160dd841b560e5188e46e2f81470c3feb6a5310 Mon Sep 17 00:00:00 2001 From: John Jones Date: Wed, 8 Nov 2017 10:54:31 -0500 Subject: [PATCH] Big changes for the yamux protocol The Stream interface changed, so it touches many areas. But this change will help with memory allocation and a cleaner interface. --- core/ipfs_node.c | 4 ++-- core/null.c | 36 ++++++++++++++++------------------ exchange/bitswap/bitswap.c | 6 +++++- exchange/bitswap/engine.c | 2 +- include/ipfs/journal/journal.h | 2 +- journal/journal.c | 6 ++---- 6 files changed, 28 insertions(+), 28 deletions(-) diff --git a/core/ipfs_node.c b/core/ipfs_node.c index 05c43ee..d9a115b 100644 --- a/core/ipfs_node.c +++ b/core/ipfs_node.c @@ -39,13 +39,13 @@ struct Libp2pVector* ipfs_node_online_build_protocol_handlers(struct IpfsNode* n // journal libp2p_utils_vector_add(retVal, ipfs_journal_build_protocol_handler(node)); // kademlia - libp2p_utils_vector_add(retVal, libp2p_routing_dht_build_protocol_handler(node->peerstore, node->providerstore)); + libp2p_utils_vector_add(retVal, libp2p_routing_dht_build_protocol_handler(node->peerstore, node->providerstore, node->repo->config->datastore, node->repo->config->filestore)); // bitswap libp2p_utils_vector_add(retVal, ipfs_bitswap_build_protocol_handler(node)); // multistream libp2p_utils_vector_add(retVal, libp2p_net_multistream_build_protocol_handler(retVal)); // yamux - libp2p_utils_vector_add(retVal, yamux_build_protocol_handler()); + libp2p_utils_vector_add(retVal, libp2p_yamux_build_protocol_handler()); // identify libp2p_utils_vector_add(retVal, libp2p_identify_build_protocol_handler(retVal)); } diff --git a/core/null.c b/core/null.c index 02a8289..b748b88 100644 --- a/core/null.c +++ b/core/null.c @@ -49,35 +49,33 @@ void ipfs_null_connection (void *ptr) { return; } - session->insecure_stream = libp2p_net_connection_new(connection_param->file_descriptor, connection_param->ip, connection_param->port); - session->default_stream = session->insecure_stream; session->datastore = connection_param->local_node->repo->config->datastore; session->filestore = connection_param->local_node->repo->config->filestore; session->host = connection_param->ip; session->port = connection_param->port; + session->insecure_stream = libp2p_net_connection_new(connection_param->file_descriptor, connection_param->ip, connection_param->port, session); + session->default_stream = session->insecure_stream; libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count)); // try to read from the network struct StreamMessage *results = 0; - // immediately attempt to negotiate multistream - if (libp2p_net_multistream_send_protocol(session)) { - // handle the call - for(;;) { - // Read from the network - if (!session->default_stream->read(session, &results, DEFAULT_NETWORK_TIMEOUT)) { - // problem reading - break; - } - if (results != NULL) { - retVal = libp2p_protocol_marshal(results, session, connection_param->local_node->protocol_handlers); - libp2p_stream_message_free(results); - } - // exit the loop on error (or if they ask us to no longer loop by returning 0) - if (retVal <= 0) - break; + // handle the call + for(;;) { + // Read from the network + if (!session->default_stream->read(session->default_stream->stream_context, &results, DEFAULT_NETWORK_TIMEOUT)) { + // problem reading + break; } - } + if (results != NULL) { + retVal = libp2p_protocol_marshal(results, session->default_stream, connection_param->local_node->protocol_handlers); + libp2p_stream_message_free(results); + } + if (retVal < 0) { + // exit the loop on error + break; + } + } // end of loop (*(connection_param->count))--; // update counter. if (connection_param->ip != NULL) diff --git a/exchange/bitswap/bitswap.c b/exchange/bitswap/bitswap.c index 5bfbbb0..01fb7ff 100644 --- a/exchange/bitswap/bitswap.c +++ b/exchange/bitswap/bitswap.c @@ -7,6 +7,7 @@ #include "libp2p/os/utils.h" #include "libp2p/utils/logger.h" #include "libp2p/net/stream.h" +#include "libp2p/net/connectionstream.h" #include "ipfs/core/ipfs_node.h" #include "ipfs/datastore/ds_helper.h" #include "ipfs/exchange/exchange.h" @@ -35,8 +36,11 @@ int ipfs_bitswap_shutdown_handler(void* context) { * @param protocol_context the protocol-dependent context * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ -int ipfs_bitswap_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { +int ipfs_bitswap_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { struct IpfsNode* local_node = (struct IpfsNode*)protocol_context; + struct SessionContext* session_context = libp2p_net_connection_get_session_context(stream); + if (session_context == NULL) + return -1; int retVal = ipfs_bitswap_network_handle_message(local_node, session_context, msg->data, msg->data_size); if (retVal == 0) return -1; diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c index acc6d75..b698bec 100644 --- a/exchange/bitswap/engine.c +++ b/exchange/bitswap/engine.c @@ -108,7 +108,7 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, 1)) { // handle it libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer->data_size, buffer->data); - int retVal = libp2p_protocol_marshal(buffer, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers); + int retVal = libp2p_protocol_marshal(buffer, current_peer_entry->sessionContext->default_stream, context->ipfsNode->protocol_handlers); libp2p_stream_message_free(buffer); did_some_processing = 1; if (retVal == -1) { diff --git a/include/ipfs/journal/journal.h b/include/ipfs/journal/journal.h index f40f574..446d84e 100644 --- a/include/ipfs/journal/journal.h +++ b/include/ipfs/journal/journal.h @@ -33,7 +33,7 @@ int ipfs_journal_shutdown_handler(void* context); * @param protocol_context in this case, an IpfsNode * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ -int ipfs_journal_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) ; +int ipfs_journal_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) ; /*** * Build the protocol handler struct for the Journal protocol diff --git a/journal/journal.c b/journal/journal.c index 388ba47..c1ec933 100644 --- a/journal/journal.c +++ b/journal/journal.c @@ -306,7 +306,7 @@ int ipfs_journal_adjust_time(struct JournalToDo* todo, struct IpfsNode* local_no * @param protocol_context in this case, an IpfsNode * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ -int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct SessionContext* session_context, void* protocol_context) { +int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct Stream* stream, void* protocol_context) { struct StreamMessage* msg = NULL; // remove protocol uint8_t *incoming_pos = (uint8_t*) incoming_msg->data; @@ -320,7 +320,7 @@ int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct break; } else { // read next segment from network - if (!session_context->default_stream->read(session_context, &msg, 10)) { + if (!stream->read(stream->stream_context, &msg, 10)) { return -1; } incoming_pos = msg->data; @@ -330,7 +330,6 @@ int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct libp2p_stream_message_free(msg); msg = NULL; } - libp2p_logger_debug("journal", "Handling incoming message from %s.\n", session_context->remote_peer_id); struct IpfsNode* local_node = (struct IpfsNode*)protocol_context; // un-protobuf the message struct JournalMessage* message = NULL; @@ -342,7 +341,6 @@ int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct // NOTE: If our_time_diff is negative, the remote's clock is faster than ours. // if it is positive, our clock is faster than theirs. if ( llabs(our_time_diff) > 300) { - libp2p_logger_error("journal", "The clock of peer %s is out of 5 minute range. Seconds difference: %llu", session_context->remote_peer_id, our_time_diff); if (second_read) { free(incoming_pos); }