New way of swarm connection
This commit is contained in:
parent
91f5c50a71
commit
f9bec0ac20
14 changed files with 52 additions and 49 deletions
2
Makefile
2
Makefile
|
@ -3,7 +3,7 @@ DEBUG = true
|
|||
export DEBUG
|
||||
|
||||
all:
|
||||
cd ../c-libp2p; make all;
|
||||
#cd ../c-libp2p; make all;
|
||||
cd blocks; make all;
|
||||
cd cid; make all;
|
||||
cd cmd; make all;
|
||||
|
|
|
@ -306,7 +306,7 @@ int ipfs_core_http_process_swarm_connect(struct IpfsNode* local_node, struct Htt
|
|||
return 0;
|
||||
}
|
||||
struct Libp2pPeer* new_peer = libp2p_peer_new_from_multiaddress(ma);
|
||||
if (!libp2p_peer_connect(&local_node->identity->private_key, new_peer, local_node->peerstore, local_node->repo->config->datastore, 30)) {
|
||||
if (!libp2p_peer_connect(local_node->dialer, new_peer, local_node->peerstore, local_node->repo->config->datastore, 30)) {
|
||||
libp2p_logger_error("http_request", "swarm_connect: Unable to connect to peer %s.\n", libp2p_peer_id_to_string(new_peer));
|
||||
libp2p_peer_free(new_peer);
|
||||
multiaddress_free(ma);
|
||||
|
|
34
core/null.c
34
core/null.c
|
@ -8,6 +8,7 @@
|
|||
#include <arpa/inet.h>
|
||||
|
||||
#include "libp2p/conn/session.h"
|
||||
#include "libp2p/net/connectionstream.h"
|
||||
#include "libp2p/net/multistream.h"
|
||||
#include "libp2p/net/p2pnet.h"
|
||||
#include "libp2p/net/protocol.h"
|
||||
|
@ -48,7 +49,7 @@ void ipfs_null_connection (void *ptr) {
|
|||
return;
|
||||
}
|
||||
|
||||
session->insecure_stream = libp2p_net_multistream_stream_new(connection_param->file_descriptor, connection_param->ip, connection_param->port);
|
||||
session->insecure_stream = libp2p_net_connection_new(connection_param->file_descriptor, connection_param->ip, connection_param->port);
|
||||
session->default_stream = session->insecure_stream;
|
||||
session->datastore = connection_param->local_node->repo->config->datastore;
|
||||
session->filestore = connection_param->local_node->repo->config->filestore;
|
||||
|
@ -59,20 +60,21 @@ void ipfs_null_connection (void *ptr) {
|
|||
|
||||
// try to read from the network
|
||||
struct StreamMessage *results = 0;
|
||||
// handle the call
|
||||
for(;;) {
|
||||
// immediately attempt to negotiate multistream
|
||||
if (!libp2p_net_multistream_send_protocol(session))
|
||||
break;
|
||||
if (!session->default_stream->read(session, &results, DEFAULT_NETWORK_TIMEOUT)) {
|
||||
// problem reading;
|
||||
break;
|
||||
}
|
||||
retVal = libp2p_protocol_marshal(results->data, results->data_size, session, connection_param->local_node->protocol_handlers);
|
||||
libp2p_stream_message_free(results);
|
||||
// exit the loop on error (or if they ask us to no longer loop by returning 0)
|
||||
if (retVal <= 0)
|
||||
break;
|
||||
// immediately attempt to negotiate multistream
|
||||
if (libp2p_net_multistream_send_protocol(session)) {
|
||||
// handle the call
|
||||
for(;;) {
|
||||
// Read from the network
|
||||
if (!session->default_stream->read(session, &results, DEFAULT_NETWORK_TIMEOUT)) {
|
||||
// problem reading;
|
||||
break;
|
||||
}
|
||||
retVal = libp2p_protocol_marshal(results, session, connection_param->local_node->protocol_handlers);
|
||||
libp2p_stream_message_free(results);
|
||||
// exit the loop on error (or if they ask us to no longer loop by returning 0)
|
||||
if (retVal <= 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
(*(connection_param->count))--; // update counter.
|
||||
|
@ -101,7 +103,7 @@ int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* pee
|
|||
if (replication_peer != NULL && local_node->repo->config->replication->announce && announce_secs < 0) {
|
||||
// try to connect if we aren't already
|
||||
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
|
||||
if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, local_node->repo->config->datastore, 2)) {
|
||||
if (!libp2p_peer_connect(local_node->dialer, peer, local_node->peerstore, local_node->repo->config->datastore, 2)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
#include <pthread.h>
|
||||
#include "libp2p/os/utils.h"
|
||||
#include "libp2p/utils/logger.h"
|
||||
#include "libp2p/net/stream.h"
|
||||
#include "ipfs/core/ipfs_node.h"
|
||||
#include "ipfs/datastore/ds_helper.h"
|
||||
#include "ipfs/exchange/exchange.h"
|
||||
|
@ -15,9 +16,9 @@
|
|||
#include "ipfs/exchange/bitswap/peer_request_queue.h"
|
||||
#include "ipfs/exchange/bitswap/want_manager.h"
|
||||
|
||||
int ipfs_bitswap_can_handle(const uint8_t* incoming, size_t incoming_size) {
|
||||
char* result = strnstr((char*)incoming, "/ipfs/bitswap", incoming_size);
|
||||
if(result == NULL || result != (char*)incoming)
|
||||
int ipfs_bitswap_can_handle(const struct StreamMessage* msg) {
|
||||
char* result = strnstr((char*)msg->data, "/ipfs/bitswap", msg->data_size);
|
||||
if(result == NULL || result != (char*)msg->data)
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
@ -34,9 +35,9 @@ int ipfs_bitswap_shutdown_handler(void* context) {
|
|||
* @param protocol_context the protocol-dependent context
|
||||
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
||||
*/
|
||||
int ipfs_bitswap_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) {
|
||||
int ipfs_bitswap_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) {
|
||||
struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
|
||||
int retVal = ipfs_bitswap_network_handle_message(local_node, session_context, incoming, incoming_size);
|
||||
int retVal = ipfs_bitswap_network_handle_message(local_node, session_context, msg->data, msg->data_size);
|
||||
if (retVal == 0)
|
||||
return -1;
|
||||
return retVal;
|
||||
|
|
|
@ -108,7 +108,7 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
|
|||
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, 1)) {
|
||||
// handle it
|
||||
libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer->data_size, buffer->data);
|
||||
int retVal = libp2p_protocol_marshal(buffer->data, buffer->data_size, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers);
|
||||
int retVal = libp2p_protocol_marshal(buffer, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers);
|
||||
libp2p_stream_message_free(buffer);
|
||||
did_some_processing = 1;
|
||||
if (retVal == -1) {
|
||||
|
|
|
@ -21,7 +21,7 @@ int ipfs_bitswap_network_send_message(const struct BitswapContext* context, stru
|
|||
libp2p_logger_debug("bitswap_network", "Sending bitswap message to %s.\n", libp2p_peer_id_to_string(peer));
|
||||
// get a connection to the peer
|
||||
if (peer->connection_type != CONNECTION_TYPE_CONNECTED || peer->sessionContext == NULL) {
|
||||
libp2p_peer_connect(&context->ipfsNode->identity->private_key, peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 10);
|
||||
libp2p_peer_connect(context->ipfsNode->dialer, peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 10);
|
||||
if(peer->connection_type != CONNECTION_TYPE_CONNECTED)
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -384,7 +384,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context
|
|||
if (need_to_connect) {
|
||||
if (!connected) {
|
||||
// connect
|
||||
connected = libp2p_peer_connect(&context->ipfsNode->identity->private_key, request->peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 0);
|
||||
connected = libp2p_peer_connect(context->ipfsNode->dialer, request->peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 0);
|
||||
}
|
||||
if (connected) {
|
||||
// build a message
|
||||
|
|
|
@ -38,6 +38,7 @@ struct IpfsNode {
|
|||
struct Exchange* exchange;
|
||||
struct Libp2pVector* protocol_handlers;
|
||||
struct ApiContext* api_context;
|
||||
struct Dialer* dialer;
|
||||
//struct Pinner pinning; // an interface
|
||||
//struct Mount** mounts;
|
||||
// TODO: Add more here
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
* @param incoming_size the size of the incoming message
|
||||
* @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise.
|
||||
*/
|
||||
int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size);
|
||||
int ipfs_journal_can_handle(const struct StreamMessage* msg);
|
||||
|
||||
/**
|
||||
* Clean up resources used by this handler
|
||||
|
@ -33,7 +33,7 @@ int ipfs_journal_shutdown_handler(void* context);
|
|||
* @param protocol_context in this case, an IpfsNode
|
||||
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
||||
*/
|
||||
int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) ;
|
||||
int ipfs_journal_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) ;
|
||||
|
||||
/***
|
||||
* Build the protocol handler struct for the Journal protocol
|
||||
|
|
|
@ -18,12 +18,12 @@
|
|||
* @param incoming_size the size of the incoming message
|
||||
* @returns true(1) if the protocol in incoming is something we can handle. False(0) otherwise.
|
||||
*/
|
||||
int ipfs_journal_can_handle(const uint8_t* incoming, size_t incoming_size) {
|
||||
int ipfs_journal_can_handle(const struct StreamMessage* msg) {
|
||||
const char* protocol = "/ipfs/journalio/1.0.0";
|
||||
if (incoming_size < 21)
|
||||
if (msg->data_size < 21)
|
||||
return 0;
|
||||
char* result = strnstr((char*)incoming, protocol, incoming_size);
|
||||
if(result == NULL || result != (char*)incoming)
|
||||
char* result = strnstr((char*)msg->data, protocol, msg->data_size);
|
||||
if(result == NULL || result != (char*)msg->data)
|
||||
return 0;
|
||||
libp2p_logger_debug("journal", "Handling incoming message.\n");
|
||||
return 1;
|
||||
|
@ -106,7 +106,7 @@ int ipfs_journal_free_records(struct Libp2pVector* records) {
|
|||
|
||||
int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, struct JournalMessage* message) {
|
||||
if (peer->connection_type != CONNECTION_TYPE_CONNECTED)
|
||||
libp2p_peer_connect(&node->identity->private_key, peer, node->peerstore, node->repo->config->datastore, 10);
|
||||
libp2p_peer_connect(node->dialer, peer, node->peerstore, node->repo->config->datastore, 10);
|
||||
if (peer->connection_type != CONNECTION_TYPE_CONNECTED)
|
||||
return 0;
|
||||
// protobuf the message
|
||||
|
@ -301,23 +301,22 @@ int ipfs_journal_adjust_time(struct JournalToDo* todo, struct IpfsNode* local_no
|
|||
|
||||
/***
|
||||
* Handles a message
|
||||
* @param incoming the message
|
||||
* @param incoming_size the size of the message
|
||||
* @param incoming_msg the message
|
||||
* @param session_context details of the remote peer
|
||||
* @param protocol_context in this case, an IpfsNode
|
||||
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
||||
*/
|
||||
int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) {
|
||||
int ipfs_journal_handle_message(const struct StreamMessage* incoming_msg, struct SessionContext* session_context, void* protocol_context) {
|
||||
struct StreamMessage* msg = NULL;
|
||||
// remove protocol
|
||||
uint8_t *incoming_pos = (uint8_t*) incoming;
|
||||
size_t pos_size = incoming_size;
|
||||
uint8_t *incoming_pos = (uint8_t*) incoming_msg->data;
|
||||
size_t pos_size = incoming_msg->data_size;
|
||||
int second_read = 0;
|
||||
for(int i = 0; i < incoming_size; i++) {
|
||||
if (incoming[i] == '\n') {
|
||||
if (incoming_size > i + 1) {
|
||||
incoming_pos = (uint8_t *)&incoming[i+1];
|
||||
pos_size = incoming_size - i;
|
||||
for(int i = 0; i < incoming_msg->data_size; i++) {
|
||||
if (incoming_msg->data[i] == '\n') {
|
||||
if (incoming_msg->data_size > i + 1) {
|
||||
incoming_pos = (uint8_t *)&incoming_msg->data[i+1];
|
||||
pos_size = incoming_msg->data_size - i;
|
||||
break;
|
||||
} else {
|
||||
// read next segment from network
|
||||
|
|
|
@ -252,7 +252,7 @@ int ipfs_namesys_publisher_publish(struct IpfsNode* local_node, char* path) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
libp2p_routing_dht_send_message_nearest_x(&local_node->identity->private_key, local_node->peerstore, local_node->repo->config->datastore, msg, 10);
|
||||
libp2p_routing_dht_send_message_nearest_x(local_node->dialer, local_node->peerstore, local_node->repo->config->datastore, msg, 10);
|
||||
|
||||
libp2p_message_free(msg);
|
||||
ipfs_cid_free(local_peer);
|
||||
|
|
|
@ -250,7 +250,7 @@ int ipfs_routing_offline_bootstrap (ipfs_routing* routing)
|
|||
return -1; // this should never happen
|
||||
}
|
||||
if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier)
|
||||
if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) {
|
||||
if (!libp2p_peer_connect(routing->local_node->dialer, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) {
|
||||
libp2p_logger_debug("online", "Attempted to bootstrap and connect to %s but failed. Continuing.\n", libp2p_peer_id_to_string(peer));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -299,7 +299,7 @@ int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* pee
|
|||
int retVal = 0;
|
||||
|
||||
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
|
||||
if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5))
|
||||
if (!libp2p_peer_connect(routing->local_node->dialer, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5))
|
||||
goto exit;
|
||||
}
|
||||
if (peer->connection_type == CONNECTION_TYPE_CONNECTED) {
|
||||
|
@ -430,7 +430,7 @@ int ipfs_routing_online_get_value (ipfs_routing* routing, const unsigned char *k
|
|||
if (!libp2p_peer_is_connected(current_peer)) {
|
||||
// attempt to connect. If unsuccessful, continue in the loop.
|
||||
libp2p_logger_debug("online", "Attempting to connect to peer to retrieve file\n");
|
||||
if (libp2p_peer_connect(&routing->local_node->identity->private_key, current_peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5)) {
|
||||
if (libp2p_peer_connect(routing->local_node->dialer, current_peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5)) {
|
||||
libp2p_logger_debug("online", "Peer connected\n");
|
||||
if (ipfs_routing_online_get_peer_value(routing, current_peer, key, key_size, buffer, buffer_size)) {
|
||||
libp2p_logger_debug("online", "Retrieved a value\n");
|
||||
|
@ -506,7 +506,7 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) {
|
|||
return -1; // this should never happen
|
||||
}
|
||||
if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier)
|
||||
if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) {
|
||||
if (!libp2p_peer_connect(routing->local_node->dialer, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) {
|
||||
libp2p_logger_debug("online", "Attempted to bootstrap and connect to %s but failed. Continuing.\n", libp2p_peer_id_to_string(peer));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -365,7 +365,7 @@ int test_routing_find_providers() {
|
|||
for(int i = 0; i < result->total; i++) {
|
||||
remote_peer = (struct Libp2pPeer*)libp2p_utils_vector_get(result, i);
|
||||
if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED
|
||||
|| libp2p_peer_connect(&local_node3->identity->private_key, remote_peer, local_node3->peerstore, local_node3->repo->config->datastore, 5)) {
|
||||
|| libp2p_peer_connect(local_node3->dialer, remote_peer, local_node3->peerstore, local_node3->repo->config->datastore, 5)) {
|
||||
break;
|
||||
}
|
||||
remote_peer = NULL;
|
||||
|
|
Loading…
Reference in a new issue