Beginnings of stream locking

yamux
jmjatlanta 2017-10-12 12:37:40 -05:00
parent 56e301df8d
commit 2391f23b4e
10 changed files with 196 additions and 7 deletions

View File

@ -90,6 +90,45 @@ int libp2p_session_context_free(struct SessionContext* context) {
return 1;
}
/***
* Attempt to lock a stream for personal use. Does not block.
* @param stream the stream to lock
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_stream_try_lock(struct Stream* stream) {
if (stream == NULL)
return 0;
if (pthread_mutex_trylock(&stream->socket_mutex) == 0)
return 1;
return 0;
}
/***
* Attempt to lock a stream for personal use. Blocks until the lock is acquired
* @param stream the stream to lock
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_stream_lock(struct Stream* stream) {
if (stream == NULL)
return 0;
if (pthread_mutex_lock(&stream->socket_mutex) == 0)
return 1;
return 0;
}
/***
* Attempt to unlock the mutex for this stream
* @param stream the stream to unlock
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_stream_unlock(struct Stream* stream) {
if (stream == NULL)
return 0;
if (pthread_mutex_unlock(&stream->socket_mutex) == 0)
return 1;
return 0;
}
/****
* Make a copy of a SessionContext
* @param original the original

View File

@ -1,5 +1,5 @@
#pragma once
#include <pthread.h>
#include "libp2p/crypto/key.h"
#include "libp2p/db/datastore.h"
#include "libp2p/db/filestore.h"
@ -18,6 +18,7 @@ struct SessionContext {
char* host;
int port;
enum IPTrafficType traffic_type;
pthread_mutex_t stream_mutex;
// once the connection is established
/**
* Note: default_stream should be used in most cases. Often, insecure_stream and secure_stream will be

View File

@ -33,6 +33,13 @@ struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void
*/
int libp2p_net_multistream_send_protocol(struct SessionContext *context);
/***
* Check to see if the reply is the multistream protocol header we expect
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_net_multistream_receive_protocol(struct SessionContext* context);
/**
* Read from a multistream socket

View File

@ -1,5 +1,7 @@
#pragma once
#include <pthread.h>
/**
* An interface in front of various streams
*/
@ -8,6 +10,7 @@ struct Stream {
* A generic socket descriptor
*/
void* socket_descriptor;
pthread_mutex_t socket_mutex;
struct MultiAddress *address;
/**
@ -46,3 +49,24 @@ struct Stream {
*/
int (*peek)(void* stream_context);
};
/***
* Attempt to lock a stream for personal use. Does not block.
* @param stream the stream to lock
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_stream_try_lock(struct Stream* stream);
/***
* Attempt to lock a stream for personal use. Blocks until the lock is acquired
* @param stream the stream to lock
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_stream_lock(struct Stream* stream);
/***
* Attempt to unlock the mutex for this stream
* @param stream the stream to unlock
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_stream_unlock(struct Stream* stream);

View File

@ -13,3 +13,11 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler();
* @returns true(1) on success, false(0) otherwise
*/
int yamux_send_protocol(struct SessionContext* context);
/***
* Check to see if the reply is the yamux protocol header we expect
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int yamux_receive_protocol(struct SessionContext* context);

View File

@ -38,9 +38,40 @@ int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t inco
return 0;
}
/***
* Send the multistream header out the default stream
* @param context the context
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_net_multistream_send_protocol(struct SessionContext *context) {
char *protocol = "/multistream/1.0.0\n";
return context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol));
if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) {
libp2p_logger_error("multistream", "send_protocol: Unable to send multistream protocol header.\n");
return 0;
}
return 1;
}
/***
* Check to see if the reply is the multistream protocol header we expect
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_net_multistream_receive_protocol(struct SessionContext* context) {
char* protocol = "/multistream/1.0.0\n";
uint8_t* results = NULL;
size_t results_size = 0;
if (!context->default_stream->read(context, &results, &results_size, 30)) {
libp2p_logger_error("multistream", "receive_protocol: Unable to read results.\n");
return 0;
}
// the first byte is the size, so skip it
char* ptr = strstr((char*)&results[1], protocol);
if (ptr == NULL || ptr - (char*)results > 1) {
return 0;
}
return 1;
}
int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incoming_size, struct SessionContext* context, void* protocol_context) {
@ -456,6 +487,7 @@ void libp2p_net_multistream_stream_free(struct Stream* stream) {
struct Stream* libp2p_net_multistream_stream_new(int socket_fd, const char* ip, int port) {
struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream));
if (out != NULL) {
pthread_mutex_init(&out->socket_mutex, NULL);
out->socket_descriptor = malloc(sizeof(int));
*((int*)out->socket_descriptor) = socket_fd;
int res = *((int*)out->socket_descriptor);

View File

@ -122,26 +122,48 @@ int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPee
peer->sessionContext->datastore = datastore;
peer->sessionContext->insecure_stream = libp2p_net_multistream_connect_with_timeout(ip, port, timeout);
if (peer->sessionContext->insecure_stream == NULL) {
libp2p_logger_debug("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer));
libp2p_logger_error("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer));
free(ip);
return 0;
}
if (peer->sessionContext->insecure_stream != NULL) {
peer->sessionContext->default_stream = peer->sessionContext->insecure_stream;
peer->connection_type = CONNECTION_TYPE_CONNECTED;
peer->sessionContext->default_stream = peer->sessionContext->insecure_stream;
peer->connection_type = CONNECTION_TYPE_CONNECTED;
// lock the stream
if (!libp2p_stream_lock(peer->sessionContext->default_stream)) {
libp2p_logger_error("peer", "Unable to lock the newly created peer stream for peer %s.\n", libp2p_peer_id_to_string(peer));
free(ip);
return 0;
}
// switch to secio
if (libp2p_secio_initiate_handshake(peer->sessionContext, privateKey, peerstore) <= 0) {
libp2p_logger_error("peer", "Attempted secio handshake, but failed for peer %s.\n", libp2p_peer_id_to_string(peer));
free(ip);
libp2p_stream_unlock(peer->sessionContext->default_stream);
return 0;
}
//switch to multistream
if (!libp2p_net_multistream_negotiate(peer->sessionContext)) {
libp2p_logger_error("peer", "Attempted multistream handshake, but failed for peer %s.\n", libp2p_peer_id_to_string(peer));
free(ip);
libp2p_stream_unlock(peer->sessionContext->default_stream);
return 0;
}
// switch to yamux
if (!yamux_send_protocol(peer->sessionContext)) {
libp2p_logger_error("peer", "Attempted yamux handshake, but could not send protocol header for peer %s.\n", libp2p_peer_id_to_string(peer));
free(ip);
libp2p_stream_unlock(peer->sessionContext->default_stream);
return 0;
}
libp2p_stream_unlock(peer->sessionContext->default_stream);
/*
// expect yamux back
if (!yamux_receive_protocol(peer->sessionContext)) {
libp2p_logger_error("peer", "Attempted yamux handshake, but received unexpected response.\n");
free(ip);
return 0;
}
*/
free(ip);
} // is IP
now = time(NULL);

View File

@ -995,6 +995,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
struct MultiAddress* ma = multiaddress_new_from_string(url);
if (ma == NULL) {
libp2p_logger_error("secio", "Unable to generate MultiAddress from [%s].\n", url);
goto exit;
} else {
libp2p_logger_debug("secio", "Adding %s to peer %s.\n", url, libp2p_peer_id_to_string(remote_peer));
struct Libp2pLinkedList* ma_ll = libp2p_utils_linked_list_new();

View File

@ -94,6 +94,7 @@ int test_secio_handshake() {
}
// attempt to write the protocol, and see what comes back
/*
char* protocol = "/secio/1.0.0\n";
int protocol_size = strlen(protocol);
secure_session->insecure_stream->write(secure_session, (unsigned char*)protocol, protocol_size);
@ -117,6 +118,22 @@ int test_secio_handshake() {
}
goto exit;
}
*/
// a new way to do the above
if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) {
libp2p_logger_error("test_secio", "Unable to do handshake\n");
if (secure_session->shared_key != NULL) {
fprintf(stdout, "Shared key: ");
for(int i = 0; i < secure_session->shared_key_size; i++)
fprintf(stdout, "%d ", secure_session->shared_key[i]);
fprintf(stdout, "\nLocal stretched key: ");
print_stretched_key(secure_session->local_stretched_key);
fprintf(stdout, "\nRemote stretched key: ");
print_stretched_key(secure_session->remote_stretched_key);
fprintf(stdout, "\n");
}
goto exit;
}
/*
fprintf(stdout, "Shared key: ");
@ -415,6 +432,7 @@ int test_secio_handshake_go() {
}
// attempt to write the protocol, and see what comes back
/*
char* protocol = "/secio/1.0.0\n";
int protocol_size = strlen(protocol);
secure_session->insecure_stream->write(secure_session, (unsigned char*)protocol, protocol_size);
@ -423,7 +441,6 @@ int test_secio_handshake_go() {
size_t bytes_read = 0;
int timeout = 30;
secure_session->insecure_stream->read(secure_session, &buffer, &bytes_read, timeout);
if (!libp2p_secio_handshake(secure_session, rsa_private_key, peerstore)) {
fprintf(stderr, "test_secio_handshake: Unable to do handshake\n");
if (secure_session->shared_key != NULL) {
@ -438,6 +455,22 @@ int test_secio_handshake_go() {
}
goto exit;
}
*/
// a new way to do the above
if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) {
libp2p_logger_error("test_secio", "Unable to do handshake.\n");
if (secure_session->shared_key != NULL) {
fprintf(stdout, "Shared key: ");
for(int i = 0; i < secure_session->shared_key_size; i++)
fprintf(stdout, "%d ", secure_session->shared_key[i]);
fprintf(stdout, "\nLocal stretched key: ");
print_stretched_key(secure_session->local_stretched_key);
fprintf(stdout, "\nRemote stretched key: ");
print_stretched_key(secure_session->remote_stretched_key);
fprintf(stdout, "\n");
}
goto exit;
}
/*
fprintf(stdout, "Shared key: ");

View File

@ -63,6 +63,28 @@ int yamux_send_protocol(struct SessionContext* context) {
return 1;
}
/***
* Check to see if the reply is the yamux protocol header we expect
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int yamux_receive_protocol(struct SessionContext* context) {
char* protocol = "/yamux/1.0.0\n";
uint8_t* results = NULL;
size_t results_size = 0;
if (!context->default_stream->read(context, &results, &results_size, 30)) {
libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n");
return 0;
}
// the first byte is the size, so skip it
char* ptr = strstr((char*)&results[1], protocol);
if (ptr == NULL || ptr - (char*)results > 1) {
return 0;
}
return 1;
}
/***
* Handles the message
* @param incoming the incoming data buffer