implementing handler interface in front of multistream

This commit is contained in:
John Jones 2017-09-04 11:01:17 -05:00
parent 53f754af43
commit 9fd44b7878
6 changed files with 97 additions and 17 deletions

View file

@ -15,12 +15,24 @@
*/ */
struct MultistreamContext {
struct Libp2pVector* handlers;
};
/*** /***
* The handler to handle calls to the protocol * The handler to handle calls to the protocol
* @param stream_context the context * @param stream_context the context
* @returns the protocol handler * @returns the protocol handler
*/ */
struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* stream_context); struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector);
/**
* Sends the protocol header to the remote
* @param context the context
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_net_multistream_send_protocol(struct SessionContext *context);
/** /**
* Read from a multistream socket * Read from a multistream socket

View file

@ -56,7 +56,7 @@ void libp2p_peer_free(struct Libp2pPeer* in);
* @param timeout number of seconds before giving up * @param timeout number of seconds before giving up
* @returns true(1) on success, false(0) if we could not connect * @returns true(1) on success, false(0) if we could not connect
*/ */
int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, int timeout); int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout);
/*** /***
* Clean up a bad connection * Clean up a bad connection

View file

@ -28,7 +28,9 @@ int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t inco
int protocol_size = strlen(protocol); int protocol_size = strlen(protocol);
// is there a varint in front? // is there a varint in front?
size_t num_bytes = 0; size_t num_bytes = 0;
if (incoming[0] != '/' && incoming[1] != 'm') {
varint_decode(incoming, incoming_size, &num_bytes); varint_decode(incoming, incoming_size, &num_bytes);
}
if (incoming_size >= protocol_size - num_bytes) { if (incoming_size >= protocol_size - num_bytes) {
if (strncmp(protocol, (char*) &incoming[num_bytes], protocol_size) == 0) if (strncmp(protocol, (char*) &incoming[num_bytes], protocol_size) == 0)
return 1; return 1;
@ -36,8 +38,49 @@ int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t inco
return 0; return 0;
} }
int libp2p_net_multistream_send_protocol(struct SessionContext *context) {
char *protocol = "/multistream/1.0.0\n";
return context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol));
}
int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incoming_size, struct SessionContext* context, void* protocol_context) { int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incoming_size, struct SessionContext* context, void* protocol_context) {
return 0; // try sending the protocol back
//if (!libp2p_net_multistream_send_protocol(context))
// return -1;
struct MultistreamContext* multistream_context = (struct MultistreamContext*) protocol_context;
// try to read from the network
uint8_t *results = 0;
size_t bytes_read = 0;
int retVal = 0;
int max_retries = 10;
int numRetries = 0;
// handle the call
for(;;) {
// try to read for 5 seconds
if (context->default_stream->read(context, &results, &bytes_read, 5)) {
// we read something from the network. Process it.
// NOTE: If it is a multistream protocol that we are receiving, ignore it.
if (libp2p_net_multistream_can_handle(results, bytes_read))
continue;
numRetries = 0;
retVal = libp2p_protocol_marshal(results, bytes_read, context, multistream_context->handlers);
if (results != NULL)
free(results);
// exit the loop on error (or if they ask us to no longer loop by returning 0)
if (retVal <= 0)
break;
} else {
// we were unable to read from the network.
// if it timed out, we should try again (if we're not out of retries)
if (numRetries >= max_retries)
break;
numRetries++;
}
}
return retVal;
} }
int libp2p_net_multistream_shutdown(void* protocol_context) { int libp2p_net_multistream_shutdown(void* protocol_context) {
@ -49,10 +92,18 @@ int libp2p_net_multistream_shutdown(void* protocol_context) {
* @param stream_context the context * @param stream_context the context
* @returns the protocol handler * @returns the protocol handler
*/ */
struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* stream_context) { struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector) {
// build the context
struct MultistreamContext* context = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext));
if (context == NULL)
return NULL;
context->handlers = (struct Libp2pVector*) handler_vector;
// build the handler
struct Libp2pProtocolHandler *handler = libp2p_protocol_handler_new(); struct Libp2pProtocolHandler *handler = libp2p_protocol_handler_new();
if (handler != NULL) { if (handler != NULL) {
handler->context = stream_context; handler->context = context;
handler->CanHandle = libp2p_net_multistream_can_handle; handler->CanHandle = libp2p_net_multistream_can_handle;
handler->HandleMessage = libp2p_net_multistream_handle_message; handler->HandleMessage = libp2p_net_multistream_handle_message;
handler->Shutdown = libp2p_net_multistream_shutdown; handler->Shutdown = libp2p_net_multistream_shutdown;
@ -275,7 +326,6 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname,
int retVal = -1, return_result = -1, socket = -1; int retVal = -1, return_result = -1, socket = -1;
unsigned char* results = NULL; unsigned char* results = NULL;
size_t results_size; size_t results_size;
size_t num_bytes = 0;
struct Stream* stream = NULL; struct Stream* stream = NULL;
uint32_t ip = hostname_to_ip(hostname); uint32_t ip = hostname_to_ip(hostname);
@ -286,8 +336,6 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname,
goto exit; goto exit;
// send the multistream handshake // send the multistream handshake
char* protocol_buffer = "/multistream/1.0.0\n";
stream = libp2p_net_multistream_stream_new(socket, hostname, port); stream = libp2p_net_multistream_stream_new(socket, hostname, port);
if (stream == NULL) if (stream == NULL)
goto exit; goto exit;
@ -299,15 +347,15 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname,
// try to receive the protocol id // try to receive the protocol id
return_result = libp2p_net_multistream_read(&session, &results, &results_size, timeout_secs); return_result = libp2p_net_multistream_read(&session, &results, &results_size, timeout_secs);
if (return_result == 0 || results_size < 1) if (return_result == 0 || results_size < 1 || !libp2p_net_multistream_can_handle(results, results_size)) {
libp2p_logger_error("multistream", "Attempted to receive the multistream protocol header, but received %s.\n", results);
goto exit; goto exit;
}
num_bytes = libp2p_net_multistream_write(&session, (unsigned char*)protocol_buffer, strlen(protocol_buffer)); if (!libp2p_net_multistream_send_protocol(&session)) {
if (num_bytes <= 0) libp2p_logger_error("multistream", "Attempted to send the multistream protocol header, but could not.\n");
goto exit;
if (strstr((char*)results, "multistream") == NULL)
goto exit; goto exit;
}
// we are now in the loop, so we can switch to another protocol (i.e. /secio/1.0.0) // we are now in the loop, so we can switch to another protocol (i.e. /secio/1.0.0)

View file

@ -45,6 +45,7 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new() {
*/ */
int libp2p_protocol_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct Libp2pVector* handlers) { int libp2p_protocol_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct Libp2pVector* handlers) {
const struct Libp2pProtocolHandler* handler = protocol_compare(incoming, incoming_size, handlers); const struct Libp2pProtocolHandler* handler = protocol_compare(incoming, incoming_size, handlers);
char str[incoming_size + 1]; char str[incoming_size + 1];
memcpy(str, incoming, incoming_size); memcpy(str, incoming, incoming_size);
str[incoming_size] = 0; str[incoming_size] = 0;
@ -54,12 +55,14 @@ int libp2p_protocol_marshal(const unsigned char* incoming, size_t incoming_size,
break; break;
} }
} }
if (handler == NULL) { if (handler == NULL) {
libp2p_logger_error("protocol", "Unable to find handler for %s.\n", str); libp2p_logger_error("protocol", "Unable to find handler for %s.\n", str);
return -1; return -1;
} else { } else {
libp2p_logger_debug("protocol", "Found handler for %s.\n", str); libp2p_logger_debug("protocol", "Found handler for %s.\n", str);
} }
//TODO: strip off the protocol? //TODO: strip off the protocol?
return handler->HandleMessage(incoming, incoming_size, session, handler->context); return handler->HandleMessage(incoming, incoming_size, session, handler->context);
} }

View file

@ -96,7 +96,7 @@ int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer) {
* @param peerstore if connection is successfull, will add peer to peerstore * @param peerstore if connection is successfull, will add peer to peerstore
* @returns true(1) on success, false(0) if we could not connect * @returns true(1) on success, false(0) if we could not connect
*/ */
int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, int timeout) { int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) {
libp2p_logger_debug("peer", "Attemping to connect to %s.\n", libp2p_peer_id_to_string(peer)); libp2p_logger_debug("peer", "Attemping to connect to %s.\n", libp2p_peer_id_to_string(peer));
time_t now, prev = time(NULL); time_t now, prev = time(NULL);
// find an appropriate address // find an appropriate address
@ -109,6 +109,7 @@ int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* pee
continue; continue;
int port = multiaddress_get_ip_port(ma); int port = multiaddress_get_ip_port(ma);
peer->sessionContext = libp2p_session_context_new(); peer->sessionContext = libp2p_session_context_new();
peer->sessionContext->datastore = datastore;
peer->sessionContext->insecure_stream = libp2p_net_multistream_connect_with_timeout(ip, port, timeout); peer->sessionContext->insecure_stream = libp2p_net_multistream_connect_with_timeout(ip, port, timeout);
if (peer->sessionContext->insecure_stream == NULL) { if (peer->sessionContext->insecure_stream == NULL) {
libp2p_logger_debug("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer)); libp2p_logger_debug("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer));

View file

@ -85,6 +85,7 @@ int libp2p_secio_initiate_handshake(struct SessionContext* session_context, stru
if (libp2p_secio_send_protocol(session_context) && libp2p_secio_receive_protocol(session_context)) { if (libp2p_secio_send_protocol(session_context) && libp2p_secio_receive_protocol(session_context)) {
return libp2p_secio_handshake(session_context, private_key, peer_store); return libp2p_secio_handshake(session_context, private_key, peer_store);
} }
libp2p_logger_error("secio", "Secio protocol exchange failed.\n");
return 0; return 0;
} }
@ -651,9 +652,16 @@ int libp2p_secio_receive_protocol(struct SessionContext* session) {
size_t buffer_size = 0; size_t buffer_size = 0;
int retVal = session->default_stream->read(session, &buffer, &buffer_size, numSecs); int retVal = session->default_stream->read(session, &buffer, &buffer_size, numSecs);
if (retVal == 0 || buffer != NULL) { if (retVal == 0 || buffer != NULL) {
if (strncmp(protocol, (char*)buffer, strlen(protocol)) == 0) if (strncmp(protocol, (char*)buffer, strlen(protocol)) == 0) {
free(buffer);
return 1; return 1;
} }
else {
libp2p_logger_error("secio", "Expected the secio protocol header, but received %s.\n", buffer);
}
}
if (buffer != NULL)
free(buffer);
return 0; return 0;
} }
@ -922,6 +930,8 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
if (bytes_written != propose_out_size) { if (bytes_written != propose_out_size) {
libp2p_logger_error("secio", "Sent propose_out, but did not write the correct number of bytes. Should be %d but was %d.\n", propose_out_size, bytes_written); libp2p_logger_error("secio", "Sent propose_out, but did not write the correct number of bytes. Should be %d but was %d.\n", propose_out_size, bytes_written);
} else {
libp2p_logger_debug("secio", "Sent propose out.\n");
} }
// try to get the Propse struct from the remote peer // try to get the Propse struct from the remote peer
@ -929,6 +939,8 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
if (bytes_written <= 0) { if (bytes_written <= 0) {
libp2p_logger_error("secio", "Unable to get the remote's Propose struct.\n"); libp2p_logger_error("secio", "Unable to get the remote's Propose struct.\n");
goto exit; goto exit;
} else {
libp2p_logger_debug("secio", "Received their propose struct.\n");
} }
if (!libp2p_secio_propose_protobuf_decode(propose_in_bytes, propose_in_size -1, &propose_in)) { if (!libp2p_secio_propose_protobuf_decode(propose_in_bytes, propose_in_size -1, &propose_in)) {
@ -1031,6 +1043,8 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
if (exchange_out_protobuf_size != bytes_written) { if (exchange_out_protobuf_size != bytes_written) {
libp2p_logger_error("secio", "Unable to write exchange_out\n"); libp2p_logger_error("secio", "Unable to write exchange_out\n");
goto exit; goto exit;
} else {
libp2p_logger_debug("secio", "Sent exchange_out.\n");
} }
free(exchange_out_protobuf); free(exchange_out_protobuf);
exchange_out_protobuf = NULL; exchange_out_protobuf = NULL;
@ -1043,6 +1057,8 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
libp2p_logger_error("secio", "unable to read exchange packet.\n"); libp2p_logger_error("secio", "unable to read exchange packet.\n");
libp2p_peer_handle_connection_error(remote_peer); libp2p_peer_handle_connection_error(remote_peer);
goto exit; goto exit;
} else {
libp2p_logger_debug("secio", "Read exchange packet.\n");
} }
libp2p_secio_exchange_protobuf_decode(results, results_size, &exchange_in); libp2p_secio_exchange_protobuf_decode(results, results_size, &exchange_in);
free(results); free(results);