multistream protocol now has a protocol interface

This commit is contained in:
John Jones 2017-09-04 11:02:48 -05:00
parent 407f85bc89
commit 7dbb6fca29
9 changed files with 38 additions and 79 deletions

View file

@ -20,7 +20,7 @@ struct Libp2pVector* ipfs_node_online_build_protocol_handlers(struct IpfsNode* n
// bitswap // bitswap
libp2p_utils_vector_add(retVal, ipfs_bitswap_build_protocol_handler(node)); libp2p_utils_vector_add(retVal, ipfs_bitswap_build_protocol_handler(node));
// multistream // multistream
libp2p_utils_vector_add(retVal, libp2p_net_multistream_build_protocol_handler(node)); libp2p_utils_vector_add(retVal, libp2p_net_multistream_build_protocol_handler(retVal));
} }
return retVal; return retVal;
} }

View file

@ -55,77 +55,30 @@ void ipfs_null_connection (void *ptr) {
libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count)); libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count));
if (libp2p_net_multistream_negotiate(session)) { // try to read from the network
// Someone has connected and successfully negotiated multistream. Now talk to them... uint8_t *results = 0;
int unsuccessful_max = 30;
int unsuccessful_counter = 0;
for(;;) {
// Wait for them to ask something...
unsigned char* results = NULL;
size_t bytes_read = 0; size_t bytes_read = 0;
if (null_shutting_down) { // handle the call
libp2p_logger_debug("null", "%s null shutting down before read.\n", connection_param->local_node->identity->peer->id); for(;;) {
// this service is shutting down. Ignore the request and exit the loop // immediately attempt to negotiate multistream
if (!libp2p_net_multistream_send_protocol(session))
break;
if (!session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT)) {
// problem reading;
break; break;
} }
// see if we have something to read
retVal = session->default_stream->peek(session);
if (retVal < 0) { // error
libp2p_logger_debug("null", "Peer returned %d. Exiting loop\n", retVal);
retVal = -1;
break;
}
if (retVal == 0) { // nothing to read
sleep(1);
unsuccessful_counter++;
if (unsuccessful_counter >= unsuccessful_max) {
libp2p_logger_debug("null", "We've tried %d times in the daemon loop. Exiting.\n", unsuccessful_counter);
retVal = -1;
break;
}
continue;
}
if (retVal > 0 && !session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT) ) {
// it said it was ready, but something happened
libp2p_logger_debug("null", "Peek said there was something there, but there was not. Exiting.\n");
retVal = -1;
break;
}
if (null_shutting_down) {
libp2p_logger_debug("null", "%s null shutting down after read.\n", connection_param->local_node->identity->peer->id);
// this service is shutting down. Ignore the request and exit the loop
retVal = -1;
break;
}
// We actually got something. Process the request...
unsuccessful_counter = 0;
libp2p_logger_debug("null", "Read %lu bytes from a stream tranaction\n", bytes_read);
retVal = libp2p_protocol_marshal(results, bytes_read, session, connection_param->local_node->protocol_handlers); retVal = libp2p_protocol_marshal(results, bytes_read, session, connection_param->local_node->protocol_handlers);
if (results != NULL)
free(results); free(results);
if (retVal == -1) { // exit the loop on error (or if they ask us to no longer loop by returning 0)
libp2p_logger_debug("null", "protocol_marshal returned error.\n"); if (retVal <= 0)
break; break;
} else if (retVal == 0) {
// clean up, but let someone else handle this from now on
libp2p_logger_debug("null", "protocol_marshal returned 0. The daemon will no longer handle this.\n");
break;
} else {
libp2p_logger_debug("null", "protocol_marshal returned 1. Looping again.\n");
}
}
} else {
libp2p_logger_log("null", LOGLEVEL_DEBUG, "Multistream negotiation failed\n");
} }
(*(connection_param->count))--; // update counter. (*(connection_param->count))--; // update counter.
if (connection_param->ip != NULL) if (connection_param->ip != NULL)
free(connection_param->ip); free(connection_param->ip);
free (connection_param); free (connection_param);
if (retVal != 0) {
libp2p_logger_debug("null", "%s Freeing session context.\n", connection_param->local_node->identity->peer->id);
//libp2p_session_context_free(session);
}
return; return;
} }
@ -148,7 +101,7 @@ int ipfs_null_do_maintenance(struct IpfsNode* local_node, struct Libp2pPeer* pee
if (replication_peer != NULL && local_node->repo->config->replication->announce && announce_secs < 0) { if (replication_peer != NULL && local_node->repo->config->replication->announce && announce_secs < 0) {
// try to connect if we aren't already // try to connect if we aren't already
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, 2)) { if (!libp2p_peer_connect(&local_node->identity->private_key, peer, local_node->peerstore, local_node->repo->config->datastore, 2)) {
return 0; return 0;
} }
} }

View file

@ -19,7 +19,7 @@ int ipfs_bitswap_network_send_message(const struct BitswapContext* context, stru
libp2p_logger_debug("bitswap_network", "Sending bitswap message to %s.\n", libp2p_peer_id_to_string(peer)); libp2p_logger_debug("bitswap_network", "Sending bitswap message to %s.\n", libp2p_peer_id_to_string(peer));
// get a connection to the peer // get a connection to the peer
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
libp2p_peer_connect(&context->ipfsNode->identity->private_key, peer, context->ipfsNode->peerstore, 10); libp2p_peer_connect(&context->ipfsNode->identity->private_key, peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 10);
if(peer->connection_type != CONNECTION_TYPE_CONNECTED) if(peer->connection_type != CONNECTION_TYPE_CONNECTED)
return 0; return 0;
} }

View file

@ -384,7 +384,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context
if (need_to_connect) { if (need_to_connect) {
if (!connected) { if (!connected) {
// connect // connect
connected = libp2p_peer_connect(&context->ipfsNode->identity->private_key, request->peer, context->ipfsNode->peerstore, 0); connected = libp2p_peer_connect(&context->ipfsNode->identity->private_key, request->peer, context->ipfsNode->peerstore, context->ipfsNode->repo->config->datastore, 0);
} }
if (connected) { if (connected) {
// build a message // build a message

View file

@ -103,7 +103,7 @@ int ipfs_journal_free_records(struct Libp2pVector* records) {
int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, struct JournalMessage* message) { int ipfs_journal_send_message(struct IpfsNode* node, struct Libp2pPeer* peer, struct JournalMessage* message) {
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) if (peer->connection_type != CONNECTION_TYPE_CONNECTED)
libp2p_peer_connect(&node->identity->private_key, peer, node->peerstore, 10); libp2p_peer_connect(&node->identity->private_key, peer, node->peerstore, node->repo->config->datastore, 10);
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) if (peer->connection_type != CONNECTION_TYPE_CONNECTED)
return 0; return 0;
// protobuf the message // protobuf the message

View file

@ -317,7 +317,7 @@ int ipfs_routing_online_ping(struct IpfsRouting* routing, struct Libp2pPeer* pee
int retVal = 0; int retVal = 0;
if (peer->connection_type != CONNECTION_TYPE_CONNECTED) { if (peer->connection_type != CONNECTION_TYPE_CONNECTED) {
if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, 5)) if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5))
goto exit; goto exit;
} }
if (peer->connection_type == CONNECTION_TYPE_CONNECTED) { if (peer->connection_type == CONNECTION_TYPE_CONNECTED) {
@ -438,7 +438,7 @@ int ipfs_routing_online_get_value (ipfs_routing* routing, const unsigned char *k
if (!libp2p_peer_is_connected(current_peer)) { if (!libp2p_peer_is_connected(current_peer)) {
// attempt to connect. If unsuccessful, continue in the loop. // attempt to connect. If unsuccessful, continue in the loop.
libp2p_logger_debug("online", "Attempting to connect to peer to retrieve file\n"); libp2p_logger_debug("online", "Attempting to connect to peer to retrieve file\n");
if (libp2p_peer_connect(&routing->local_node->identity->private_key, current_peer, routing->local_node->peerstore, 5)) { if (libp2p_peer_connect(&routing->local_node->identity->private_key, current_peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 5)) {
libp2p_logger_debug("online", "Peer connected\n"); libp2p_logger_debug("online", "Peer connected\n");
if (ipfs_routing_online_get_peer_value(routing, current_peer, key, key_size, buffer, buffer_size)) { if (ipfs_routing_online_get_peer_value(routing, current_peer, key, key_size, buffer, buffer_size)) {
libp2p_logger_debug("online", "Retrieved a value\n"); libp2p_logger_debug("online", "Retrieved a value\n");
@ -512,7 +512,7 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) {
return -1; // this should never happen return -1; // this should never happen
} }
if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier) if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier)
if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, 2)) { if (!libp2p_peer_connect(&routing->local_node->identity->private_key, peer, routing->local_node->peerstore, routing->local_node->repo->config->datastore, 2)) {
libp2p_logger_debug("online", "Attempted to bootstrap and connect to %s but failed. Continuing.\n", libp2p_peer_id_to_string(peer)); libp2p_logger_debug("online", "Attempted to bootstrap and connect to %s but failed. Continuing.\n", libp2p_peer_id_to_string(peer));
} }
} }

View file

@ -121,7 +121,7 @@ int test_journal_server_1() {
ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0); ipfs_import_file(NULL, filename, &node, local_node, &bytes_written, 0);
ipfs_node_free(local_node); ipfs_node_free(local_node);
libp2p_logger_debug("test_journal", "*** Firing up daemon for server 2 ***\n"); libp2p_logger_debug("test_journal", "*** Firing up daemon for server 1 ***\n");
pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path); pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path);
thread_started = 1; thread_started = 1;
@ -182,7 +182,7 @@ int test_journal_server_2() {
pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path); pthread_create(&daemon_thread, NULL, test_daemon_start, (void*)ipfs_path);
thread_started = 1; thread_started = 1;
sleep(30); sleep(120);
retVal = 1; retVal = 1;
exit: exit:

View file

@ -233,7 +233,7 @@ int test_routing_find_providers() {
struct Libp2pPeer *remote_peer = NULL; struct Libp2pPeer *remote_peer = NULL;
for(int i = 0; i < result->total; i++) { for(int i = 0; i < result->total; i++) {
remote_peer = (struct Libp2pPeer*)libp2p_utils_vector_get(result, i); remote_peer = (struct Libp2pPeer*)libp2p_utils_vector_get(result, i);
if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED || libp2p_peer_connect(&local_node.identity->private_key, remote_peer, local_node.peerstore, 5)) { if (remote_peer->connection_type == CONNECTION_TYPE_CONNECTED || libp2p_peer_connect(&local_node.identity->private_key, remote_peer, local_node.peerstore, local_node.repo->config->datastore, 5)) {
break; break;
} }
remote_peer = NULL; remote_peer = NULL;

View file

@ -205,19 +205,25 @@ int drop_build_open_repo(const char* path, struct FSRepo** fs_repo, const char*
if (config_filename_to_copy != NULL) { if (config_filename_to_copy != NULL) {
// attach config filename to path // attach config filename to path
char config[strlen(path) + 7]; char *config = (char*) malloc(strlen(path) + 7);
strcpy(config, path); strcpy(config, path);
// erase slash if there is one // erase slash if there is one
if (config[strlen(path)-1] == '/') if (config[strlen(path)-1] == '/')
config[strlen(path)-1] = 0; config[strlen(path)-1] = 0;
strcat(config, "/config"); strcat(config, "/config");
// delete the old // delete the old
if (unlink(config) != 0) if (unlink(config) != 0) {
free(config);
return 0; return 0;
}
// copy pre-built config file into directory // copy pre-built config file into directory
if (cp(config, config_filename_to_copy) < 0) if (cp(config, config_filename_to_copy) < 0) {
fprintf(stderr, "Unable to copy %s to %s. Error number %d.\n", config_filename_to_copy, config, errno);
free(config);
return 0; return 0;
} }
free(config);
}
if (!ipfs_repo_fsrepo_new(path, NULL, fs_repo)) if (!ipfs_repo_fsrepo_new(path, NULL, fs_repo))
return 0; return 0;