An occasional ping to verify connectivity

yamux
jmjatlanta 2017-08-16 08:15:06 -05:00
parent 5de67539ef
commit 5b242a2d08
2 changed files with 40 additions and 22 deletions

View File

@ -148,6 +148,11 @@ void* ipfs_null_listen (void *ptr)
libp2p_logger_error("null", "Ipfs listening on %d\n", listen_param->port);
// when we have nothing to do, check on the connections to see if we're still connected
struct Libp2pLinkedList* current_peer_entry = NULL;
if (listen_param->local_node->peerstore->head_entry != NULL)
current_peer_entry = listen_param->local_node->peerstore->head_entry;
// the main loop, listening for new connections
for (;;) {
//libp2p_logger_debug("null", "%s Attempting socket read with fd %d.\n", listen_param->local_node->identity->peer->id, socketfd);
@ -179,6 +184,19 @@ void* ipfs_null_listen (void *ptr)
// Create pthread for ipfs_null_connection.
thpool_add_work(thpool, ipfs_null_connection, connection_param);
}
} else {
// timeout... do maintenance
struct PeerEntry* entry = current_peer_entry->item;
if (current_peer_entry != NULL && !entry->peer->is_local && entry->peer->connection_type == CONNECTION_TYPE_CONNECTED) {
libp2p_logger_debug("null", "Attempting to ping %s.\n", entry->peer->id);
if (!listen_param->local_node->routing->Ping(listen_param->local_node->routing, entry->peer)) {
entry->peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
}
}
if (current_peer_entry != NULL)
current_peer_entry = current_peer_entry->next;
if (current_peer_entry == NULL)
current_peer_entry = listen_param->local_node->peerstore->head_entry;
}
}

View File

@ -301,39 +301,39 @@ int ipfs_routing_online_provide(struct IpfsRouting* routing, const unsigned char
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* peer) {
struct Libp2pMessage* msg = NULL;
unsigned char *protobuf;
size_t protobuf_size;
struct Libp2pMessage *outMsg = NULL, *inMsg = NULL;
int retVal = 0;
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, 5))
return 0;
goto exit;
}
if (peer->connection_type == CONNECTION_TYPE_CONNECTED) {
// build the message
msg = libp2p_message_new();
msg->message_type = MESSAGE_TYPE_PING;
protobuf_size = libp2p_message_protobuf_encode_size(msg);
protobuf = (unsigned char*)malloc(protobuf_size);
libp2p_message_protobuf_encode(msg, protobuf, protobuf_size, &protobuf_size);
libp2p_message_free(msg);
msg = NULL;
outMsg = libp2p_message_new();
if (outMsg == NULL)
goto exit;
outMsg->message_type = MESSAGE_TYPE_PING;
// send the message
inMsg = ipfs_routing_online_send_receive_message(peer->sessionContext, outMsg);
// connect using a dialer
struct Dialer *dialer = libp2p_conn_dialer_new(routing->local_node->identity->peer->id, libp2p_crypto_rsa_to_private_key(routing->sk));
struct Connection *conn = libp2p_conn_dialer_get_connection(dialer, peer->addr_head->item);
// send the record
conn->write(conn, (char*)protobuf, protobuf_size);
free(protobuf);
conn->read(conn, (char**)&protobuf, &protobuf_size);
libp2p_message_protobuf_decode(protobuf, protobuf_size, &msg);
if (inMsg == NULL) {
goto exit;
}
if (msg == NULL || msg->message_type != MESSAGE_TYPE_PING)
return 0;
if (inMsg == NULL || inMsg->message_type != MESSAGE_TYPE_PING)
goto exit;
}
return 1;
retVal = 1;
exit:
if (inMsg != NULL)
libp2p_message_free(inMsg);
if (outMsg != NULL)
libp2p_message_free(outMsg);
return retVal;
}
/***