Big changes for the yamux protocol
The Stream interface changed, so it touches many areas. But this change will help with memory allocation and a cleaner interface.
This commit is contained in:
parent
8b2a8ef3ab
commit
6160dd841b
6 changed files with 28 additions and 28 deletions
|
@ -39,13 +39,13 @@ struct Libp2pVector* ipfs_node_online_build_protocol_handlers(struct IpfsNode* n
|
||||||
// journal
|
// journal
|
||||||
libp2p_utils_vector_add(retVal, ipfs_journal_build_protocol_handler(node));
|
libp2p_utils_vector_add(retVal, ipfs_journal_build_protocol_handler(node));
|
||||||
// kademlia
|
// kademlia
|
||||||
libp2p_utils_vector_add(retVal, libp2p_routing_dht_build_protocol_handler(node->peerstore, node->providerstore));
|
libp2p_utils_vector_add(retVal, libp2p_routing_dht_build_protocol_handler(node->peerstore, node->providerstore, node->repo->config->datastore, node->repo->config->filestore));
|
||||||
// bitswap
|
// bitswap
|
||||||
libp2p_utils_vector_add(retVal, ipfs_bitswap_build_protocol_handler(node));
|
libp2p_utils_vector_add(retVal, ipfs_bitswap_build_protocol_handler(node));
|
||||||
// multistream
|
// multistream
|
||||||
libp2p_utils_vector_add(retVal, libp2p_net_multistream_build_protocol_handler(retVal));
|
libp2p_utils_vector_add(retVal, libp2p_net_multistream_build_protocol_handler(retVal));
|
||||||
// yamux
|
// yamux
|
||||||
libp2p_utils_vector_add(retVal, yamux_build_protocol_handler());
|
libp2p_utils_vector_add(retVal, libp2p_yamux_build_protocol_handler());
|
||||||
// identify
|
// identify
|
||||||
libp2p_utils_vector_add(retVal, libp2p_identify_build_protocol_handler(retVal));
|
libp2p_utils_vector_add(retVal, libp2p_identify_build_protocol_handler(retVal));
|
||||||
}
|
}
|
||||||
|
|
36
core/null.c
36
core/null.c
|
@ -49,35 +49,33 @@ void ipfs_null_connection (void *ptr) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
session->insecure_stream = libp2p_net_connection_new(connection_param->file_descriptor, connection_param->ip, connection_param->port);
|
|
||||||
session->default_stream = session->insecure_stream;
|
|
||||||
session->datastore = connection_param->local_node->repo->config->datastore;
|
session->datastore = connection_param->local_node->repo->config->datastore;
|
||||||
session->filestore = connection_param->local_node->repo->config->filestore;
|
session->filestore = connection_param->local_node->repo->config->filestore;
|
||||||
session->host = connection_param->ip;
|
session->host = connection_param->ip;
|
||||||
session->port = connection_param->port;
|
session->port = connection_param->port;
|
||||||
|
session->insecure_stream = libp2p_net_connection_new(connection_param->file_descriptor, connection_param->ip, connection_param->port, session);
|
||||||
|
session->default_stream = session->insecure_stream;
|
||||||
|
|
||||||
libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count));
|
libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count));
|
||||||
|
|
||||||
// try to read from the network
|
// try to read from the network
|
||||||
struct StreamMessage *results = 0;
|
struct StreamMessage *results = 0;
|
||||||
// immediately attempt to negotiate multistream
|
// handle the call
|
||||||
if (libp2p_net_multistream_send_protocol(session)) {
|
for(;;) {
|
||||||
// handle the call
|
// Read from the network
|
||||||
for(;;) {
|
if (!session->default_stream->read(session->default_stream->stream_context, &results, DEFAULT_NETWORK_TIMEOUT)) {
|
||||||
// Read from the network
|
// problem reading
|
||||||
if (!session->default_stream->read(session, &results, DEFAULT_NETWORK_TIMEOUT)) {
|
break;
|
||||||
// problem reading
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (results != NULL) {
|
|
||||||
retVal = libp2p_protocol_marshal(results, session, connection_param->local_node->protocol_handlers);
|
|
||||||
libp2p_stream_message_free(results);
|
|
||||||
}
|
|
||||||
// exit the loop on error (or if they ask us to no longer loop by returning 0)
|
|
||||||
if (retVal <= 0)
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
if (results != NULL) {
|
||||||
|
retVal = libp2p_protocol_marshal(results, session->default_stream, connection_param->local_node->protocol_handlers);
|
||||||
|
libp2p_stream_message_free(results);
|
||||||
|
}
|
||||||
|
if (retVal < 0) {
|
||||||
|
// exit the loop on error
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} // end of loop
|
||||||
|
|
||||||
(*(connection_param->count))--; // update counter.
|
(*(connection_param->count))--; // update counter.
|
||||||
if (connection_param->ip != NULL)
|
if (connection_param->ip != NULL)
|
||||||
|
|
|
@ -7,6 +7,7 @@
|
||||||
#include "libp2p/os/utils.h"
|
#include "libp2p/os/utils.h"
|
||||||
#include "libp2p/utils/logger.h"
|
#include "libp2p/utils/logger.h"
|
||||||
#include "libp2p/net/stream.h"
|
#include "libp2p/net/stream.h"
|
||||||
|
#include "libp2p/net/connectionstream.h"
|
||||||
#include "ipfs/core/ipfs_node.h"
|
#include "ipfs/core/ipfs_node.h"
|
||||||
#include "ipfs/datastore/ds_helper.h"
|
#include "ipfs/datastore/ds_helper.h"
|
||||||
#include "ipfs/exchange/exchange.h"
|
#include "ipfs/exchange/exchange.h"
|
||||||
|
@ -35,8 +36,11 @@ int ipfs_bitswap_shutdown_handler(void* context) {
|
||||||
* @param protocol_context the protocol-dependent context
|
* @param protocol_context the protocol-dependent context
|
||||||
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
||||||
*/
|
*/
|
||||||
int ipfs_bitswap_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) {
|
int ipfs_bitswap_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
|
||||||
struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
|
struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
|
||||||
|
struct SessionContext* session_context = libp2p_net_connection_get_session_context(stream);
|
||||||
|
if (session_context == NULL)
|
||||||
|
return -1;
|
||||||
int retVal = ipfs_bitswap_network_handle_message(local_node, session_context, msg->data, msg->data_size);
|
int retVal = ipfs_bitswap_network_handle_message(local_node, session_context, msg->data, msg->data_size);
|
||||||
if (retVal == 0)
|
if (retVal == 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -108,7 +108,7 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
|
||||||
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, 1)) {
|
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, 1)) {
|
||||||
// handle it
|
// handle it
|
||||||
libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer->data_size, buffer->data);
|
libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer->data_size, buffer->data);
|
||||||
int retVal = libp2p_protocol_marshal(buffer, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers);
|
int retVal = libp2p_protocol_marshal(buffer, current_peer_entry->sessionContext->default_stream, context->ipfsNode->protocol_handlers);
|
||||||
libp2p_stream_message_free(buffer);
|
libp2p_stream_message_free(buffer);
|
||||||
did_some_processing = 1;
|
did_some_processing = 1;
|
||||||
if (retVal == -1) {
|
if (retVal == -1) {
|
||||||
|
|
|
@ -33,7 +33,7 @@ int ipfs_journal_shutdown_handler(void* context);
|
||||||
* @param protocol_context in this case, an IpfsNode
|
* @param protocol_context in this case, an IpfsNode
|
||||||
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
||||||
*/
|
*/
|
||||||
int ipfs_journal_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) ;
|
int ipfs_journal_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) ;
|
||||||
|
|
||||||
/***
|
/***
|
||||||
* Build the protocol handler struct for the Journal protocol
|
* Build the protocol handler struct for the Journal protocol
|
||||||
|
|
|
@ -306,7 +306,7 @@ int ipfs_journal_adjust_time(struct JournalToDo* todo, struct IpfsNode* local_no
|
||||||
* @param protocol_context in this case, an IpfsNode
|
* @param protocol_context in this case, an IpfsNode
|
||||||
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
||||||
*/
|
*/
|
||||||
int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct SessionContext* session_context, void* protocol_context) {
|
int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct Stream* stream, void* protocol_context) {
|
||||||
struct StreamMessage* msg = NULL;
|
struct StreamMessage* msg = NULL;
|
||||||
// remove protocol
|
// remove protocol
|
||||||
uint8_t *incoming_pos = (uint8_t*) incoming_msg->data;
|
uint8_t *incoming_pos = (uint8_t*) incoming_msg->data;
|
||||||
|
@ -320,7 +320,7 @@ int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
// read next segment from network
|
// read next segment from network
|
||||||
if (!session_context->default_stream->read(session_context, &msg, 10)) {
|
if (!stream->read(stream->stream_context, &msg, 10)) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
incoming_pos = msg->data;
|
incoming_pos = msg->data;
|
||||||
|
@ -330,7 +330,6 @@ int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct
|
||||||
libp2p_stream_message_free(msg);
|
libp2p_stream_message_free(msg);
|
||||||
msg = NULL;
|
msg = NULL;
|
||||||
}
|
}
|
||||||
libp2p_logger_debug("journal", "Handling incoming message from %s.\n", session_context->remote_peer_id);
|
|
||||||
struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
|
struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
|
||||||
// un-protobuf the message
|
// un-protobuf the message
|
||||||
struct JournalMessage* message = NULL;
|
struct JournalMessage* message = NULL;
|
||||||
|
@ -342,7 +341,6 @@ int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct
|
||||||
// NOTE: If our_time_diff is negative, the remote's clock is faster than ours.
|
// NOTE: If our_time_diff is negative, the remote's clock is faster than ours.
|
||||||
// if it is positive, our clock is faster than theirs.
|
// if it is positive, our clock is faster than theirs.
|
||||||
if ( llabs(our_time_diff) > 300) {
|
if ( llabs(our_time_diff) > 300) {
|
||||||
libp2p_logger_error("journal", "The clock of peer %s is out of 5 minute range. Seconds difference: %llu", session_context->remote_peer_id, our_time_diff);
|
|
||||||
if (second_read) {
|
if (second_read) {
|
||||||
free(incoming_pos);
|
free(incoming_pos);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue