Major changes for implementation of yamux protocol

This commit changes the Stream interface, so as to allow the yamux
protocol to have channels. It is necessary, but touches many areas. The
codebase is better for it.
This commit is contained in:
John Jones 2017-11-08 10:51:43 -05:00
parent f4860d3ed4
commit f2e5af4058
21 changed files with 438 additions and 319 deletions

View file

@ -44,7 +44,7 @@ struct SessionContext* libp2p_session_context_new() {
int libp2p_session_context_free(struct SessionContext* context) {
if (context != NULL) {
if (context->default_stream != NULL)
context->default_stream->close(context);
context->default_stream->close(context->default_stream);
context->default_stream = NULL;
context->insecure_stream = NULL;
context->secure_stream = NULL;

View file

@ -24,7 +24,7 @@ struct Stream* libp2p_conn_tcp_dial(const struct TransportDialer* transport_dial
if (!multiaddress_get_ip_address(addr, &ip))
return NULL;
struct Stream* stream = libp2p_net_connection_new(socket_descriptor, ip, port);
struct Stream* stream = libp2p_net_connection_new(socket_descriptor, ip, port, NULL);
free(ip);
return stream;

View file

@ -35,12 +35,12 @@ int libp2p_identify_can_handle(const struct StreamMessage* msg) {
* @param context the context
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_identify_send_protocol(struct IdentifyContext *context) {
int libp2p_identify_send_protocol(struct Stream *stream) {
char *protocol = "/ipfs/id/1.0.0\n";
struct StreamMessage msg;
msg.data = (uint8_t*) protocol;
msg.data_size = strlen(protocol);
if (!context->parent_stream->write(context->parent_stream->stream_context, &msg)) {
if (!stream->write(stream->stream_context, &msg)) {
libp2p_logger_error("identify", "send_protocol: Unable to send identify protocol header.\n");
return 0;
}
@ -53,10 +53,10 @@ int libp2p_identify_send_protocol(struct IdentifyContext *context) {
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_identify_receive_protocol(struct IdentifyContext* context) {
int libp2p_identify_receive_protocol(struct Stream* stream) {
const char *protocol = "/ipfs/id/1.0.0\n";
struct StreamMessage* results = NULL;
if (!context->parent_stream->read(context->parent_stream->stream_context, &results, 30)) {
if (!stream->read(stream->stream_context, &results, 30)) {
libp2p_logger_error("identify", "receive_protocol: Unable to read results.\n");
return 0;
}
@ -80,15 +80,14 @@ int libp2p_identify_receive_protocol(struct IdentifyContext* context) {
* @param protocol_context the identify protocol context
* @returns <0 on error, 0 if loop should not continue, >0 on success
*/
int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) {
if (protocol_context == NULL)
int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
struct Stream* new_stream = libp2p_identify_stream_new(stream);
if (new_stream == NULL)
return -1;
// attempt to create a new Identify connection with them.
// send the protocol id back, and set up the channel
struct IdentifyContext* ctx = (struct IdentifyContext*)protocol_context;
libp2p_identify_send_protocol(ctx);
//TODO: now add this "channel"
return 1;
return stream->handle_upgrade(stream, new_stream);
}
/**
@ -97,13 +96,16 @@ int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Sessi
* @returns true(1)
*/
int libp2p_identify_shutdown(void* protocol_context) {
if (protocol_context == NULL)
return 0;
free(protocol_context);
return 1;
}
struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp2pVector* handlers) {
struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new();
if (handler != NULL) {
handler->context = handlers;
handler->context = handler;
handler->CanHandle = libp2p_identify_can_handle;
handler->HandleMessage = libp2p_identify_handle_message;
handler->Shutdown = libp2p_identify_shutdown;
@ -111,13 +113,14 @@ struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp
return handler;
}
int libp2p_identify_close(void* stream_context) {
if (stream_context == NULL)
int libp2p_identify_close(struct Stream* stream) {
if (stream == NULL)
return 0;
struct IdentifyContext* ctx = (struct IdentifyContext*)stream_context;
ctx->parent_stream->close(ctx->parent_stream->stream_context);
free(ctx->stream);
free(ctx);
if (stream->parent_stream != NULL)
stream->parent_stream->close(stream->parent_stream);
if (stream->stream_context != NULL)
free(stream->stream_context);
libp2p_stream_free(stream);
return 1;
}
@ -146,7 +149,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) {
ctx->stream = out;
out->stream_context = ctx;
out->close = libp2p_identify_close;
if (!libp2p_identify_send_protocol(ctx) || !libp2p_identify_receive_protocol(ctx)) {
if (!libp2p_identify_send_protocol(parent_stream) || !libp2p_identify_receive_protocol(parent_stream)) {
libp2p_stream_free(out);
free(ctx);
return NULL;

View file

@ -29,9 +29,8 @@ struct IdentifyContext {
};
int libp2p_identify_can_handle(const struct StreamMessage* msg);
int libp2p_identify_send_protocol(struct IdentifyContext *context);
int libp2p_identify_receive_protocol(struct IdentifyContext* context);
int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context);
int libp2p_identify_send_protocol(struct Stream* stream);
int libp2p_identify_receive_protocol(struct Stream* stream);
int libp2p_identify_shutdown(void* protocol_context);
struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp2pVector* handlers);

View file

@ -1,6 +1,7 @@
#pragma once
#include "libp2p/net/stream.h"
#include "libp2p/conn/session.h"
/***
* Create a new stream based on a network connection
@ -9,7 +10,24 @@
* @param port the port of the connection
* @returns a Stream
*/
struct Stream* libp2p_net_connection_new(int fd, char* ip, int port);
struct Stream* libp2p_net_connection_new(int fd, char* ip, int port, struct SessionContext* session_context);
/**
* Attempt to upgrade the parent_stream to use the new stream by default
* @param parent_stream the parent stream
* @param new_stream the new stream
* @returns true(1) on success, false(0) if not
*/
int libp2p_net_connection_upgrade(struct Stream* parent_stream, struct Stream* new_stream);
/**
* Given a stream, find the SessionContext
* NOTE: This is done by navigating to the root context, which should
* be a ConnectionContext, then grabbing the SessionContext there.
* @param stream the stream to use
* @returns the SessionContext for this stream
*/
struct SessionContext* libp2p_net_connection_get_session_context(struct Stream* stream);
/***
* These are put here to allow implementations of struct Stream

View file

@ -22,11 +22,11 @@ struct Libp2pProtocolHandler {
* Handles the message
* @param incoming the incoming data buffer
* @param incoming_size the size of the incoming data buffer
* @param session_context the information about the incoming connection
* @param stream the incoming stream
* @param protocol_context the protocol-dependent context
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/
int (*HandleMessage)(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context);
int (*HandleMessage)(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context);
/**
* Shutting down. Clean up any memory allocations
@ -45,8 +45,15 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new();
/***
* Handle an incoming message
* @param message the incoming message
* @param session the SessionContext of the incoming connection
* @param stream the incoming connection
* @param handlers a Vector of protocol handlers
* @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success
* @returns -1 on error, 0 on protocol upgrade, 1 on success
*/
int libp2p_protocol_marshal(struct StreamMessage* message, struct SessionContext* context, struct Libp2pVector* protocol_handlers);
int libp2p_protocol_marshal(struct StreamMessage* message, struct Stream* stream, struct Libp2pVector* protocol_handlers);
/***
* Shut down all protocol handlers and free vector
* @param handlers vector of Libp2pProtocolHandler
* @returns true(1)
*/
int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers);

View file

@ -46,6 +46,8 @@ struct Stream {
struct MultiAddress* address; // helps identify who is on the other end
pthread_mutex_t* socket_mutex; // only 1 transmission at a time
struct Stream* parent_stream; // what stream wraps this stream
int channel; // the channel (for multiplexing streams)
/**
* A generic place to store implementation-specific context items
*/
@ -82,10 +84,10 @@ struct Stream {
* Closes a stream
*
* NOTE: This is also responsible for deallocating the Stream struct
* @param stream the stream context
* @param stream the stream
* @returns true(1) on success, otherwise false(0)
*/
int (*close)(void* stream_context);
int (*close)(struct Stream* stream);
/***
* Checks to see if something is waiting on the stream
@ -94,6 +96,13 @@ struct Stream {
* @returns true(1) if something is waiting, false(0) if not, -1 on error
*/
int (*peek)(void* stream_context);
/**
* Handle a stream upgrade
* @param stream the current stream
* @param new_stream the newly created stream
*/
int (*handle_upgrade)(struct Stream* stream, struct Stream* new_stream);
};
struct Stream* libp2p_stream_new();

View file

@ -10,7 +10,16 @@
* This is where kademlia and dht talk to the outside world
*/
struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store);
struct DhtContext {
struct Peerstore* peer_store;
struct ProviderStore* provider_store;
struct Datastore* datastore;
struct Filestore* filestore;
};
struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store,
struct Datastore* datastore, struct Filestore* filestore);
/**
* Take existing stream and upgrade to the Kademlia / DHT protocol/codec
@ -21,20 +30,20 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context);
/**
* Handle a client requesting an upgrade to the DHT protocol
* @param context the context
* @param stream the stream
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_routing_dht_handshake(struct SessionContext* context);
int libp2p_routing_dht_handshake(struct Stream* stream);
/***
* Handle the incoming message. Handshake should have already
* been done. We should expect that the next read contains
* a protobuf'd kademlia message.
* @param session the context
* @param peerstore a list of peers
* @param stream the incoming stream
* @param protocol_context the protocol context
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore);
int libp2p_routing_dht_handle_message(struct Stream* stream, struct DhtContext* protocol_context);
/***
* Send a kademlia message

View file

@ -42,16 +42,17 @@ int libp2p_secio_initiate_handshake(struct SecioContext* ctx);
/***
* Send the protocol string to the remote stream
* @param session the context
* @param stream stream
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_secio_send_protocol(struct SecioContext* session);
int libp2p_secio_send_protocol(struct Stream* stream);
/***
* Attempt to read the secio protocol as a reply from the remote
* @param session the context
* @returns true(1) if we received what we think we should have, false(0) otherwise
*/
int libp2p_secio_receive_protocol(struct SecioContext* session);
int libp2p_secio_receive_protocol(struct Stream* stream);
/***
* performs initial communication over an insecure channel to share

View file

@ -26,7 +26,10 @@ struct YamuxContext {
struct YamuxChannelContext {
char type;
struct YamuxContext* yamux_context;
// this stream
struct Stream* stream;
// the child protocol's stream
struct Stream* child_stream;
// the channel number
int channel;
// the window size for this channel
@ -40,14 +43,14 @@ struct YamuxChannelContext {
/**
* Build a handler that can handle the yamux protocol
*/
struct Libp2pProtocolHandler* yamux_build_protocol_handler();
struct Libp2pProtocolHandler* libp2p_yamux_build_protocol_handler();
/***
* Send the yamux protocol out the default stream
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @param stream the stream
* @returns true(1) on success, false(0) otherwise
*/
int yamux_send_protocol(struct YamuxContext* context);
int yamux_send_protocol(struct Stream* stream);
/***
* Check to see if the reply is the yamux protocol header we expect

View file

@ -12,6 +12,7 @@
#include "libp2p/net/stream.h"
#include "libp2p/net/p2pnet.h"
#include "libp2p/utils/logger.h"
#include "libp2p/conn/session.h"
#include "multiaddr/multiaddr.h"
/**
@ -19,10 +20,10 @@
* @param stream_context the ConnectionContext
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_net_connection_close(void* stream_context) {
if (stream_context == NULL)
int libp2p_net_connection_close(struct Stream* stream) {
if (stream->stream_context == NULL)
return 0;
struct ConnectionContext* ctx = (struct ConnectionContext*)stream_context;
struct ConnectionContext* ctx = (struct ConnectionContext*)stream->stream_context;
if (ctx != NULL) {
if (ctx->socket_descriptor > 0) {
close(ctx->socket_descriptor);
@ -113,7 +114,7 @@ int libp2p_net_connection_write(void* stream_context, struct StreamMessage* msg)
* @param port the port of the connection
* @returns a Stream
*/
struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) {
struct Stream* libp2p_net_connection_new(int fd, char* ip, int port, struct SessionContext* session_context) {
struct Stream* out = (struct Stream*) malloc(sizeof(struct Stream));
if (out != NULL) {
out->close = libp2p_net_connection_close;
@ -135,6 +136,7 @@ struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) {
if (ctx != NULL) {
out->stream_context = ctx;
ctx->socket_descriptor = fd;
ctx->session_context = session_context;
if (!socket_connect4_with_timeout(ctx->socket_descriptor, hostname_to_ip(ip), port, 10) == 0) {
// unable to connect
libp2p_stream_free(out);
@ -144,3 +146,39 @@ struct Stream* libp2p_net_connection_new(int fd, char* ip, int port) {
}
return out;
}
/**
* Attempt to upgrade the parent_stream to use the new stream by default
* @param parent_stream the parent stream
* @param new_stream the new stream
* @returns true(1) on success, false(0) if not
*/
int libp2p_net_connection_upgrade(struct Stream* parent_stream, struct Stream* new_stream) {
if (parent_stream == NULL)
return 0;
struct Stream* current_stream = parent_stream;
while (current_stream->parent_stream != NULL)
current_stream = current_stream->parent_stream;
// current_stream is now the root, and should have a ConnectionContext
struct ConnectionContext* ctx = (struct ConnectionContext*)current_stream->stream_context;
ctx->session_context->default_stream = new_stream;
return 1;
}
/**
* Given a stream, find the SessionContext
* NOTE: This is done by navigating to the root context, which should
* be a ConnectionContext, then grabbing the SessionContext there.
* @param stream the stream to use
* @returns the SessionContext for this stream
*/
struct SessionContext* libp2p_net_connection_get_session_context(struct Stream* stream) {
if (stream == NULL) {
return NULL;
}
struct Stream* current_stream = stream;
while (current_stream->parent_stream != NULL)
current_stream = current_stream->parent_stream;
struct ConnectionContext* ctx = (struct ConnectionContext*)current_stream->stream_context;
return ctx->session_context;
}

View file

@ -9,6 +9,7 @@
#include "libp2p/os/utils.h"
#include "libp2p/net/p2pnet.h"
#include "libp2p/net/connectionstream.h"
#include "libp2p/record/message.h"
#include "libp2p/secio/secio.h"
#include "varint.h"
@ -78,45 +79,6 @@ int libp2p_net_multistream_receive_protocol(struct SessionContext* context) {
return 1;
}
int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) {
// try sending the protocol back
//if (!libp2p_net_multistream_send_protocol(context))
// return -1;
struct MultistreamContext* multistream_context = (struct MultistreamContext*) protocol_context;
// try to read from the network
struct StreamMessage* results = NULL;
int retVal = 0;
int max_retries = 10;
int numRetries = 0;
// handle the call
for(;;) {
// try to read for 5 seconds
if (context->default_stream->read(context, &results, 5)) {
// we read something from the network. Process it.
// NOTE: If it is a multistream protocol that we are receiving, ignore it.
if (libp2p_net_multistream_can_handle(results))
continue;
numRetries = 0;
retVal = libp2p_protocol_marshal(results, context, multistream_context->handlers);
if (results != NULL)
free(results);
// exit the loop on error (or if they ask us to no longer loop by returning 0)
if (retVal <= 0)
break;
} else {
// we were unable to read from the network.
// if it timed out, we should try again (if we're not out of retries)
if (numRetries >= max_retries)
break;
numRetries++;
}
}
return retVal;
}
int libp2p_net_multistream_shutdown(void* protocol_context) {
struct MultistreamContext* context = (struct MultistreamContext*) protocol_context;
if (context != NULL) {
@ -125,37 +87,13 @@ int libp2p_net_multistream_shutdown(void* protocol_context) {
return 1;
}
/***
* The handler to handle calls to the protocol
* @param stream_context the context
* @returns the protocol handler
*/
struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector) {
// build the context
struct MultistreamContext* context = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext));
if (context == NULL)
return NULL;
context->handlers = (struct Libp2pVector*) handler_vector;
// build the handler
struct Libp2pProtocolHandler *handler = libp2p_protocol_handler_new();
if (handler != NULL) {
handler->context = context;
handler->CanHandle = libp2p_net_multistream_can_handle;
handler->HandleMessage = libp2p_net_multistream_handle_message;
handler->Shutdown = libp2p_net_multistream_shutdown;
}
return handler;
}
/**
* Close the connection and free memory
* @param ctx the context
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_net_multistream_context_free(struct MultistreamContext* ctx) {
int retVal = ctx->stream->close(ctx);
int retVal = ctx->stream->close(ctx->stream);
// regardless of retVal, free the context
// TODO: Evaluate if this is the correct way to do it:
free(ctx);
@ -168,11 +106,11 @@ int libp2p_net_multistream_context_free(struct MultistreamContext* ctx) {
* @param stream_context a SessionContext
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_net_multistream_close(void* stream_context) {
if (stream_context == NULL) {
int libp2p_net_multistream_close(struct Stream* stream) {
if (stream->stream_context == NULL) {
return 0;
}
struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream_context;
struct MultistreamContext* multistream_context = (struct MultistreamContext*)stream->stream_context;
return libp2p_net_multistream_context_free(multistream_context);
}
@ -458,3 +396,45 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream) {
return out;
}
/***
* The remote is attempting to negotiate the multistream protocol
* @param msg incoming message
* @param stream the incoming stream
* @param protocol_context the context for the Multistream protocol (not stream specific)
* @returns <0 on error, 0 for the caller to stop handling this, 1 for success
*/
int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
// attempt negotiations
struct Stream* new_stream = libp2p_net_multistream_stream_new(stream);
if (new_stream != NULL) {
// upgrade
return stream->handle_upgrade(stream, new_stream);
}
return -1;
}
/***
* The handler to handle calls to the protocol
* @param stream_context the context
* @returns the protocol handler
*/
struct Libp2pProtocolHandler* libp2p_net_multistream_build_protocol_handler(void* handler_vector) {
// build the context
struct MultistreamContext* context = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext));
if (context == NULL)
return NULL;
context->handlers = (struct Libp2pVector*) handler_vector;
// build the handler
struct Libp2pProtocolHandler *handler = libp2p_protocol_handler_new();
if (handler != NULL) {
handler->context = context;
handler->CanHandle = libp2p_net_multistream_can_handle;
handler->HandleMessage = libp2p_net_multistream_handle_message;
handler->Shutdown = libp2p_net_multistream_shutdown;
}
return handler;
}

View file

@ -46,7 +46,7 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new() {
* @param handlers a Vector of protocol handlers
* @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success
*/
int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* session, struct Libp2pVector* handlers) {
int libp2p_protocol_marshal(struct StreamMessage* msg, struct Stream* stream, struct Libp2pVector* handlers) {
const struct Libp2pProtocolHandler* handler = protocol_compare(msg, handlers);
if (handler == NULL) {
@ -60,9 +60,22 @@ int libp2p_protocol_marshal(struct StreamMessage* msg, struct SessionContext* se
break;
}
}
libp2p_logger_error("protocol", "Session [%s]: Unable to find handler for %s.\n", session->remote_peer_id, str);
return -1;
}
return handler->HandleMessage(msg, session, handler->context);
return handler->HandleMessage(msg, stream, handler->context);
}
/***
* Shut down all protocol handlers and free vector
* @param handlers vector of Libp2pProtocolHandler
* @returns true(1)
*/
int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers) {
for(int i = 0; i < handlers->total; i++) {
struct Libp2pProtocolHandler* handler = (struct Libp2pProtocolHandler*)libp2p_utils_vector_get(handlers, i);
handler->Shutdown(handler->context);
}
libp2p_utils_vector_free(handlers);
return 1;
}

View file

@ -2,6 +2,11 @@
#include "multiaddr/multiaddr.h"
#include "libp2p/net/stream.h"
#include "libp2p/net/connectionstream.h"
int libp2p_stream_default_handle_upgrade(struct Stream* parent_stream, struct Stream* new_stream) {
return libp2p_net_connection_upgrade(parent_stream, new_stream);
}
struct Stream* libp2p_stream_new() {
struct Stream* stream = (struct Stream*) malloc(sizeof(struct Stream));
@ -15,6 +20,8 @@ struct Stream* libp2p_stream_new() {
stream->socket_mutex = NULL;
stream->stream_context = NULL;
stream->write = NULL;
stream->handle_upgrade = libp2p_stream_default_handle_upgrade;
stream->channel = -1;
}
return stream;
}

View file

@ -15,11 +15,6 @@
* This is where kademlia and dht talk to the outside world
*/
struct DhtContext {
struct Peerstore* peer_store;
struct ProviderStore* provider_store;
};
int libp2p_routing_dht_can_handle(const struct StreamMessage* msg) {
if (msg->data_size < 8)
return 0;
@ -34,20 +29,22 @@ int libp2p_routing_dht_shutdown(void* context) {
return 1;
}
int libp2p_routing_dht_handle_msg(const struct StreamMessage* msg, struct SessionContext* session_context, void* context) {
libp2p_logger_debug("dht_protocol", "Handling incoming dht routing request from peer %s.\n", session_context->remote_peer_id);
struct DhtContext* ctx = (struct DhtContext*)context;
if (!libp2p_routing_dht_handshake(session_context))
int libp2p_routing_dht_handle_msg(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
struct DhtContext* ctx = (struct DhtContext*)protocol_context;
if (!libp2p_routing_dht_handshake(stream))
return -1;
return (libp2p_routing_dht_handle_message(session_context, ctx->peer_store, ctx->provider_store) == 0) ? -1 : 1;
return (libp2p_routing_dht_handle_message(stream, ctx) == 0) ? -1 : 1;
}
struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store) {
struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct Peerstore* peer_store, struct ProviderStore* provider_store,
struct Datastore* datastore, struct Filestore* filestore) {
struct Libp2pProtocolHandler* handler = (struct Libp2pProtocolHandler*) malloc(sizeof(struct Libp2pProtocolHandler));
if (handler != NULL) {
struct DhtContext* ctx = (struct DhtContext*) malloc(sizeof(struct DhtContext));
ctx->peer_store = peer_store;
ctx->provider_store = provider_store;
ctx->datastore = datastore;
ctx->filestore = filestore;
handler->context = ctx;
handler->CanHandle = libp2p_routing_dht_can_handle;
handler->HandleMessage = libp2p_routing_dht_handle_msg;
@ -111,15 +108,15 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) {
/**
* Handle a client requesting an upgrade to the DHT protocol
* @param context the context
* @param stream the stream to the remote
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_routing_dht_handshake(struct SessionContext* context) {
int libp2p_routing_dht_handshake(struct Stream* stream) {
char* protocol = "/ipfs/kad/1.0.0\n";
struct StreamMessage outgoing;
outgoing.data = (uint8_t*) protocol;
outgoing.data_size = strlen(protocol);
return context->default_stream->write(context, &outgoing);
return stream->write(stream->stream_context, &outgoing);
}
/**
@ -136,14 +133,15 @@ int libp2p_routing_dht_handle_ping(struct KademliaMessage* message, unsigned cha
/**
* See if we have information as to who can provide this item
* @param session the context
* @param stream the incoming stream
* @param message the message from the caller, contains a key
* @param peerstore the list of peers
* @param providerstore the list of peers that can provide things
* @param protocol_context the context
* @param results where to put the results
* @param results_size the size of the results
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, struct KademliaMessage* message, struct Peerstore* peerstore,
struct ProviderStore* providerstore, unsigned char** results, size_t* results_size) {
int libp2p_routing_dht_handle_get_providers(struct Stream* stream, struct KademliaMessage* message, struct DhtContext* protocol_context,
unsigned char** results, size_t* results_size) {
unsigned char* peer_id = NULL;
int peer_id_size = 0;
@ -152,16 +150,16 @@ int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, stru
// Can I provide it locally?
struct DatastoreRecord* datastore_record = NULL;
if (session->datastore->datastore_get((unsigned char*)message->key, message->key_size, &datastore_record, session->datastore)) {
if (protocol_context->datastore->datastore_get((unsigned char*)message->key, message->key_size, &datastore_record, protocol_context->datastore)) {
// we can provide this hash from our datastore
libp2p_datastore_record_free(datastore_record);
libp2p_logger_debug("dht_protocol", "I can provide myself as a provider for this key.\n");
message->provider_peer_head = libp2p_utils_linked_list_new();
message->provider_peer_head->item = libp2p_peer_copy(libp2p_peerstore_get_local_peer(peerstore));
} else if (libp2p_providerstore_get(providerstore, (unsigned char*)message->key, message->key_size, &peer_id, &peer_id_size)) {
message->provider_peer_head->item = libp2p_peer_copy(libp2p_peerstore_get_local_peer(protocol_context->peer_store));
} else if (libp2p_providerstore_get(protocol_context->provider_store, (unsigned char*)message->key, message->key_size, &peer_id, &peer_id_size)) {
// Can I provide it because someone announced it earlier?
// we have a peer id, convert it to a peer object
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, peer_id, peer_id_size);
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(protocol_context->peer_store, peer_id, peer_id_size);
if (peer != NULL) {
libp2p_logger_debug("dht_protocol", "I can provide a provider for this key, because %s says he has it.\n", libp2p_peer_id_to_string(peer));
// add it to the message
@ -233,16 +231,15 @@ struct MultiAddress* libp2p_routing_dht_find_peer_ip_multiaddress(struct Libp2pL
/***
* Remote peer has announced that he can provide a key
* @param session session context
* @param stream the incoming stream
* @param message the message
* @param peerstore the peerstore
* @param providerstore the providerstore
* @param protocol_context the context
* @param result_buffer where to put the result
* @param result_buffer_size the size of the result buffer
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struct KademliaMessage* message,
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t* result_buffer_size) {
int libp2p_routing_dht_handle_add_provider(struct Stream* stream, struct KademliaMessage* message,
struct DhtContext* protocol_context, unsigned char** result_buffer, size_t* result_buffer_size) {
int retVal = 0;
struct Libp2pPeer *peer = NULL;
@ -294,10 +291,10 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc
*/
// now add the peer to the peerstore
libp2p_logger_debug("dht_protocol", "About to add peer %s to peerstore\n", peer_ma->string);
if (!libp2p_peerstore_add_peer(peerstore, peer))
if (!libp2p_peerstore_add_peer(protocol_context->peer_store, peer))
goto exit;
libp2p_logger_debug("dht_protocol", "About to add key to providerstore\n");
if (!libp2p_providerstore_add(providerstore, (unsigned char*)message->key, message->key_size, (unsigned char*)peer->id, peer->id_size))
if (!libp2p_providerstore_add(protocol_context->provider_store, (unsigned char*)message->key, message->key_size, (unsigned char*)peer->id, peer->id_size))
goto exit;
}
@ -332,11 +329,11 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc
* @param result_buffer_size the size of the results
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct KademliaMessage* message,
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) {
int libp2p_routing_dht_handle_get_value(struct Stream* stream, struct KademliaMessage* message, struct DhtContext* dht_context,
unsigned char** result_buffer, size_t *result_buffer_size) {
struct Datastore* datastore = session->datastore;
struct Filestore* filestore = session->filestore;
struct Datastore* datastore = dht_context->datastore;
struct Filestore* filestore = dht_context->filestore;
size_t data_size = 0;
unsigned char* data = NULL;
@ -374,14 +371,15 @@ int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct K
/**
* Put something in the dht datastore
* @param session the session context
* @param stream the incoming stream
* @param message the message
* @param peerstore the peerstore
* @param providerstore the providerstore
* @param datastore the datastore
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct KademliaMessage* message,
struct Peerstore* peerstore, struct ProviderStore* providerstore) {
int libp2p_routing_dht_handle_put_value(struct Stream* stream, struct KademliaMessage* message,
struct DhtContext* protocol_context) {
if (message->record == NULL)
return 0;
@ -407,7 +405,7 @@ int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct K
}
memcpy(record->value, message->record->value, record->value_size);
int retVal = session->datastore->datastore_put(record, session->datastore);
int retVal = protocol_context->datastore->datastore_put(record, protocol_context->datastore);
libp2p_datastore_record_free(record);
return retVal;
}
@ -422,10 +420,10 @@ int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct K
* @param result_buffer_size the size of the results
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct KademliaMessage* message,
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) {
int libp2p_routing_dht_handle_find_node(struct Stream* stream, struct KademliaMessage* message,
struct DhtContext* protocol_context, unsigned char** result_buffer, size_t *result_buffer_size) {
// look through peer store
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, (unsigned char*)message->key, message->key_size);
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(protocol_context->peer_store, (unsigned char*)message->key, message->key_size);
if (peer != NULL) {
message->provider_peer_head = libp2p_utils_linked_list_new();
message->provider_peer_head->item = libp2p_peer_copy(peer);
@ -445,7 +443,7 @@ int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct K
* @param peerstore a list of peers
* @returns true(1) on success, otherwise false(0)
*/
int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore) {
int libp2p_routing_dht_handle_message(struct Stream* stream, struct DhtContext* protocol_context) {
unsigned char *result_buffer = NULL;
struct StreamMessage* buffer = NULL;
size_t result_buffer_size = 0;
@ -453,7 +451,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee
struct KademliaMessage* message = NULL;
// read from stream
if (!session->default_stream->read(session, &buffer, 5))
if (!stream->read(stream->stream_context, &buffer, 5))
goto exit;
// unprotobuf
if (!libp2p_message_protobuf_decode(buffer->data, buffer->data_size, &message))
@ -462,19 +460,19 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee
// handle message
switch(message->message_type) {
case(MESSAGE_TYPE_PUT_VALUE): // store a value in local storage
libp2p_routing_dht_handle_put_value(session, message, peerstore, providerstore);
libp2p_routing_dht_handle_put_value(stream, message, protocol_context);
break;
case(MESSAGE_TYPE_GET_VALUE): // get a value from local storage
libp2p_routing_dht_handle_get_value(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size);
libp2p_routing_dht_handle_get_value(stream, message, protocol_context, &result_buffer, &result_buffer_size);
break;
case(MESSAGE_TYPE_ADD_PROVIDER): // client wants us to know he can provide something
libp2p_routing_dht_handle_add_provider(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size);
libp2p_routing_dht_handle_add_provider(stream, message, protocol_context, &result_buffer, &result_buffer_size);
break;
case(MESSAGE_TYPE_GET_PROVIDERS): // see if we can help, and send closer peers
libp2p_routing_dht_handle_get_providers(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size);
libp2p_routing_dht_handle_get_providers(stream, message, protocol_context, &result_buffer, &result_buffer_size);
break;
case(MESSAGE_TYPE_FIND_NODE): // find peers
libp2p_routing_dht_handle_find_node(session, message, peerstore, providerstore, &result_buffer, &result_buffer_size);
libp2p_routing_dht_handle_find_node(stream, message, protocol_context, &result_buffer, &result_buffer_size);
break;
case(MESSAGE_TYPE_PING):
libp2p_routing_dht_handle_ping(message, &result_buffer, &result_buffer_size);
@ -486,7 +484,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee
struct StreamMessage outgoing;
outgoing.data = result_buffer;
outgoing.data_size = result_buffer_size;
if (!session->default_stream->write(session, &outgoing))
if (!stream->write(stream->stream_context, &outgoing))
goto exit;
} else {
libp2p_logger_debug("dht_protocol", "DhtHandleMessage: Nothing to send back. Kademlia call has been handled. Message type: %d\n", message->message_type);

View file

@ -53,11 +53,11 @@ int libp2p_secio_can_handle(const struct StreamMessage* msg) {
* @param protocol_context a SecioContext that contains the needed information
* @returns <0 on error, 0 if okay (does not allow daemon to continue looping)
*/
int libp2p_secio_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) {
int libp2p_secio_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
libp2p_logger_debug("secio", "Handling incoming secio message.\n");
struct SecioContext* ctx = (struct SecioContext*)protocol_context;
// send them the protocol
if (!libp2p_secio_send_protocol(ctx))
if (!libp2p_secio_send_protocol(stream))
return -1;
int retVal = libp2p_secio_handshake(ctx);
if (retVal)
@ -518,12 +518,12 @@ int libp2p_secio_make_mac_and_cipher(struct SessionContext* session, struct Stre
* @param ctx the context
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_secio_send_protocol(struct SecioContext* ctx) {
int libp2p_secio_send_protocol(struct Stream* stream) {
char* protocol = "/secio/1.0.0\n";
struct StreamMessage outgoing;
outgoing.data = (uint8_t*)protocol;
outgoing.data_size = strlen(protocol);
return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing);
return stream->write(stream->stream_context, &outgoing);
}
/***
@ -531,12 +531,12 @@ int libp2p_secio_send_protocol(struct SecioContext* ctx) {
* @param ctx the context
* @returns true(1) if we received what we think we should have, false(0) otherwise
*/
int libp2p_secio_receive_protocol(struct SecioContext* ctx) {
int libp2p_secio_receive_protocol(struct Stream* stream) {
char* protocol = "/secio/1.0.0\n";
int numSecs = 30;
int retVal = 0;
struct StreamMessage* buffer = NULL;
ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &buffer, numSecs);
stream->read(stream->stream_context, &buffer, numSecs);
if (buffer == NULL) {
libp2p_logger_error("secio", "Expected the secio protocol header, but received NULL.\n");
} else {
@ -1335,6 +1335,12 @@ int libp2p_secio_read_raw(void* stream_context, uint8_t* buffer, int buffer_size
return max_to_read;
}
int libp2p_secio_close(struct Stream* stream) {
if (stream != NULL && stream->stream_context != NULL)
free(stream->stream_context);
return 1;
}
/***
* Initiates a secio handshake. Use this method when you want to initiate a secio
* session. This should not be used to respond to incoming secio requests
@ -1361,13 +1367,13 @@ struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp
ctx->peer_store = peerstore;
ctx->private_key = rsa_private_key;
new_stream->parent_stream = parent_stream;
new_stream->close = libp2p_secio_shutdown;
new_stream->close = libp2p_secio_close;
new_stream->peek = libp2p_secio_peek;
new_stream->read = libp2p_secio_encrypted_read;
new_stream->read_raw = libp2p_secio_read_raw;
new_stream->write = libp2p_secio_encrypted_write;
if (!libp2p_secio_send_protocol(ctx)
|| !libp2p_secio_receive_protocol(ctx)
if (!libp2p_secio_send_protocol(parent_stream)
|| !libp2p_secio_receive_protocol(parent_stream)
|| !libp2p_secio_handshake(ctx)) {
libp2p_stream_free(new_stream);
new_stream = NULL;

View file

@ -3,17 +3,13 @@
#include <unistd.h>
#include "libp2p/net/stream.h"
struct MockContext {
struct Stream* stream;
};
void mock_stream_free(struct Stream* stream);
int mock_stream_close(void* context) {
if (context == NULL)
return 1;
struct MockContext* ctx = (struct MockContext*)context;
mock_stream_free(ctx->stream);
int mock_stream_close(struct Stream* stream) {
if (stream == NULL)
return 0;
struct ConnectionContext* ctx = (struct ConnectionContext*)stream->stream_context;
mock_stream_free(stream);
return 1;
}
@ -41,8 +37,9 @@ struct Stream* mock_stream_new() {
out->read = mock_stream_read;
out->read_raw = mock_stream_read_raw;
out->write = mock_stream_write;
struct MockContext* ctx = malloc(sizeof(struct MockContext));
ctx->stream = out;
struct ConnectionContext* ctx = malloc(sizeof(struct ConnectionContext));
ctx->session_context = (struct SessionContext*)malloc(sizeof(struct SessionContext));
ctx->session_context->default_stream = out;
out->stream_context = ctx;
}
return out;
@ -51,7 +48,12 @@ struct Stream* mock_stream_new() {
void mock_stream_free(struct Stream* stream) {
if (stream == NULL)
return;
if (stream->stream_context != NULL)
if (stream->stream_context != NULL) {
struct ConnectionContext* ctx = (struct ConnectionContext*)stream->stream_context;
// this will close the session, which will be a loop, so don't
//libp2p_session_context_free(ctx->session_context);
free(ctx->session_context);
free(stream->stream_context);
}
free(stream);
}

View file

@ -182,7 +182,7 @@ int test_dialer_dial_multistream() {
if (stream != NULL) {
struct SessionContext session_context;
session_context.insecure_stream = stream;
stream->close(&session_context);
stream->close(stream);
libp2p_net_multistream_stream_free(stream);
}
return retVal;

View file

@ -21,7 +21,7 @@ int test_multistream_connect() {
if (stream != NULL) {
struct SessionContext ctx;
ctx.insecure_stream = stream;
stream->close(&ctx);
stream->close(stream);
libp2p_net_multistream_stream_free(stream);
}
@ -61,7 +61,7 @@ int test_multistream_get_list() {
exit:
if (session.insecure_stream != NULL) {
session.insecure_stream->close(&session);
session.insecure_stream->close(session.insecure_stream);
libp2p_net_multistream_stream_free(session.insecure_stream);
}
libp2p_stream_message_free(response);

View file

@ -53,8 +53,7 @@ int test_yamux_stream_new() {
retVal = 1;
exit:
if (yamux_stream != NULL)
yamux_stream->close(yamux_stream->stream_context);
mock_stream->close(mock_stream->stream_context);
yamux_stream->close(yamux_stream);
return retVal;
}
@ -71,43 +70,58 @@ int test_yamux_identify() {
goto exit;
// Now add in another protocol
mock_stream->read = mock_identify_read_protocol;
if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(libp2p_yamux_channel_new(yamux_stream)))) {
if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(yamux_stream))) {
goto exit;
}
// tear down
retVal = 1;
exit:
if (yamux_stream != NULL)
yamux_stream->close(yamux_stream->stream_context);
mock_stream->close(mock_stream->stream_context);
yamux_stream->close(yamux_stream);
return retVal;
}
int test_yamux_incoming_protocol_request() {
int retVal = 0;
// setup
struct Stream* mock_stream = mock_stream_new();
mock_stream->read = mock_yamux_read_protocol;
struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream);
if (yamux_stream == NULL)
goto exit;
// build the protocol handler that can handle identify protocol
// build the protocol handler that can handle yamux and identify protocol
struct Libp2pVector* protocol_handlers = libp2p_utils_vector_new(1);
struct Libp2pProtocolHandler* handler = libp2p_identify_build_protocol_handler(protocol_handlers);
libp2p_utils_vector_add(protocol_handlers, handler);
struct SessionContext session_context;
session_context.default_stream = yamux_stream;
handler = libp2p_yamux_build_protocol_handler(protocol_handlers);
libp2p_utils_vector_add(protocol_handlers, handler);
struct Stream* mock_stream = mock_stream_new();
struct SessionContext* session_context = ((struct ConnectionContext*)mock_stream->stream_context)->session_context;
mock_stream->read = mock_yamux_read_protocol;
struct StreamMessage* result_message = NULL;
if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) {
libp2p_logger_error("test_yamux", "Unable to read Yamux protocol from mock stream.\n");
goto exit;
}
if (libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers) < 0) {
libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol unsuccessful.\n");
goto exit;
}
// now we should have upgraded to the yamux protocol
libp2p_stream_message_free(result_message);
result_message = NULL;
if (session_context->default_stream->parent_stream == NULL) {
libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol appeared susccessful, but was not.\n");
goto exit;
}
// Someone is requesting the identity protocol
mock_stream->read = mock_identify_read_protocol;
struct StreamMessage* result_message;
if (!yamux_stream->read(yamux_stream->stream_context, &result_message, 10)) {
if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) {
libp2p_logger_error("test_yamux", "Unable to read identify protocol.\n");
goto exit;
}
// handle the marshaling of the protocol
libp2p_protocol_marshal(result_message, &session_context, protocol_handlers);
libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers);
libp2p_stream_message_free(result_message);
result_message = NULL;
// now verify the results
struct YamuxContext* yamux_context = (struct YamuxContext*)yamux_stream->stream_context;
struct YamuxContext* yamux_context = (struct YamuxContext*)session_context->default_stream->stream_context;
if (yamux_context->channels->total != 1) {
libp2p_logger_error("test_yamux", "Identify protocol was not found.\n");
goto exit;
@ -116,8 +130,8 @@ int test_yamux_incoming_protocol_request() {
// tear down
retVal = 1;
exit:
if (yamux_stream != NULL)
yamux_stream->close(yamux_stream->stream_context);
mock_stream->close(mock_stream->stream_context);
if (session_context->default_stream != NULL)
session_context->default_stream->close(session_context->default_stream);
libp2p_protocol_handlers_shutdown(protocol_handlers);
return retVal;
}

View file

@ -5,6 +5,7 @@
#include "libp2p/yamux/yamux.h"
#include "libp2p/net/protocol.h"
#include "libp2p/net/stream.h"
#include "libp2p/net/connectionstream.h"
#include "libp2p/conn/session.h"
#include "libp2p/utils/logger.h"
@ -59,12 +60,12 @@ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) {
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int yamux_send_protocol(struct YamuxContext* context) {
int yamux_send_protocol(struct Stream* stream) {
char* protocol = "/yamux/1.0.0\n";
struct StreamMessage outgoing;
outgoing.data = (uint8_t*)protocol;
outgoing.data_size = strlen(protocol);
if (!context->stream->parent_stream->write(context->stream->parent_stream->stream_context, &outgoing))
if (!stream->write(stream->stream_context, &outgoing))
return 0;
return 1;
}
@ -99,33 +100,16 @@ int yamux_receive_protocol(struct YamuxContext* context) {
* The remote is attempting to negotiate yamux
* @param msg the incoming message
* @param incoming_size the size of the incoming data buffer
* @param session_context the information about the incoming connection
* @param stream the incoming stream
* @param protocol_context the protocol-dependent context
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/
int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) {
struct YamuxContext* ctx = (struct YamuxContext*)protocol_context;
// we should have the yamux protocol in msg. Send the protocol back.
if (!yamux_send_protocol(ctx)) {
return 0;
}
/*
struct yamux_session* yamux = yamux_session_new(NULL, session_context->default_stream, yamux_session_server, protocol_context);
uint8_t* buf = (uint8_t*) malloc(msg->data_size);
if (buf == NULL)
int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
struct Stream* new_stream = libp2p_yamux_stream_new(stream);
if (new_stream == NULL)
return -1;
memcpy(buf, msg->data, msg->data_size);
for(;;) {
int retVal = yamux_decode(yamux, msg->data, msg->data_size);
free(buf);
buf = NULL;
if (!retVal)
break;
else { // try to read more from this stream
// TODO need more information as to what this loop should do
}
}
*/
// upgrade
stream->handle_upgrade(stream, new_stream);
return 1;
}
@ -135,13 +119,15 @@ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext*
* @returns true(1)
*/
int yamux_shutdown(void* protocol_context) {
if (protocol_context != NULL)
free(protocol_context);
return 0;
}
struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* handlers) {
struct Libp2pProtocolHandler* libp2p_yamux_build_protocol_handler(struct Libp2pVector* handlers) {
struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new();
if (handler != NULL) {
handler->context = handlers;
handler->context = handler;
handler->CanHandle = yamux_can_handle;
handler->HandleMessage = yamux_handle_message;
handler->Shutdown = yamux_shutdown;
@ -155,11 +141,13 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector*
* @param stream_context the YamuxContext
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_yamux_close(void* stream_context) {
if (stream_context == NULL)
int libp2p_yamux_close(struct Stream* stream) {
if (stream == NULL)
return 0;
struct YamuxContext* ctx = (struct YamuxContext*)stream_context;
libp2p_yamux_stream_free(ctx->stream);
if (stream->stream_context == NULL)
return 0;
if (stream->parent_stream->close(stream->parent_stream))
libp2p_yamux_stream_free(stream);
return 1;
}
@ -263,25 +251,6 @@ struct YamuxContext* libp2p_yamux_context_new() {
return ctx;
}
/***
* Free the resources from libp2p_yamux_context_new
* @param ctx the context
*/
void libp2p_yamux_context_free(struct YamuxContext* ctx) {
if (ctx == NULL)
return;
// free all the channels
if (ctx->channels) {
for(int i = 0; i < ctx->channels->total; i++) {
struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i);
curr->close(curr->stream_context);
}
libp2p_utils_vector_free(ctx->channels);
}
free(ctx);
return;
}
int libp2p_yamux_negotiate(struct YamuxContext* ctx) {
const char* protocolID = "/yamux/1.0.0\n";
struct StreamMessage outgoing;
@ -343,6 +312,18 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) {
return retVal;
}
/***
* A new protocol was asked for. Give it a "channel"
* @param yamux_stream the yamux stream
* @param new_stream the newly negotiated protocol
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_stream) {
// put this stream in the collection, and tie it to an id
struct YamuxContext* yamux_context = (struct YamuxContext*)yamux_stream->stream_context;
return libp2p_yamux_stream_add(yamux_context, new_stream);
}
/***
* Negotiate the Yamux protocol
* @param parent_stream the parent stream
@ -357,6 +338,7 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) {
out->write = libp2p_yamux_write;
out->peek = libp2p_yamux_peek;
out->read_raw = libp2p_yamux_read_raw;
out->handle_upgrade = libp2p_yamux_handle_upgrade;
out->address = parent_stream->address;
// build YamuxContext
struct YamuxContext* ctx = libp2p_yamux_context_new();
@ -375,6 +357,44 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) {
return out;
}
/**
* Clean up resources from libp2p_yamux_channel_new
* @param ctx the YamuxChannelContext
*/
int libp2p_yamux_channel_close(void* context) {
if (context == NULL)
return 0;
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context;
if (ctx != NULL) {
// close the child's stream
ctx->child_stream->close(ctx->child_stream);
libp2p_stream_free(ctx->stream);
free(ctx);
}
return 1;
}
/***
* Free the resources from libp2p_yamux_context_new
* @param ctx the context
*/
void libp2p_yamux_context_free(struct YamuxContext* ctx) {
if (ctx == NULL)
return;
// free all the channels
if (ctx->channels) {
for(int i = 0; i < ctx->channels->total; i++) {
struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i);
//curr->close(curr->stream_context);
libp2p_yamux_channel_close(curr->stream_context);
}
libp2p_utils_vector_free(ctx->channels);
}
free(ctx);
return;
}
/**
* Frees resources held by the stream
* @param yamux_stream the stream
@ -387,6 +407,50 @@ void libp2p_yamux_stream_free(struct Stream* yamux_stream) {
libp2p_stream_free(yamux_stream);
}
/***
* Channels calling close on the stream should not be able
* to clean up layers below
* @param context the context
* @returns true(1);
*/
int libp2p_yamux_channel_null_close(struct Stream* stream) {
return 1;
}
/**
* Create a stream that has a "YamuxChannelContext" related to this yamux protocol
* NOTE: This "wraps" the incoming stream, so that the returned stream is the parent
* of the incoming_stream
* @param incoming_stream the stream of the new protocol
* @returns a new Stream that has a YamuxChannelContext, and incoming_stream->parent_stream is set to this stream
*/
struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream) {
struct Stream* out = libp2p_stream_new();
if (out != NULL) {
out->address = incoming_stream->address;
// don't allow the incoming_stream to close the channel
out->close = libp2p_yamux_channel_null_close;
out->parent_stream = incoming_stream->parent_stream;
out->peek = incoming_stream->parent_stream->peek;
out->read = incoming_stream->parent_stream->read;
out->read_raw = incoming_stream->parent_stream->read_raw;
out->socket_mutex = incoming_stream->parent_stream->socket_mutex;
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext));
ctx->channel = 0;
ctx->closed = 0;
ctx->state = 0;
ctx->window_size = 0;
ctx->type = YAMUX_CHANNEL_CONTEXT;
ctx->yamux_context = incoming_stream->parent_stream->stream_context;
ctx->stream = out;
ctx->child_stream = incoming_stream;
out->stream_context = ctx;
out->write = incoming_stream->parent_stream->write;
incoming_stream->parent_stream = out;
}
return out;
}
/****
* Add a stream "channel" to the yamux handler
* @param ctx the context
@ -396,65 +460,13 @@ void libp2p_yamux_stream_free(struct Stream* yamux_stream) {
int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) {
if (stream == NULL)
return 0;
// the stream's parent should have a YamuxChannelContext
char proto = ((uint8_t*)stream->parent_stream->stream_context)[0];
if (proto == YAMUX_CHANNEL_CONTEXT) {
// wrap the new stream in a YamuxChannelContext
struct Stream* channel_stream = libp2p_yamux_channel_stream_new(stream);
if (channel_stream == NULL)
return 0;
struct YamuxChannelContext* channel_context = (struct YamuxChannelContext*)channel_stream->stream_context;
// the negotiation was successful. Add it to the list of channels that we have
int itemNo = libp2p_utils_vector_add(ctx->channels, stream);
struct YamuxChannelContext* incoming = (struct YamuxChannelContext*)stream->parent_stream->stream_context;
if (incoming->channel != 0) {
// this is wrong. There should have not been a channel number
return 0;
}
incoming->channel = itemNo;
int itemNo = libp2p_utils_vector_add(ctx->channels, channel_stream);
channel_context->channel = itemNo;
return 1;
}
return 0;
}
/**
* Clean up resources from libp2p_yamux_channel_new
* @param ctx the YamuxChannelContext
*/
int libp2p_yamux_channel_close(void* context) {
if (context == NULL)
return 0;
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context;
if (ctx != NULL) {
if (ctx->stream != NULL)
free(ctx->stream);
free(ctx);
}
return 1;
}
/**
* Create a stream that has a "YamuxChannelContext" related to this yamux protocol
* @param parent_stream the parent yamux stream
* @returns a new Stream that is a YamuxChannelContext
*/
struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream) {
struct Stream* out = libp2p_stream_new();
if (out != NULL) {
out->address = parent_stream->address;
out->close = libp2p_yamux_channel_close;
out->parent_stream = parent_stream;
out->peek = parent_stream->peek;
out->read = parent_stream->read;
out->read_raw = parent_stream->read_raw;
out->socket_mutex = parent_stream->socket_mutex;
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext));
ctx->channel = 0;
ctx->closed = 0;
ctx->state = 0;
ctx->window_size = 0;
ctx->type = YAMUX_CHANNEL_CONTEXT;
ctx->yamux_context = parent_stream->stream_context;
ctx->stream = out;
out->stream_context = ctx;
out->write = parent_stream->write;
}
return out;
}