Implementing the new way of swarm connection

This commit is contained in:
jmjatlanta 2017-10-23 15:21:50 -05:00
parent 8480542b45
commit 1dcac6ecb5
20 changed files with 404 additions and 514 deletions

View file

@ -98,7 +98,7 @@ int libp2p_session_context_free(struct SessionContext* context) {
int libp2p_stream_try_lock(struct Stream* stream) { int libp2p_stream_try_lock(struct Stream* stream) {
if (stream == NULL) if (stream == NULL)
return 0; return 0;
if (pthread_mutex_trylock(&stream->socket_mutex) == 0) if (pthread_mutex_trylock(stream->socket_mutex) == 0)
return 1; return 1;
return 0; return 0;
} }
@ -111,7 +111,7 @@ int libp2p_stream_try_lock(struct Stream* stream) {
int libp2p_stream_lock(struct Stream* stream) { int libp2p_stream_lock(struct Stream* stream) {
if (stream == NULL) if (stream == NULL)
return 0; return 0;
if (pthread_mutex_lock(&stream->socket_mutex) == 0) if (pthread_mutex_lock(stream->socket_mutex) == 0)
return 1; return 1;
return 0; return 0;
} }
@ -124,7 +124,7 @@ int libp2p_stream_lock(struct Stream* stream) {
int libp2p_stream_unlock(struct Stream* stream) { int libp2p_stream_unlock(struct Stream* stream) {
if (stream == NULL) if (stream == NULL)
return 0; return 0;
if (pthread_mutex_unlock(&stream->socket_mutex) == 0) if (pthread_mutex_unlock(stream->socket_mutex) == 0)
return 1; return 1;
return 0; return 0;
} }

View file

@ -1,5 +1,7 @@
#pragma once
/*** /***
* A local dialer. Uses MultiAddr to figure out the best way to * A local dialer. Uses MultiAddr to figure out the best way to
* connect to a client, then returns an open Connection that can be * connect to a client, then returns an open Connection that can be
* closed, read from and written to. The normal procedure is as follows: * closed, read from and written to. The normal procedure is as follows:
* 1) Create a Dialer struct, with the required information about the local host * 1) Create a Dialer struct, with the required information about the local host

View file

@ -0,0 +1,12 @@
#pragma once
#include "libp2p/net/stream.h"
/***
* Create a new stream based on a network connection
* @param fd the handle to the network connection
* @param ip the IP address of the connection
* @param port the port of the connection
* @returns a Stream
*/
struct Stream* libp2p_net_connection_new(int fd, char* ip, int port);

View file

@ -17,6 +17,8 @@
struct MultistreamContext { struct MultistreamContext {
struct Libp2pVector* handlers; struct Libp2pVector* handlers;
struct SessionContext* session_context;
struct Stream* stream;
}; };
/*** /***

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "libp2p/conn/session.h" #include "libp2p/conn/session.h"
#include "libp2p/utils/vector.h" #include "libp2p/utils/vector.h"
#include "libp2p/net/stream.h"
/*** /***
* An "interface" for different IPFS protocols * An "interface" for different IPFS protocols
@ -16,7 +17,7 @@ struct Libp2pProtocolHandler {
* @param incoming_size the size of the incoming data buffer * @param incoming_size the size of the incoming data buffer
* @returns true(1) if it can handle this message, false(0) if not * @returns true(1) if it can handle this message, false(0) if not
*/ */
int (*CanHandle)(const uint8_t* incoming, size_t incoming_size); int (*CanHandle)(const struct StreamMessage* msg);
/*** /***
* Handles the message * Handles the message
* @param incoming the incoming data buffer * @param incoming the incoming data buffer
@ -25,7 +26,7 @@ struct Libp2pProtocolHandler {
* @param protocol_context the protocol-dependent context * @param protocol_context the protocol-dependent context
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success * @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/ */
int (*HandleMessage)(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context); int (*HandleMessage)(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context);
/** /**
* Shutting down. Clean up any memory allocations * Shutting down. Clean up any memory allocations
@ -41,4 +42,4 @@ struct Libp2pProtocolHandler {
*/ */
struct Libp2pProtocolHandler* libp2p_protocol_handler_new(); struct Libp2pProtocolHandler* libp2p_protocol_handler_new();
int libp2p_protocol_marshal(const uint8_t* incoming, size_t incoming_size, struct SessionContext* context, struct Libp2pVector* protocol_handlers); int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* context, struct Libp2pVector* protocol_handlers);

View file

@ -25,6 +25,15 @@ struct StreamMessage* libp2p_stream_message_new();
void libp2p_stream_message_free(struct StreamMessage* msg); void libp2p_stream_message_free(struct StreamMessage* msg);
/**
* This is a context struct for a basic IP connection
*/
struct ConnectionContext {
int socket_descriptor;
struct SessionContext* session_context;
};
/** /**
* An interface in front of various streams * An interface in front of various streams
*/ */
@ -32,9 +41,13 @@ struct Stream {
/** /**
* A generic socket descriptor * A generic socket descriptor
*/ */
void* socket_descriptor; struct MultiAddress* address; // helps identify who is on the other end
pthread_mutex_t socket_mutex; pthread_mutex_t* socket_mutex; // only 1 transmission at a time
struct MultiAddress *address; struct Stream* parent_stream; // what stream wraps this stream
/**
* A generic place to store implementation-specific context items
*/
void* stream_context;
/** /**
* Reads from the stream * Reads from the stream
@ -45,6 +58,16 @@ struct Stream {
*/ */
int (*read)(void* stream_context, struct StreamMessage** message, int timeout_secs); int (*read)(void* stream_context, struct StreamMessage** message, int timeout_secs);
/**
* Reads a certain amount of bytes directly from the stream
* @param stream_context the context
* @param buffer where to put the results
* @param buffer_size the number of bytes to read
* @param timeout_secs number of seconds before a timeout
* @returns number of bytes read, or -1 on error
*/
int (*read_raw)(void* stream_context, uint8_t* buffer, int buffer_size, int timeout_secs);
/** /**
* Writes to a stream * Writes to a stream
* @param stream the stream context (usually a SessionContext pointer) * @param stream the stream context (usually a SessionContext pointer)

View file

@ -4,6 +4,7 @@
#include "libp2p/net/stream.h" #include "libp2p/net/stream.h"
#include "libp2p/crypto/rsa.h" #include "libp2p/crypto/rsa.h"
#include "libp2p/conn/session.h" #include "libp2p/conn/session.h"
#include "libp2p/conn/dialer.h"
struct Peerstore; struct Peerstore;
@ -50,13 +51,13 @@ void libp2p_peer_free(struct Libp2pPeer* in);
* Attempt to connect to the peer, setting connection_type correctly * Attempt to connect to the peer, setting connection_type correctly
* NOTE: If successful, this will set peer->connection to the stream * NOTE: If successful, this will set peer->connection to the stream
* *
* @param privateKey the local private key to use * @param dialer the dialer
* @param peer the peer to connect to * @param peer the peer to connect to
* @param peerstore if connection is successfull, will add peer to peerstore * @param peerstore if connection is successfull, will add peer to peerstore
* @param timeout number of seconds before giving up * @param timeout number of seconds before giving up
* @returns true(1) on success, false(0) if we could not connect * @returns true(1) on success, false(0) if we could not connect
*/ */
int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout); int libp2p_peer_connect(const struct Dialer* dialer, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout);
/*** /***
* Clean up a bad connection * Clean up a bad connection

View file

@ -63,5 +63,5 @@ int libp2p_routing_dht_receive_message(struct SessionContext* sessionContext, st
* @param msg the message to send * @param msg the message to send
* @returns true(1) if we sent to at least 1, false(0) otherwise * @returns true(1) if we sent to at least 1, false(0) otherwise
*/ */
int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* private_key, struct Peerstore* peerstore, int libp2p_routing_dht_send_message_nearest_x(const struct Dialer* dialer, struct Peerstore* peerstore,
struct Datastore* datastore, struct KademliaMessage* msg, int numToSend); struct Datastore* datastore, struct KademliaMessage* msg, int numToSend);

View file

@ -10,38 +10,40 @@
* Handling of a secure connection * Handling of a secure connection
*/ */
struct SecioContext {
struct Stream* stream;
struct SessionContext* session_context;
struct RsaPrivateKey* private_key;
struct Peerstore* peer_store;
};
struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPrivateKey* private_key, struct Peerstore* peer_store); struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPrivateKey* private_key, struct Peerstore* peer_store);
/*** /***
* performs initial communication over an insecure channel to share * performs initial communication over an insecure channel to share
* keys, IDs, and initiate connection. This is a framed messaging system * keys, IDs, and initiate connection. This is a framed messaging system
* @param session the secure session to be filled * @param ctx the SecioContext
* @param private_key the local private key to use
* @param remote_requested the other side is who asked for the upgrade
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_handshake(struct SessionContext* session, const struct RsaPrivateKey* private_key, struct Peerstore* peerstore); int libp2p_secio_handshake(struct SecioContext* ctx);
/*** /***
* Initiates a secio handshake. Use this method when you want to initiate a secio * Initiates a secio handshake. Use this method when you want to initiate a secio
* session. This should not be used to respond to incoming secio requests * session. This should not be used to respond to incoming secio requests
* @param session_context the session context * @param ctx the SecioContext
* @param private_key the RSA private key to use
* @param peer_store the peer store
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_initiate_handshake(struct SessionContext* session_context, const struct RsaPrivateKey* private_key, struct Peerstore* peer_store); int libp2p_secio_initiate_handshake(struct SecioContext* ctx);
/*** /***
* Send the protocol string to the remote stream * Send the protocol string to the remote stream
* @param session the context * @param session the context
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_send_protocol(struct SessionContext* session); int libp2p_secio_send_protocol(struct SecioContext* session);
/*** /***
* Attempt to read the secio protocol as a reply from the remote * Attempt to read the secio protocol as a reply from the remote
* @param session the context * @param session the context
* @returns true(1) if we received what we think we should have, false(0) otherwise * @returns true(1) if we received what we think we should have, false(0) otherwise
*/ */
int libp2p_secio_receive_protocol(struct SessionContext* session); int libp2p_secio_receive_protocol(struct SecioContext* session);

View file

@ -7,7 +7,7 @@ endif
LFLAGS = LFLAGS =
DEPS = DEPS =
OBJS = sctp.o socket.o tcp.o udp.o multistream.o protocol.o OBJS = sctp.o socket.o tcp.o udp.o multistream.o protocol.o connectionstream.o
%.o: %.c $(DEPS) %.o: %.c $(DEPS)
$(CC) -c -o $@ $< $(CFLAGS) $(CC) -c -o $@ $< $(CFLAGS)

131
net/connectionstream.c Normal file
View file

@ -0,0 +1,131 @@
/**
* A raw network connection, that implements Stream
*/
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include "libp2p/net/stream.h"
#include "libp2p/net/p2pnet.h"
#include "multiaddr/multiaddr.h"
/**
* Close a network connection
* @param stream_context the ConnectionContext
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_net_connection_close(void* stream_context) {
if (stream_context == NULL)
return 0;
struct ConnectionContext* ctx = (struct ConnectionContext*)stream_context;
if (close(ctx->socket_descriptor) == 0)
// everything was okay
return 1;
// something went wrong
return 0;
}
/***
* Check and see if there is anything waiting on this network connection
* @param stream_context the ConnectionContext
* @returns number of bytes waiting, or -1 on error
*/
int libp2p_net_connection_peek(void* stream_context) {
if (stream_context == NULL)
return 0;
struct ConnectionContext* ctx = (struct ConnectionContext*)stream_context;
int socket_fd = ctx->socket_descriptor;
if (socket_fd < 0)
return -1;
int bytes = 0;
if (ioctl(socket_fd, FIONREAD, &bytes) < 0) {
// Ooff, we're having problems. Don't use this socket again.
return -1;
}
return bytes;
}
/**
* Read from the network
* @param stream_context the ConnectionContext
* @param msg where to put the results
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_net_connection_read(void* stream_context, struct StreamMessage** msg, int timeout_secs) {
return 0;
}
/**
* Reads a certain amount of bytes directly from the stream
* @param stream_context the context
* @param buffer where to put the results
* @param buffer_size the number of bytes to read
* @param timeout_secs number of seconds before a timeout
* @returns number of bytes read, or -1 on error
*/
int libp2p_net_connection_read_raw(void* stream_context, uint8_t* buffer, int buffer_size, int timeout_secs) {
if (stream_context == NULL)
return -1;
struct ConnectionContext* ctx = (struct ConnectionContext*) stream_context;
int num_read = 0;
for(int i = 0; i < buffer_size; i++) {
int retVal = socket_read(ctx->socket_descriptor, (char*)&buffer[i], 1, 0, timeout_secs);
if (retVal < 1) { // get out of the loop
if (retVal < 0) // error
return -1;
break;
}
num_read += retVal; // Everything ok, loop again (possibly)
}
return num_read;
}
/**
* Writes to a stream
* @param stream the stream context (usually a SessionContext pointer)
* @param buffer what to write
* @returns number of bytes written
*/
int libp2p_net_connection_write(void* stream_context, struct StreamMessage* msg) {
if (stream_context == NULL)
return -1;
struct ConnectionContext* ctx = (struct ConnectionContext*) stream_context;
return socket_write(ctx->socket_descriptor, (char*)msg->data, msg->data_size, 0);
}
/***
* Create a new stream based on a network connection
* @param fd the handle to the network connection
* @param ip the IP address of the connection
* @param port the port of the connection
* @returns a Stream
*/
struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) {
struct Stream* out = (struct Stream*) malloc(sizeof(struct Stream));
if (out != NULL) {
out->close = libp2p_net_connection_close;
out->peek = libp2p_net_connection_peek;
out->read = libp2p_net_connection_read;
out->read_raw = libp2p_net_connection_read_raw;
out->write = libp2p_net_connection_write;
// Multiaddresss
char str[strlen(ip) + 25];
memset(str, 0, strlen(ip) + 16);
sprintf(str, "/ip4/%s/tcp/%d", ip, port);
out->address = multiaddress_new_from_string(str);
out->parent_stream = NULL;
// mutex
out->socket_mutex = (pthread_mutex_t*) malloc(sizeof(pthread_mutex_t));
pthread_mutex_init(out->socket_mutex, NULL);
// context
struct ConnectionContext* ctx = (struct ConnectionContext*) malloc(sizeof(struct ConnectionContext));
if (ctx != NULL) {
out->stream_context = ctx;
}
}
return out;
}

View file

@ -23,9 +23,11 @@ int multistream_default_timeout = 5;
* An implementation of the libp2p multistream * An implementation of the libp2p multistream
*/ */
int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t incoming_size) { int libp2p_net_multistream_can_handle(const struct StreamMessage* msg) {
char *protocol = "/multistream/1.0.0\n"; char *protocol = "/multistream/1.0.0\n";
int protocol_size = strlen(protocol); int protocol_size = strlen(protocol);
unsigned char* incoming = msg->data;
size_t incoming_size = msg->data_size;
// is there a varint in front? // is there a varint in front?
size_t num_bytes = 0; size_t num_bytes = 0;
if (incoming[0] != '/' && incoming[1] != 'm') { if (incoming[0] != '/' && incoming[1] != 'm') {
@ -76,7 +78,7 @@ int libp2p_net_multistream_receive_protocol(struct SessionContext* context) {
return 1; return 1;
} }
int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incoming_size, struct SessionContext* context, void* protocol_context) { int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) {
// try sending the protocol back // try sending the protocol back
//if (!libp2p_net_multistream_send_protocol(context)) //if (!libp2p_net_multistream_send_protocol(context))
// return -1; // return -1;
@ -94,10 +96,10 @@ int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incomi
if (context->default_stream->read(context, &results, 5)) { if (context->default_stream->read(context, &results, 5)) {
// we read something from the network. Process it. // we read something from the network. Process it.
// NOTE: If it is a multistream protocol that we are receiving, ignore it. // NOTE: If it is a multistream protocol that we are receiving, ignore it.
if (libp2p_net_multistream_can_handle(results->data, results->data_size)) if (libp2p_net_multistream_can_handle(results))
continue; continue;
numRetries = 0; numRetries = 0;
retVal = libp2p_protocol_marshal(results->data, results->data_size, context, multistream_context->handlers); retVal = libp2p_protocol_marshal(results, context, multistream_context->handlers);
if (results != NULL) if (results != NULL)
free(results); free(results);
// exit the loop on error (or if they ask us to no longer loop by returning 0) // exit the loop on error (or if they ask us to no longer loop by returning 0)
@ -147,48 +149,48 @@ struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void
return handler; return handler;
} }
/**
* Close the connection and free memory
* @param ctx the context
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_net_multistream_context_free(struct MultistreamContext* ctx) {
int retVal = ctx->stream->close(ctx);
// regardless of retVal, free the context
// TODO: Evaluate if this is the correct way to do it:
free(ctx);
return retVal;
}
/*** /***
* Close the Multistream interface * Close the Multistream interface
* NOTE: This also closes the socket * NOTE: This also closes the socket
* @param stream_context a SessionContext * @param stream_context a SessionContext
* @returns true(1) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_net_multistream_close(void* stream_context) { int libp2p_net_multistream_close(void* stream_context) {
struct SessionContext* secure_context = (struct SessionContext*)stream_context; if (stream_context == NULL) {
struct Stream* stream = secure_context->default_stream; return 0;
if (stream == NULL || stream->socket_descriptor == NULL) }
return 1; struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream_context;
libp2p_net_multistream_stream_free(stream); return libp2p_net_multistream_context_free(multistream_context);
secure_context->default_stream = NULL;
secure_context->insecure_stream = NULL;
secure_context->secure_stream = NULL;
return 1;
} }
/*** /***
* Check the stream to see if there is something to read * Check the stream to see if there is something to read
* @param stream_context a SessionContext * @param stream_context a MultistreamContext
* @returns number of bytes to be read, or -1 if there was an error * @returns number of bytes to be read, or -1 if there was an error
*/ */
int libp2p_net_multistream_peek(void* stream_context) { int libp2p_net_multistream_peek(void* stream_context) {
if (stream_context == NULL) if (stream_context == NULL)
return -1; return -1;
struct SessionContext* session_context = (struct SessionContext*)stream_context; struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream_context;
struct Stream* stream = session_context->default_stream; struct Stream* parent_stream = multistream_context->stream->parent_stream;
if (stream == NULL) if (parent_stream == NULL)
return -1; return -1;
int socket_fd = *((int*)stream->socket_descriptor); return parent_stream->peek(parent_stream);
if (socket_fd < 0)
return -1;
int bytes = 0;
if (ioctl(socket_fd, FIONREAD, &bytes) < 0) {
// Ooff, we're having problems. Don't use this socket again.
return -1;
}
return bytes;
} }
/** /**
@ -197,43 +199,32 @@ int libp2p_net_multistream_peek(void* stream_context) {
* @param msg the data to send * @param msg the data to send
* @returns the number of bytes written * @returns the number of bytes written
*/ */
int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* msg) { int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* incoming) {
struct SessionContext* session_context = (struct SessionContext*)stream_context; struct MultistreamContext* multistream_context = (struct MultistreamContext*) stream_context;
struct Stream* stream = session_context->default_stream; struct Stream* parent_stream = multistream_context->stream->parent_stream;
int num_bytes = 0; int num_bytes = 0;
if (msg->data_size > 0) { // only do this is if there is something to send if (incoming->data_size > 0) { // only do this is if there is something to send
// first send the size // first get the size as a varint
unsigned char varint[12]; unsigned char varint[12];
size_t varint_size = 0; size_t varint_size = 0;
varint_encode(msg->data_size, &varint[0], 12, &varint_size); varint_encode(incoming->data_size, &varint[0], 12, &varint_size);
// now put the size with the data // now put the size with the data
unsigned char* buffer = (unsigned char*)malloc(msg->data_size + varint_size);
if (buffer == NULL)
return 0;
memset(buffer, 0, msg->data_size + varint_size);
memcpy(buffer, varint, varint_size);
memcpy(&buffer[varint_size], msg->data, msg->data_size);
// determine if this should run through the secio protocol or not
if (session_context->secure_stream == NULL) {
int sd = *((int*)stream->socket_descriptor);
// do a "raw" write
num_bytes = socket_write(sd, (char*)varint, varint_size, 0);
if (num_bytes == 0) {
free(buffer);
return 0;
}
// then send the actual data
num_bytes += socket_write(sd, (char*)msg->data, msg->data_size, 0);
session_context->last_comm_epoch = os_utils_gmtime();
} else {
// write using secio
struct StreamMessage outgoing; struct StreamMessage outgoing;
outgoing.data = buffer; outgoing.data = (uint8_t*) malloc(varint_size + incoming->data_size);
outgoing.data_size = msg->data_size + varint_size; if (outgoing.data == NULL) {
num_bytes = stream->write(stream_context, &outgoing); return 0;
} }
free(buffer); memset(outgoing.data, 0, incoming->data_size + varint_size);
memcpy(outgoing.data, varint, varint_size);
memcpy(&outgoing.data[varint_size], incoming->data, incoming->data_size);
// now ship it
num_bytes = parent_stream->write(parent_stream, &outgoing);
if (num_bytes > 0) {
// update the last time we communicated
multistream_context->session_context->last_comm_epoch = os_utils_gmtime();
}
free(outgoing.data);
} }
return num_bytes; return num_bytes;
@ -245,114 +236,47 @@ int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* msg
* @param results where to put the results. NOTE: this memory is allocated * @param results where to put the results. NOTE: this memory is allocated
* @param results_size the size of the results in bytes * @param results_size the size of the results in bytes
* @param timeout_secs the seconds before a timeout * @param timeout_secs the seconds before a timeout
* @returns number of bytes received * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** results, int timeout_secs) { int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** results, int timeout_secs) {
struct SessionContext* session_context = (struct SessionContext*)stream_context; struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream_context;
struct Stream* stream = session_context->default_stream; struct Stream* parent_stream = multistream_context->stream->parent_stream;
int bytes = 0;
// TODO: this is arbitrary, and should be dynamic // find out the length
size_t buffer_size = 362144; uint8_t varint[12];
char buffer[buffer_size]; size_t num_bytes_requested = 0;
char* pos = buffer; size_t varint_length = 0;
size_t num_bytes_requested = 0, left = 0, already_read = 0; for(int i = 0; i < 12; i++) {
if (!parent_stream->read_raw(parent_stream->stream_context, &varint[i], 1, timeout_secs)) {
if (session_context->secure_stream == NULL) {
int socketDescriptor = *( (int*) stream->socket_descriptor);
// first read the varint
while(1) {
unsigned char c = '\0';
bytes = socket_read(socketDescriptor, (char*)&c, 1, 0, timeout_secs);
if (bytes <= 0) {
// possible error
if (bytes < 0)
libp2p_logger_error("multistream", "socket_read returned %d reading socket %d\n", bytes, socketDescriptor);
return 0; return 0;
} }
pos[0] = c; if (varint[i] >> 7 == 0) {
if (c >> 7 == 0) { num_bytes_requested = varint_decode(&varint[0], i+1, &varint_length);
pos[1] = 0;
num_bytes_requested = varint_decode((unsigned char*)buffer, strlen(buffer), NULL);
break; break;
} }
pos++;
}
if (num_bytes_requested <= 0) {
libp2p_logger_debug("multistream", "Reading the varint returned %d on socket %d\n", num_bytes_requested, socketDescriptor);
return 0;
} }
left = num_bytes_requested; if (num_bytes_requested <= 0)
do {
bytes = socket_read(socketDescriptor, &buffer[already_read], left, 0, timeout_secs);
if (bytes < 0) {
bytes = 0;
if ( errno == EAGAIN ) {
// do something intelligent
} else {
libp2p_logger_error("multistream", "socket read returned error %d on socket descriptor %d.\n", errno, socketDescriptor);
return 0;
}
}
left = left - bytes;
already_read += bytes;
} while (left > 0);
if (already_read != num_bytes_requested)
return 0; return 0;
// parse the results, removing the leading size indicator // now get the data
*results = libp2p_stream_message_new(); *results = libp2p_stream_message_new();
struct StreamMessage* rslts = *results; struct StreamMessage* rslts = *results;
if (rslts == NULL)
return 0;
rslts->data_size = num_bytes_requested; rslts->data_size = num_bytes_requested;
rslts->data = (uint8_t*) malloc(num_bytes_requested); rslts->data = (uint8_t*) malloc(num_bytes_requested);
if (rslts->data == NULL) { if (rslts->data == NULL) {
libp2p_stream_message_free(rslts); libp2p_stream_message_free(rslts);
rslts = NULL;
}
// now get the data from the parent stream
if (!parent_stream->read_raw(parent_stream->stream_context, rslts->data, rslts->data_size, timeout_secs)) {
// problem reading from the parent stream
libp2p_stream_message_free(*results);
*results = NULL;
return 0; return 0;
} }
memcpy(rslts->data, buffer, num_bytes_requested);
session_context->last_comm_epoch = os_utils_gmtime();
} else { // use secio instead of raw read/writes
struct StreamMessage* read_from_stream;
if (session_context->default_stream->read(session_context, &read_from_stream, timeout_secs) == 0) {
return 0;
}
// pull out num_bytes_requested
num_bytes_requested = varint_decode(read_from_stream->data, read_from_stream->data_size, &left);
memcpy(buffer, read_from_stream->data, read_from_stream->data_size);
buffer_size = read_from_stream->data_size;
libp2p_stream_message_free(read_from_stream);
read_from_stream = NULL;
while (num_bytes_requested > buffer_size - left) {
// need to read more into buffer
if (session_context->default_stream->read(session_context, &read_from_stream, timeout_secs) == 0) {
return 0;
}
memcpy(&buffer[buffer_size], read_from_stream->data, read_from_stream->data_size);
buffer_size += read_from_stream->data_size;
libp2p_stream_message_free(read_from_stream);
}
*results = libp2p_stream_message_new();
struct StreamMessage* rslts = *results;
if (rslts == NULL) {
libp2p_logger_error("multistream", "Unable to allocate %lu bytes of memory.", num_bytes_requested);
return 0;
}
rslts->data_size = num_bytes_requested;
rslts->data = (uint8_t*) malloc(num_bytes_requested);
if (rslts->data == NULL) {
libp2p_stream_message_free(rslts);
libp2p_logger_error("multistream", "Unable to allocate %lu bytes of memory.", num_bytes_requested);
return 0;
}
memcpy(rslts->data, &buffer[left], num_bytes_requested);
session_context->last_comm_epoch = os_utils_gmtime();
}
return num_bytes_requested; return 1;
} }
@ -397,7 +321,7 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname,
// try to receive the protocol id // try to receive the protocol id
return_result = libp2p_net_multistream_read(&session, &results, timeout_secs); return_result = libp2p_net_multistream_read(&session, &results, timeout_secs);
if (results == NULL || return_result == 0 || results->data_size < 1 || !libp2p_net_multistream_can_handle(results->data, results->data_size)) { if (results == NULL || return_result == 0 || results->data_size < 1 || !libp2p_net_multistream_can_handle(results)) {
libp2p_logger_error("multistream", "Attempted to receive the multistream protocol header, but received %s.\n", results); libp2p_logger_error("multistream", "Attempted to receive the multistream protocol header, but received %s.\n", results);
goto exit; goto exit;
} }
@ -454,44 +378,10 @@ int libp2p_net_multistream_negotiate(struct SessionContext* session) {
return retVal; return retVal;
} }
/**
* Expect to read a message
* @param fd the socket file descriptor
* @returns the retrieved message, or NULL
*/
/*
struct Libp2pMessage* libp2p_net_multistream_get_message(struct Stream* stream) {
int retVal = 0;
unsigned char* results = NULL;
size_t results_size = 0;
struct Libp2pMessage* msg = NULL;
// read what they sent
libp2p_net_multistream_read(stream, &results, &results_size);
// unprotobuf it
if (!libp2p_message_protobuf_decode(results, results_size, &msg))
goto exit;
// clean up
retVal = 1;
exit:
if (results != NULL)
free(results);
if (retVal != 1 && msg != NULL)
libp2p_message_free(msg);
return msg;
}
*/
void libp2p_net_multistream_stream_free(struct Stream* stream) { void libp2p_net_multistream_stream_free(struct Stream* stream) {
if (stream != NULL) { if (stream != NULL) {
if (stream->socket_descriptor != NULL) { stream->parent_stream->close(stream->parent_stream->stream_context);
close( *((int*)stream->socket_descriptor)); // TODO: free memory allocations
free(stream->socket_descriptor);
}
if (stream->address != NULL)
multiaddress_free(stream->address);
free(stream);
} }
} }
@ -504,14 +394,7 @@ void libp2p_net_multistream_stream_free(struct Stream* stream) {
struct Stream* libp2p_net_multistream_stream_new(int socket_fd, const char* ip, int port) { struct Stream* libp2p_net_multistream_stream_new(int socket_fd, const char* ip, int port) {
struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream)); struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream));
if (out != NULL) { if (out != NULL) {
pthread_mutex_init(&out->socket_mutex, NULL); out->parent_stream = NULL;
out->socket_descriptor = malloc(sizeof(int));
*((int*)out->socket_descriptor) = socket_fd;
int res = *((int*)out->socket_descriptor);
if (res != socket_fd) {
libp2p_net_multistream_stream_free(out);
return NULL;
}
out->close = libp2p_net_multistream_close; out->close = libp2p_net_multistream_close;
out->read = libp2p_net_multistream_read; out->read = libp2p_net_multistream_read;
out->write = libp2p_net_multistream_write; out->write = libp2p_net_multistream_write;

View file

@ -3,6 +3,10 @@
#include "libp2p/utils/logger.h" #include "libp2p/utils/logger.h"
#include "libp2p/net/protocol.h" #include "libp2p/net/protocol.h"
/*
* Handle the different protocols
*/
/*** /***
* Compare incoming to see if they are requesting a protocol upgrade * Compare incoming to see if they are requesting a protocol upgrade
* @param incoming the incoming string * @param incoming the incoming string
@ -10,10 +14,10 @@
* @param test the protocol string to compare it with (i.e. "/secio" or "/nodeio" * @param test the protocol string to compare it with (i.e. "/secio" or "/nodeio"
* @returns true(1) if there was a match, false(0) otherwise * @returns true(1) if there was a match, false(0) otherwise
*/ */
const struct Libp2pProtocolHandler* protocol_compare(const unsigned char* incoming, size_t incoming_size, struct Libp2pVector* protocol_handlers) { const struct Libp2pProtocolHandler* protocol_compare(struct StreamMessage* msg, struct Libp2pVector* protocol_handlers) {
for(int i = 0; i < protocol_handlers->total; i++) { for(int i = 0; i < protocol_handlers->total; i++) {
const struct Libp2pProtocolHandler* handler = (const struct Libp2pProtocolHandler*) libp2p_utils_vector_get(protocol_handlers, i); const struct Libp2pProtocolHandler* handler = (const struct Libp2pProtocolHandler*) libp2p_utils_vector_get(protocol_handlers, i);
if (handler->CanHandle(incoming, incoming_size)) { if (handler->CanHandle(msg)) {
return handler; return handler;
} }
} }
@ -43,26 +47,24 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new() {
* @param handlers a Vector of protocol handlers * @param handlers a Vector of protocol handlers
* @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success * @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success
*/ */
int libp2p_protocol_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct Libp2pVector* handlers) { int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* session, struct Libp2pVector* handlers) {
const struct Libp2pProtocolHandler* handler = protocol_compare(incoming, incoming_size, handlers); const struct Libp2pProtocolHandler* handler = protocol_compare(msg, handlers);
char str[incoming_size + 1]; if (handler == NULL) {
memcpy(str, incoming, incoming_size); // turn msg->data to a null terminated string for the error message
str[incoming_size] = 0; char str[msg->data_size + 1];
for(int i = 0; i < incoming_size; i++) { memcpy(str, msg->data, msg->data_size);
str[msg->data_size] = 0;
for(int i = 0; i < msg->data_size; i++) {
if (str[i] == '\n') { if (str[i] == '\n') {
str[i] = 0; str[i] = 0;
break; break;
} }
} }
if (handler == NULL) {
libp2p_logger_error("protocol", "Session [%s]: Unable to find handler for %s.\n", session->remote_peer_id, str); libp2p_logger_error("protocol", "Session [%s]: Unable to find handler for %s.\n", session->remote_peer_id, str);
return -1; return -1;
} else {
libp2p_logger_debug("protocol", "Found handler for %s.\n", str);
} }
//TODO: strip off the protocol? //TODO: strip off the protocol?
return handler->HandleMessage(incoming, incoming_size, session, handler->context); return handler->HandleMessage(msg, session, handler->context);
} }

View file

@ -97,7 +97,7 @@ int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer) {
* @param peerstore if connection is successfull, will add peer to peerstore * @param peerstore if connection is successfull, will add peer to peerstore
* @returns true(1) on success, false(0) if we could not connect * @returns true(1) on success, false(0) if we could not connect
*/ */
int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) { int libp2p_peer_connect(const struct Dialer* dialer, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) {
// fix the connection type if in an invalid state // fix the connection type if in an invalid state
if (peer->connection_type == CONNECTION_TYPE_CONNECTED && peer->sessionContext == NULL) if (peer->connection_type == CONNECTION_TYPE_CONNECTED && peer->sessionContext == NULL)
peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED; peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
@ -107,65 +107,12 @@ int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPee
struct Libp2pLinkedList* current_address = peer->addr_head; struct Libp2pLinkedList* current_address = peer->addr_head;
while (current_address != NULL && peer->connection_type != CONNECTION_TYPE_CONNECTED) { while (current_address != NULL && peer->connection_type != CONNECTION_TYPE_CONNECTED) {
struct MultiAddress *ma = (struct MultiAddress*)current_address->item; struct MultiAddress *ma = (struct MultiAddress*)current_address->item;
if (multiaddress_is_ip(ma)) { // use the dialer to attempt to dial this MultiAddress and join the swarm
char* ip = NULL; struct Stream* yamux_stream = libp2p_conn_dialer_get_stream(dialer, ma, "yamux");
if (!multiaddress_get_ip_address(ma, &ip)) if (yamux_stream != NULL) {
continue; // we're okay, get out
int port = multiaddress_get_ip_port(ma); break;
// out with the old
if (peer->sessionContext != NULL) {
libp2p_session_context_free(peer->sessionContext);
} }
peer->sessionContext = libp2p_session_context_new();
peer->sessionContext->host = ip;
peer->sessionContext->port = port;
peer->sessionContext->datastore = datastore;
peer->sessionContext->insecure_stream = libp2p_net_multistream_connect_with_timeout(ip, port, timeout);
if (peer->sessionContext->insecure_stream == NULL) {
libp2p_logger_error("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer));
free(ip);
return 0;
}
peer->sessionContext->default_stream = peer->sessionContext->insecure_stream;
peer->connection_type = CONNECTION_TYPE_CONNECTED;
// lock the stream
if (!libp2p_stream_lock(peer->sessionContext->default_stream)) {
libp2p_logger_error("peer", "Unable to lock the newly created peer stream for peer %s.\n", libp2p_peer_id_to_string(peer));
free(ip);
return 0;
}
// switch to secio
if (libp2p_secio_initiate_handshake(peer->sessionContext, privateKey, peerstore) <= 0) {
libp2p_logger_error("peer", "Attempted secio handshake, but failed for peer %s.\n", libp2p_peer_id_to_string(peer));
free(ip);
libp2p_stream_unlock(peer->sessionContext->default_stream);
return 0;
}
//switch to multistream
if (!libp2p_net_multistream_negotiate(peer->sessionContext)) {
libp2p_logger_error("peer", "Attempted multistream handshake, but failed for peer %s.\n", libp2p_peer_id_to_string(peer));
free(ip);
libp2p_stream_unlock(peer->sessionContext->default_stream);
return 0;
}
// switch to yamux
if (!yamux_send_protocol(peer->sessionContext)) {
libp2p_logger_error("peer", "Attempted yamux handshake, but could not send protocol header for peer %s.\n", libp2p_peer_id_to_string(peer));
free(ip);
libp2p_stream_unlock(peer->sessionContext->default_stream);
return 0;
}
libp2p_stream_unlock(peer->sessionContext->default_stream);
/*
// expect yamux back
if (!yamux_receive_protocol(peer->sessionContext)) {
libp2p_logger_error("peer", "Attempted yamux handshake, but received unexpected response.\n");
free(ip);
return 0;
}
*/
free(ip);
} // is IP
now = time(NULL); now = time(NULL);
if (now >= (prev + timeout)) if (now >= (prev + timeout))
break; break;

View file

@ -20,11 +20,11 @@ struct DhtContext {
struct ProviderStore* provider_store; struct ProviderStore* provider_store;
}; };
int libp2p_routing_dht_can_handle(const uint8_t* incoming, size_t incoming_size) { int libp2p_routing_dht_can_handle(const struct StreamMessage* msg) {
if (incoming_size < 8) if (msg->data_size < 8)
return 0; return 0;
char* result = strnstr((char*)incoming, "/ipfs/kad", incoming_size); char* result = strnstr((char*)msg->data, "/ipfs/kad", msg->data_size);
if (result != NULL && result == (char*)incoming) if (result != NULL && result == (char*)msg->data)
return 1; return 1;
return 0; return 0;
} }
@ -34,7 +34,7 @@ int libp2p_routing_dht_shutdown(void* context) {
return 1; return 1;
} }
int libp2p_routing_dht_handle_msg(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* context) { int libp2p_routing_dht_handle_msg(const struct StreamMessage* msg, struct SessionContext* session_context, void* context) {
libp2p_logger_debug("dht_protocol", "Handling incoming dht routing request from peer %s.\n", session_context->remote_peer_id); libp2p_logger_debug("dht_protocol", "Handling incoming dht routing request from peer %s.\n", session_context->remote_peer_id);
struct DhtContext* ctx = (struct DhtContext*)context; struct DhtContext* ctx = (struct DhtContext*)context;
if (!libp2p_routing_dht_handshake(session_context)) if (!libp2p_routing_dht_handshake(session_context))
@ -572,7 +572,7 @@ int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struc
* @param msg the message to send * @param msg the message to send
* @returns true(1) if we sent to at least 1, false(0) otherwise * @returns true(1) if we sent to at least 1, false(0) otherwise
*/ */
int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* private_key, struct Peerstore* peerstore, int libp2p_routing_dht_send_message_nearest_x(const struct Dialer* dialer, struct Peerstore* peerstore,
struct Datastore* datastore, struct KademliaMessage* msg, int numToSend) { struct Datastore* datastore, struct KademliaMessage* msg, int numToSend) {
// TODO: Calculate "Nearest" // TODO: Calculate "Nearest"
// but for now, grab x peers, and send to them // but for now, grab x peers, and send to them
@ -585,7 +585,7 @@ int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* privat
struct Libp2pPeer* remote_peer = entry->peer; struct Libp2pPeer* remote_peer = entry->peer;
if (!remote_peer->is_local) { if (!remote_peer->is_local) {
// connect (if not connected) // connect (if not connected)
if (libp2p_peer_connect(private_key, remote_peer, peerstore, datastore, 5)) { if (libp2p_peer_connect(dialer, remote_peer, peerstore, datastore, 5)) {
// send message // send message
if (libp2p_routing_dht_send_message(remote_peer->sessionContext, msg)) if (libp2p_routing_dht_send_message(remote_peer->sessionContext, msg))
numSent++; numSent++;

View file

@ -34,37 +34,31 @@ const char* SupportedExchanges = "P-256,P-384,P-521";
const char* SupportedCiphers = "AES-256,AES-128,Blowfish"; const char* SupportedCiphers = "AES-256,AES-128,Blowfish";
const char* SupportedHashes = "SHA256,SHA512"; const char* SupportedHashes = "SHA256,SHA512";
struct SecioContext { int libp2p_secio_can_handle(const struct StreamMessage* msg) {
struct RsaPrivateKey* private_key;
struct Peerstore* peer_store;
};
int libp2p_secio_can_handle(const uint8_t* incoming, size_t incoming_size) {
const char* protocol = "/secio/1.0.0"; const char* protocol = "/secio/1.0.0";
// sanity checks // sanity checks
if (incoming_size < 12) if (msg->data_size < 12)
return 0; return 0;
char* result = strnstr((char*)incoming, protocol, incoming_size); char* result = strnstr((char*)msg->data, protocol, msg->data_size);
if (result != NULL && result == (char*)incoming) if (result != NULL && result == (char*)msg->data)
return 1; return 1;
return 0; return 0;
} }
/*** /***
* Handle a secio message * Handle a secio message
* @param incoming the incoming bytes * @param msg the incoming message
* @param incoming_size the size of the incoming buffer
* @param session_context who is attempting to connect * @param session_context who is attempting to connect
* @param protocol_context a SecioContext that contains the needed information * @param protocol_context a SecioContext that contains the needed information
* @returns <0 on error, 0 if okay (does not allow daemon to continue looping) * @returns <0 on error, 0 if okay (does not allow daemon to continue looping)
*/ */
int libp2p_secio_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { int libp2p_secio_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) {
libp2p_logger_debug("secio", "Handling incoming secio message.\n"); libp2p_logger_debug("secio", "Handling incoming secio message.\n");
struct SecioContext* ctx = (struct SecioContext*)protocol_context; struct SecioContext* ctx = (struct SecioContext*)protocol_context;
// send them the protocol // send them the protocol
if (!libp2p_secio_send_protocol(session_context)) if (!libp2p_secio_send_protocol(ctx))
return -1; return -1;
int retVal = libp2p_secio_handshake(session_context, ctx->private_key, ctx->peer_store); int retVal = libp2p_secio_handshake(ctx);
if (retVal) if (retVal)
return 0; return 0;
return -1; return -1;
@ -83,9 +77,9 @@ int libp2p_secio_shutdown(void* context) {
* @param peer_store the peer store * @param peer_store the peer store
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_initiate_handshake(struct SessionContext* session_context, const struct RsaPrivateKey* private_key, struct Peerstore* peer_store) { int libp2p_secio_initiate_handshake(struct SecioContext* ctx) {
if (libp2p_secio_send_protocol(session_context) && libp2p_secio_receive_protocol(session_context)) { if (libp2p_secio_send_protocol(ctx) && libp2p_secio_receive_protocol(ctx)) {
return libp2p_secio_handshake(session_context, private_key, peer_store); return libp2p_secio_handshake(ctx);
} }
libp2p_logger_error("secio", "Secio protocol exchange failed.\n"); libp2p_logger_error("secio", "Secio protocol exchange failed.\n");
return 0; return 0;
@ -511,159 +505,30 @@ int libp2p_secio_make_mac_and_cipher(struct SessionContext* session, struct Stre
return 1; return 1;
} }
/***
* Write bytes to an unencrypted stream
* @param session the session information
* @param bytes the bytes to write
* @param data_length the number of bytes to write
* @returns the number of bytes written
*/
int libp2p_secio_unencrypted_write(struct SessionContext* session, unsigned char* bytes, size_t data_length) {
int num_bytes = 0;
if (data_length > 0) { // only do this is if there is something to send
// first send the size
uint32_t size = htonl(data_length);
char* size_as_char = (char*)&size;
int left = 4;
int written = 0;
int written_this_time = 0;
do {
written_this_time = socket_write(*((int*)session->default_stream->socket_descriptor), &size_as_char[written], left, 0);
if (written_this_time < 0) {
written_this_time = 0;
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
// TODO: use epoll or select to wait for socket to be writable
} else {
return 0;
}
}
left = left - written_this_time;
} while (left > 0);
// then send the actual data
left = data_length;
written = 0;
do {
written_this_time = socket_write(*((int*)session->default_stream->socket_descriptor), (char*)&bytes[written], left, 0);
if (written_this_time < 0) {
written_this_time = 0;
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
// TODO: use epoll or select to wait for socket to be writable
} else {
return 0;
}
}
left = left - written_this_time;
written += written_this_time;
} while (left > 0);
num_bytes = written;
} // there was something to send
return num_bytes;
}
/***
* Read bytes from the incoming stream
* @param session the session information
* @param results where to put the bytes read
* @param results_size the size of the results
* @returns the number of bytes read
*/
int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char** results, size_t* results_size, int timeout_secs) {
uint32_t buffer_size;
if (session == NULL || session->insecure_stream == NULL || session->insecure_stream->socket_descriptor == NULL) {
libp2p_logger_error("secio", "Attempted unencrypted read on invalid session.\n");
return 0;
}
// first read the 4 byte integer
char* size = (char*)&buffer_size;
int left = 4;
int read = 0;
int read_this_time = 0;
do {
read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), &size[read], 1, 0, timeout_secs);
if (read_this_time <= 0) {
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
// TODO: use epoll or select to wait for socket to be writable
libp2p_logger_debug("secio", "Attempted read, but got EAGAIN or EWOULDBLOCK. Code %d.\n", errno);
return 0;
} else {
// is this really an error?
if (errno != 0) {
libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: %s\n", strerror(errno));
return 0;
}
else
libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: 0 bytes read, but errno shows no error. Trying again.\n");
}
} else {
left = left - read_this_time;
read += read_this_time;
}
} while (left > 0);
buffer_size = ntohl(buffer_size);
if (buffer_size == 0) {
libp2p_logger_error("secio", "unencrypted read buffer size is 0.\n");
return 0;
}
// now read the number of bytes we've found, minus the 4 that we just read
left = buffer_size;
read = 0;
read_this_time = 0;
*results = malloc(left);
if (*results == NULL) {
libp2p_logger_error("secio", "Unable to allocate memory for the incoming message. Size: %ulld", left);
return 0;
}
unsigned char* ptr = *results;
do {
read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), (char*)&ptr[read], left, 0, timeout_secs);
if (read_this_time < 0) {
read_this_time = 0;
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
// TODO: use epoll or select to wait for socket to be writable
} else {
libp2p_logger_error("secio", "read from socket returned %d.\n", errno);
return 0;
}
} else if (read_this_time == 0) {
// socket_read returned 0, which it shouldn't
libp2p_logger_error("secio", "socket_read returned 0 trying to read from %s.\n", session->remote_peer_id);
return 0;
}
left = left - read_this_time;
} while (left > 0);
*results_size = buffer_size;
return buffer_size;
}
/*** /***
* Send the protocol string to the remote stream * Send the protocol string to the remote stream
* @param session the context * @param ctx the context
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_send_protocol(struct SessionContext* session) { int libp2p_secio_send_protocol(struct SecioContext* ctx) {
char* protocol = "/secio/1.0.0\n"; char* protocol = "/secio/1.0.0\n";
struct StreamMessage outgoing; struct StreamMessage outgoing;
outgoing.data = (uint8_t*)protocol; outgoing.data = (uint8_t*)protocol;
outgoing.data_size = strlen(protocol); outgoing.data_size = strlen(protocol);
return session->default_stream->write(session, &outgoing); return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing);
} }
/*** /***
* Attempt to read the secio protocol as a reply from the remote * Attempt to read the secio protocol as a reply from the remote
* @param session the context * @param ctx the context
* @returns true(1) if we received what we think we should have, false(0) otherwise * @returns true(1) if we received what we think we should have, false(0) otherwise
*/ */
int libp2p_secio_receive_protocol(struct SessionContext* session) { int libp2p_secio_receive_protocol(struct SecioContext* ctx) {
char* protocol = "/secio/1.0.0\n"; char* protocol = "/secio/1.0.0\n";
int numSecs = 30; int numSecs = 30;
int retVal = 0; int retVal = 0;
struct StreamMessage* buffer = NULL; struct StreamMessage* buffer = NULL;
session->default_stream->read(session, &buffer, numSecs); ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &buffer, numSecs);
if (buffer == NULL) { if (buffer == NULL) {
libp2p_logger_error("secio", "Expected the secio protocol header, but received NULL.\n"); libp2p_logger_error("secio", "Expected the secio protocol header, but received NULL.\n");
} else { } else {
@ -755,19 +620,22 @@ int libp2p_secio_encrypt(struct SessionContext* session, const unsigned char* in
* @returns the number of bytes written * @returns the number of bytes written
*/ */
int libp2p_secio_encrypted_write(void* stream_context, struct StreamMessage* bytes) { int libp2p_secio_encrypted_write(void* stream_context, struct StreamMessage* bytes) {
struct SessionContext* session = (struct SessionContext*) stream_context; struct SecioContext* ctx = (struct SecioContext*) stream_context;
struct Stream* parent_stream = ctx->stream->parent_stream;
struct SessionContext* session_context = ctx->session_context;
// writer uses the local cipher and mac // writer uses the local cipher and mac
unsigned char* buffer = NULL; struct StreamMessage outgoing;
size_t buffer_size = 0; if (!libp2p_secio_encrypt(session_context, bytes->data, bytes->data_size, &outgoing.data, &outgoing.data_size)) {
if (!libp2p_secio_encrypt(session, bytes->data, bytes->data_size, &buffer, &buffer_size)) {
libp2p_logger_error("secio", "secio_encrypt returned false.\n"); libp2p_logger_error("secio", "secio_encrypt returned false.\n");
return 0; return 0;
} }
int retVal = libp2p_secio_unencrypted_write(session, buffer, buffer_size);
int retVal = parent_stream->write(parent_stream->stream_context, &outgoing);
if (!retVal) { if (!retVal) {
libp2p_logger_error("secio", "secio_unencrypted_write returned false\n"); libp2p_logger_error("secio", "secio_unencrypted_write returned false\n");
} }
free(buffer); free(outgoing.data);
return retVal; return retVal;
} }
@ -848,21 +716,20 @@ int libp2p_secio_decrypt(struct SessionContext* session, const unsigned char* in
*/ */
int libp2p_secio_encrypted_read(void* stream_context, struct StreamMessage** bytes, int timeout_secs) { int libp2p_secio_encrypted_read(void* stream_context, struct StreamMessage** bytes, int timeout_secs) {
int retVal = 0; int retVal = 0;
struct SessionContext* session = (struct SessionContext*)stream_context; struct SecioContext* ctx = (struct SecioContext*)stream_context;
struct Stream* parent_stream = ctx->stream->parent_stream;
// reader uses the remote cipher and mac // reader uses the remote cipher and mac
// read the data // read the data
unsigned char* incoming = NULL; struct StreamMessage* msg = NULL;
size_t incoming_size = 0; if (!parent_stream->read(parent_stream->stream_context, &msg, timeout_secs)) {
if (libp2p_secio_unencrypted_read(session, &incoming, &incoming_size, timeout_secs) <= 0) {
libp2p_logger_error("secio", "Unencrypted_read returned false.\n"); libp2p_logger_error("secio", "Unencrypted_read returned false.\n");
goto exit; goto exit;
} }
retVal = libp2p_secio_decrypt(session, incoming, incoming_size, bytes); retVal = libp2p_secio_decrypt(ctx->session_context, msg->data, msg->data_size, bytes);
if (!retVal) if (!retVal)
libp2p_logger_error("secio", "Decrypting incoming stream returned false.\n"); libp2p_logger_error("secio", "Decrypting incoming stream returned false.\n");
exit: exit:
if (incoming != NULL) libp2p_stream_message_free(msg);
free(incoming);
return retVal; return retVal;
} }
@ -875,10 +742,11 @@ int libp2p_secio_encrypted_read(void* stream_context, struct StreamMessage** byt
* @param peerstore the collection of peers * @param peerstore the collection of peers
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_handshake(struct SessionContext* local_session, const struct RsaPrivateKey* private_key, struct Peerstore* peerstore) { int libp2p_secio_handshake(struct SecioContext* secio_context) {
int retVal = 0; int retVal = 0;
size_t results_size = 0, bytes_written = 0; size_t results_size = 0, bytes_written = 0;
struct StreamMessage* stream_message = NULL; struct StreamMessage* incoming = NULL;
struct StreamMessage outgoing; // used for outgoing messages
unsigned char* propose_in_bytes = NULL; // the remote protobuf unsigned char* propose_in_bytes = NULL; // the remote protobuf
size_t propose_in_size = 0; size_t propose_in_size = 0;
unsigned char* propose_out_bytes = NULL; // the local protobuf unsigned char* propose_out_bytes = NULL; // the local protobuf
@ -899,6 +767,10 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
struct PublicKey pub_key = {0}; struct PublicKey pub_key = {0};
struct Libp2pPeer* remote_peer = NULL; struct Libp2pPeer* remote_peer = NULL;
struct SessionContext* local_session = secio_context->session_context;
struct RsaPrivateKey* private_key = secio_context->private_key;
struct Peerstore* peerstore = secio_context->peer_store;
//TODO: make sure we're not talking to ourself //TODO: make sure we're not talking to ourself
// send the protocol id and the outgoing Propose struct // send the protocol id and the outgoing Propose struct
@ -949,7 +821,9 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
goto exit; goto exit;
// now send the protocol and Propose struct // now send the protocol and Propose struct
bytes_written = libp2p_secio_unencrypted_write(local_session, propose_out_bytes, propose_out_size); outgoing.data = propose_out_bytes;
outgoing.data_size = propose_out_size;
bytes_written = secio_context->stream->parent_stream->write(secio_context->stream->parent_stream->stream_context, &outgoing);
if (bytes_written != propose_out_size) { if (bytes_written != propose_out_size) {
libp2p_logger_error("secio", "Sent propose_out, but did not write the correct number of bytes. Should be %d but was %d.\n", propose_out_size, bytes_written); libp2p_logger_error("secio", "Sent propose_out, but did not write the correct number of bytes. Should be %d but was %d.\n", propose_out_size, bytes_written);
@ -958,7 +832,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
} }
// try to get the Propse struct from the remote peer // try to get the Propse struct from the remote peer
bytes_written = libp2p_secio_unencrypted_read(local_session, &propose_in_bytes, &propose_in_size, 10); bytes_written = secio_context->stream->parent_stream->read(secio_context->stream->parent_stream->stream_context, &incoming, 10);
if (bytes_written <= 0) { if (bytes_written <= 0) {
libp2p_logger_error("secio", "Unable to get the remote's Propose struct.\n"); libp2p_logger_error("secio", "Unable to get the remote's Propose struct.\n");
goto exit; goto exit;
@ -966,10 +840,12 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
//libp2p_logger_debug("secio", "Received their propose struct.\n"); //libp2p_logger_debug("secio", "Received their propose struct.\n");
} }
if (!libp2p_secio_propose_protobuf_decode(propose_in_bytes, propose_in_size -1, &propose_in)) { if (!libp2p_secio_propose_protobuf_decode(incoming->data, incoming->data_size -1, &propose_in)) {
libp2p_logger_error("secio", "Unable to un-protobuf the remote's Propose struct\n"); libp2p_logger_error("secio", "Unable to un-protobuf the remote's Propose struct\n");
goto exit; goto exit;
} }
libp2p_stream_message_free(incoming);
incoming = NULL;
// get their nonce // get their nonce
if (propose_in->rand_size != 16) if (propose_in->rand_size != 16)
@ -1074,8 +950,9 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
libp2p_secio_exchange_protobuf_encode(exchange_out, exchange_out_protobuf, exchange_out_protobuf_size, &bytes_written); libp2p_secio_exchange_protobuf_encode(exchange_out, exchange_out_protobuf, exchange_out_protobuf_size, &bytes_written);
exchange_out_protobuf_size = bytes_written; exchange_out_protobuf_size = bytes_written;
//libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Writing exchange_out\n"); outgoing.data = exchange_out_protobuf;
bytes_written = libp2p_secio_unencrypted_write(local_session, exchange_out_protobuf, exchange_out_protobuf_size); outgoing.data_size = exchange_out_protobuf_size;
bytes_written = secio_context->stream->parent_stream->write(secio_context->stream->parent_stream, &outgoing);
if (exchange_out_protobuf_size != bytes_written) { if (exchange_out_protobuf_size != bytes_written) {
libp2p_logger_error("secio", "Unable to write exchange_out\n"); libp2p_logger_error("secio", "Unable to write exchange_out\n");
goto exit; goto exit;
@ -1088,7 +965,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
// receive Exchange packet // receive Exchange packet
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading exchange packet\n"); libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading exchange packet\n");
bytes_written = libp2p_secio_unencrypted_read(local_session, &results, &results_size, 10); bytes_written = secio_context->stream->parent_stream->read(secio_context->stream->parent_stream->stream_context, &incoming, 10);
if (bytes_written == 0) { if (bytes_written == 0) {
libp2p_logger_error("secio", "unable to read exchange packet.\n"); libp2p_logger_error("secio", "unable to read exchange packet.\n");
libp2p_peer_handle_connection_error(remote_peer); libp2p_peer_handle_connection_error(remote_peer);
@ -1096,9 +973,9 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
} else { } else {
//libp2p_logger_debug("secio", "Read exchange packet.\n"); //libp2p_logger_debug("secio", "Read exchange packet.\n");
} }
libp2p_secio_exchange_protobuf_decode(results, results_size, &exchange_in); libp2p_secio_exchange_protobuf_decode(incoming->data, incoming->data_size, &exchange_in);
free(results); libp2p_stream_message_free(incoming);
results = NULL; incoming = NULL;
// end of receive Exchange packet // end of receive Exchange packet
// parse and verify // parse and verify
@ -1171,7 +1048,6 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
libp2p_secio_initialize_crypto(local_session); libp2p_secio_initialize_crypto(local_session);
// send their nonce to verify encryption works // send their nonce to verify encryption works
struct StreamMessage outgoing;
outgoing.data = (uint8_t*)local_session->remote_nonce; outgoing.data = (uint8_t*)local_session->remote_nonce;
outgoing.data_size = 16; outgoing.data_size = 16;
if (libp2p_secio_encrypted_write(local_session, &outgoing) <= 0) { if (libp2p_secio_encrypted_write(local_session, &outgoing) <= 0) {
@ -1182,20 +1058,23 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
// receive our nonce to verify encryption works // receive our nonce to verify encryption works
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Receiving our nonce\n"); libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Receiving our nonce\n");
results = NULL; results = NULL;
int bytes_read = libp2p_secio_encrypted_read(local_session, &stream_message, 10); int bytes_read = libp2p_secio_encrypted_read(secio_context->stream->stream_context, &incoming, 10);
if (bytes_read <= 0 || stream_message == NULL) { if (bytes_read <= 0 || incoming == NULL) {
libp2p_logger_error("secio", "Encrypted read returned %d\n", bytes_read); libp2p_logger_error("secio", "Encrypted read returned %d\n", bytes_read);
goto exit; goto exit;
} }
if (stream_message->data_size != 16) { if (incoming->data_size != 16) {
libp2p_logger_error("secio", "Results_size should be 16 but was %d\n", stream_message->data_size); libp2p_logger_error("secio", "Results_size should be 16 but was %d\n", incoming->data_size);
goto exit; goto exit;
} }
if (libp2p_secio_bytes_compare(stream_message->data, (unsigned char*)local_session->local_nonce, 16) != 0) { if (libp2p_secio_bytes_compare(incoming->data, (unsigned char*)local_session->local_nonce, 16) != 0) {
libp2p_logger_error("secio", "Bytes of nonce did not match\n"); libp2p_logger_error("secio", "Bytes of nonce did not match\n");
goto exit; goto exit;
} }
libp2p_stream_message_free(incoming);
incoming = NULL;
// set up the secure stream in the struct // set up the secure stream in the struct
local_session->secure_stream = local_session->insecure_stream; local_session->secure_stream = local_session->insecure_stream;
local_session->secure_stream->read = libp2p_secio_encrypted_read; local_session->secure_stream->read = libp2p_secio_encrypted_read;
@ -1231,7 +1110,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
libp2p_secio_propose_free(propose_out); libp2p_secio_propose_free(propose_out);
libp2p_secio_propose_free(propose_in); libp2p_secio_propose_free(propose_in);
libp2p_stream_message_free(stream_message); libp2p_stream_message_free(incoming);
if (retVal != 1) { if (retVal != 1) {
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake returning false\n"); libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake returning false\n");

View file

@ -90,10 +90,6 @@ int test_dialer_dial_multistream() {
stream = libp2p_conn_dialer_get_stream(dialer, destination_address, "multistream"); stream = libp2p_conn_dialer_get_stream(dialer, destination_address, "multistream");
if (stream == NULL) if (stream == NULL)
goto exit; goto exit;
int socket_descriptor = *((int*)stream->socket_descriptor);
if ( socket_descriptor < 0 || socket_descriptor > 255) {
goto exit;
}
// now ping // now ping

View file

@ -36,7 +36,7 @@ int test_multistream_get_list() {
struct SessionContext session; struct SessionContext session;
session.insecure_stream = libp2p_net_multistream_connect("10.211.55.2", 4001); session.insecure_stream = libp2p_net_multistream_connect("10.211.55.2", 4001);
if (*((int*)session.insecure_stream->socket_descriptor) < 0) if (session.insecure_stream == NULL)
goto exit; goto exit;
// try to respond something, ls command // try to respond something, ls command

View file

@ -33,6 +33,7 @@ void print_stretched_key(struct StretchedKey* key) {
int test_secio_handshake() { int test_secio_handshake() {
/*
libp2p_logger_add_class("secio"); libp2p_logger_add_class("secio");
int retVal = 0; int retVal = 0;
@ -119,6 +120,7 @@ int test_secio_handshake() {
goto exit; goto exit;
} }
*/ */
/*
// a new way to do the above // a new way to do the above
if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) { if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) {
libp2p_logger_error("test_secio", "Unable to do handshake\n"); libp2p_logger_error("test_secio", "Unable to do handshake\n");
@ -145,7 +147,7 @@ int test_secio_handshake() {
print_stretched_key(secure_session.remote_stretched_key); print_stretched_key(secure_session.remote_stretched_key);
fprintf(stdout, "\n"); fprintf(stdout, "\n");
*/ */
/*
// now attempt to do something with it... try to negotiate a multistream // now attempt to do something with it... try to negotiate a multistream
if (libp2p_net_multistream_negotiate(secure_session) == 0) { if (libp2p_net_multistream_negotiate(secure_session) == 0) {
fprintf(stdout, "Unable to negotiate multistream\n"); fprintf(stdout, "Unable to negotiate multistream\n");
@ -215,6 +217,7 @@ int test_secio_handshake() {
if (secure_session.shared_key != NULL) if (secure_session.shared_key != NULL)
free(secure_session.shared_key); free(secure_session.shared_key);
*/ */
/*
if (private_key != NULL) if (private_key != NULL)
libp2p_crypto_private_key_free(private_key); libp2p_crypto_private_key_free(private_key);
if (decode_base64 != NULL) if (decode_base64 != NULL)
@ -222,6 +225,8 @@ int test_secio_handshake() {
if (rsa_private_key != NULL) if (rsa_private_key != NULL)
libp2p_crypto_rsa_rsa_private_key_free(rsa_private_key); libp2p_crypto_rsa_rsa_private_key_free(rsa_private_key);
return retVal; return retVal;
*/
return 0;
} }
int libp2p_secio_encrypt(const struct SessionContext* session, const unsigned char* incoming, size_t incoming_size, unsigned char** outgoing, size_t* outgoing_size); int libp2p_secio_encrypt(const struct SessionContext* session, const unsigned char* incoming, size_t incoming_size, unsigned char** outgoing, size_t* outgoing_size);
@ -380,6 +385,7 @@ int test_secio_encrypt_like_go() {
*/ */
int test_secio_handshake_go() { int test_secio_handshake_go() {
/*
libp2p_logger_add_class("secio"); libp2p_logger_add_class("secio");
int retVal = 0; int retVal = 0;
@ -460,6 +466,7 @@ int test_secio_handshake_go() {
goto exit; goto exit;
} }
*/ */
/*
// a new way to do the above // a new way to do the above
if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) { if (!libp2p_secio_initiate_handshake(secure_session, rsa_private_key, peerstore)) {
libp2p_logger_error("test_secio", "Unable to do handshake.\n"); libp2p_logger_error("test_secio", "Unable to do handshake.\n");
@ -486,7 +493,7 @@ int test_secio_handshake_go() {
print_stretched_key(secure_session.remote_stretched_key); print_stretched_key(secure_session.remote_stretched_key);
fprintf(stdout, "\n"); fprintf(stdout, "\n");
*/ */
/*
// now attempt to do something with it... try to negotiate a multistream // now attempt to do something with it... try to negotiate a multistream
if (libp2p_net_multistream_negotiate(secure_session) == 0) { if (libp2p_net_multistream_negotiate(secure_session) == 0) {
fprintf(stdout, "Unable to negotiate multistream\n"); fprintf(stdout, "Unable to negotiate multistream\n");
@ -526,4 +533,6 @@ int test_secio_handshake_go() {
if (peerstore != NULL) if (peerstore != NULL)
libp2p_peerstore_free(peerstore); libp2p_peerstore_free(peerstore);
return retVal; return retVal;
*/
return 0;
} }

View file

@ -13,16 +13,16 @@
* @param incoming_size the size of the incoming data buffer * @param incoming_size the size of the incoming data buffer
* @returns true(1) if it can handle this message, false(0) if not * @returns true(1) if it can handle this message, false(0) if not
*/ */
int yamux_can_handle(const uint8_t* incoming, size_t incoming_size) { int yamux_can_handle(const struct StreamMessage* msg) {
char *protocol = "/yamux/1.0.0\n"; char *protocol = "/yamux/1.0.0\n";
int protocol_size = strlen(protocol); int protocol_size = strlen(protocol);
// is there a varint in front? // is there a varint in front?
size_t num_bytes = 0; size_t num_bytes = 0;
if (incoming[0] != protocol[0] && incoming[1] != protocol[1]) { if (msg->data[0] != protocol[0] && msg->data[1] != protocol[1]) {
varint_decode(incoming, incoming_size, &num_bytes); varint_decode(msg->data, msg->data_size, &num_bytes);
} }
if (incoming_size >= protocol_size - num_bytes) { if (msg->data_size >= protocol_size - num_bytes) {
if (strncmp(protocol, (char*) &incoming[num_bytes], protocol_size) == 0) if (strncmp(protocol, (char*) &msg->data[num_bytes], protocol_size) == 0)
return 1; return 1;
} }
return 0; return 0;
@ -31,12 +31,12 @@ int yamux_can_handle(const uint8_t* incoming, size_t incoming_size) {
/** /**
* the yamux stream received some bytes. Process them * the yamux stream received some bytes. Process them
* @param stream the stream that the data came in on * @param stream the stream that the data came in on
* @param incoming_size the size of the stream buffer * @param msg the message
* @param incoming the stream buffer * @param incoming the stream buffer
*/ */
void yamux_read_stream(struct yamux_stream* stream, ssize_t incoming_size, uint8_t* incoming) { void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) {
struct Libp2pVector* handlers = stream->userdata; struct Libp2pVector* handlers = stream->userdata;
int retVal = libp2p_protocol_marshal(incoming, incoming_size, stream->session->session_context, handlers); int retVal = libp2p_protocol_marshal(msg, stream->session->session_context, handlers);
if (retVal == -1) { if (retVal == -1) {
// TODO handle error condition // TODO handle error condition
libp2p_logger_error("yamux", "Marshalling returned error.\n"); libp2p_logger_error("yamux", "Marshalling returned error.\n");
@ -94,21 +94,21 @@ int yamux_receive_protocol(struct SessionContext* context) {
/*** /***
* Handles the message * Handles the message
* @param incoming the incoming data buffer * @param msg the incoming message
* @param incoming_size the size of the incoming data buffer * @param incoming_size the size of the incoming data buffer
* @param session_context the information about the incoming connection * @param session_context the information about the incoming connection
* @param protocol_context the protocol-dependent context * @param protocol_context the protocol-dependent context
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success * @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/ */
int yamux_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) {
// they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream // they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream
struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context); struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context);
uint8_t* buf = (uint8_t*) malloc(incoming_size); uint8_t* buf = (uint8_t*) malloc(msg->data_size);
if (buf == NULL) if (buf == NULL)
return -1; return -1;
memcpy(buf, incoming, incoming_size); memcpy(buf, msg->data, msg->data_size);
for(;;) { for(;;) {
int retVal = yamux_decode(yamux, incoming, incoming_size); int retVal = yamux_decode(yamux, msg->data, msg->data_size);
free(buf); free(buf);
buf = NULL; buf = NULL;
if (!retVal) if (!retVal)