Various changes for yamux adding a protocol

yamux
John Jones 2017-11-19 13:37:03 -05:00
parent f2e5af4058
commit b3c8e77ed9
18 changed files with 691 additions and 162 deletions

View File

@ -139,7 +139,7 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer
if (new_stream != NULL) {
peer->sessionContext->default_stream = new_stream;
// yamux over multistream
new_stream = libp2p_yamux_stream_new(new_stream);
new_stream = libp2p_yamux_stream_new(new_stream, 0, NULL);
if (new_stream != NULL) {
peer->sessionContext->default_stream = new_stream;
// we have our swarm connection. Now we ask for some "channels"

View File

@ -157,6 +157,26 @@ void libp2p_stream_message_free(struct StreamMessage* msg) {
}
}
/***
* Make a copy of a message
* @param original the original message
* @returns a StreamMessage that is a copy of the original
*/
struct StreamMessage* libp2p_stream_message_copy(const struct StreamMessage* original) {
struct StreamMessage* copy = libp2p_stream_message_new();
if (copy != NULL) {
copy->error_number = original->error_number;
copy->data_size = original->data_size;
copy->data = (uint8_t*) malloc(copy->data_size);
if (copy->data == NULL) {
libp2p_stream_message_free(copy);
return NULL;
}
memcpy(copy->data, original->data, copy->data_size);
}
return copy;
}
/****
* Make a copy of a SessionContext
* @param original the original

View File

@ -3,6 +3,7 @@
#include "varint.h"
#include "libp2p/net/protocol.h"
#include "libp2p/net/protocol.h"
#include "libp2p/net/multistream.h"
#include "libp2p/utils/vector.h"
#include "libp2p/net/stream.h"
#include "libp2p/conn/session.h"
@ -50,7 +51,7 @@ int libp2p_identify_send_protocol(struct Stream *stream) {
/***
* Check to see if the reply is the identify header we expect
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @param stream the incoming stream of the underlying protocol
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_identify_receive_protocol(struct Stream* stream) {
@ -81,12 +82,11 @@ int libp2p_identify_receive_protocol(struct Stream* stream) {
* @returns <0 on error, 0 if loop should not continue, >0 on success
*/
int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
// attempt to create a new Identify connection with them.
// send the protocol id back, and set up the channel
struct Stream* new_stream = libp2p_identify_stream_new(stream);
if (new_stream == NULL)
return -1;
// attempt to create a new Identify connection with them.
// send the protocol id back, and set up the channel
//TODO: now add this "channel"
return stream->handle_upgrade(stream, new_stream);
}
@ -149,6 +149,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) {
ctx->stream = out;
out->stream_context = ctx;
out->close = libp2p_identify_close;
out->negotiate = libp2p_identify_stream_new;
if (!libp2p_identify_send_protocol(parent_stream) || !libp2p_identify_receive_protocol(parent_stream)) {
libp2p_stream_free(out);
free(ctx);
@ -158,3 +159,40 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) {
return out;
}
/***
* Create a new stream that negotiates the identify protocol
* on top of the multistream protocol
*
* NOTE: This will be sent by our side (us asking them).
* Incoming "Identify" requests should be handled by the
* external protocol handler, not this function.
*
* @param parent_stream the parent stream
* @returns a new Stream that is a multistream, but with "identify" already negotiated
*/
struct Stream* libp2p_identify_stream_new_with_multistream(struct Stream* parent_stream) {
if (parent_stream == NULL)
return NULL;
struct Stream* multistream = libp2p_net_multistream_stream_new(parent_stream);
struct Stream* out = libp2p_stream_new();
if (out != NULL) {
out->stream_type = STREAM_TYPE_IDENTIFY;
out->parent_stream = multistream;
struct IdentifyContext* ctx = (struct IdentifyContext*) malloc(sizeof(struct IdentifyContext));
if (ctx == NULL) {
libp2p_stream_free(out);
return NULL;
}
ctx->parent_stream = multistream;
ctx->stream = out;
out->stream_context = ctx;
out->close = libp2p_identify_close;
out->negotiate = libp2p_identify_stream_new_with_multistream;
if (!libp2p_identify_send_protocol(parent_stream) || !libp2p_identify_receive_protocol(parent_stream)) {
libp2p_stream_free(out);
free(ctx);
return NULL;
}
}
return out;
}

View File

@ -95,6 +95,15 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx);
*/
struct KademliaMessage* libp2p_net_multistream_get_message(struct Stream* stream);
/**
* Add the transmission size to the front of a StreamMessage.
* NOTE: This is used internally by multistream. It is accessible to help
* with testing.
* @param incoming the incoming message
* @returns a new StreamMessage, in the format of a MessageStream buffer
*/
struct StreamMessage* libp2p_net_multistream_prepare_to_send(struct StreamMessage* incoming);
struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream);
void libp2p_net_multistream_stream_free(struct Stream* stream);

View File

@ -25,6 +25,12 @@ struct StreamMessage* libp2p_stream_message_new();
*/
void libp2p_stream_message_free(struct StreamMessage* msg);
/***
* Make a copy of a message
* @param original the original message
* @returns a StreamMessage that is a copy of the original
*/
struct StreamMessage* libp2p_stream_message_copy(const struct StreamMessage* original);
/**
* This is a context struct for a basic IP connection
@ -35,6 +41,19 @@ struct ConnectionContext {
struct SessionContext* session_context;
};
/**
* The different types of protocols
*/
enum stream_type {
STREAM_TYPE_UNKNOWN = 0x0,
STREAM_TYPE_MULTISTREAM = 0x1,
STREAM_TYPE_SECIO = 0x2,
STREAM_TYPE_KADEMLIA = 0x3,
STREAM_TYPE_IDENTIFY = 0x4,
STREAM_TYPE_YAMUX = 0x5,
STREAM_TYPE_JOURNAL = 0x6,
STREAM_TYPE_RAW = 0x7
};
/**
* An interface in front of various streams
@ -47,6 +66,7 @@ struct Stream {
pthread_mutex_t* socket_mutex; // only 1 transmission at a time
struct Stream* parent_stream; // what stream wraps this stream
int channel; // the channel (for multiplexing streams)
enum stream_type stream_type;
/**
* A generic place to store implementation-specific context items
@ -103,6 +123,13 @@ struct Stream {
* @param new_stream the newly created stream
*/
int (*handle_upgrade)(struct Stream* stream, struct Stream* new_stream);
/***
* Negotiate this protocol using the parent stream
* @param parent_stream the connection to use
* @returns a new Stream, or NULL on error
*/
struct Stream* (*negotiate)(struct Stream* parent_stream);
};
struct Stream* libp2p_stream_new();

View File

@ -23,14 +23,18 @@ enum yamux_error
yamux_error_intern = 0x02
};
// forward declarations
struct yamux_session;
struct yamux_stream;
struct YamuxContext;
struct Stream;
struct StreamMessage;
typedef void* (*yamux_session_get_str_ud_fn)(struct yamux_session* session, yamux_streamid newid );
typedef void (*yamux_session_ping_fn )(struct yamux_session* session, uint32_t val );
typedef void (*yamux_session_pong_fn )(struct yamux_session* session, uint32_t val, struct timespec dt);
typedef void (*yamux_session_go_away_fn )(struct yamux_session* session, enum yamux_error err );
typedef void (*yamux_session_new_stream_fn)(struct yamux_session* session, struct yamux_stream* stream);
typedef void (*yamux_session_new_stream_fn)(struct YamuxContext* context, struct Stream* stream, struct StreamMessage* msg);
typedef void (*yamux_session_free_fn )(struct yamux_session* sesssion );
struct yamux_session_stream
@ -38,31 +42,71 @@ struct yamux_session_stream
struct yamux_stream* stream;
int alive;
};
/**
* A yamux session. This keeps all the streams related to a yamux session
*/
struct yamux_session
{
struct yamux_config* config;
struct yamux_config* config; // configuration of size of windows and max number of streams
size_t num_streams;
size_t cap_streams;
struct yamux_session_stream* streams;
size_t num_streams; // number of streams
size_t cap_streams; // capacity of stream array
struct yamux_session_stream* streams; // array of streams
/**
* Get user data
*/
yamux_session_get_str_ud_fn get_str_ud_fn;
/**
* Ping
*/
yamux_session_ping_fn ping_fn ;
/**
* Respond to ping
*/
yamux_session_pong_fn pong_fn ;
/**
* Hanging up
*/
yamux_session_go_away_fn go_away_fn ;
/**
* A new stream is coming in
*/
yamux_session_new_stream_fn new_stream_fn;
/**
* Free resources
*/
yamux_session_free_fn free_fn ;
/**
* User data
*/
void* userdata;
/**
* for heartbeat
*/
struct timespec since_ping;
/**
* Session type (client or server)
*/
enum yamux_session_type type;
/***
* The parent stream
*/
struct Stream* parent_stream;
/***
* The next id to use
*/
yamux_streamid nextid;
/**
* Determine if this session is closed
*/
int closed;
};
@ -92,12 +136,12 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po
// defers to stream read handlers
ssize_t yamux_session_read(struct yamux_session* session);
struct YamuxChannelContext;
/**
* Decode an incoming message
* @param channel the channel
* @param context a YamuxChannelContext or YamuxContext
* @param incoming the incoming bytes
* @param incoming_size the size of the incoming bytes
* @param return_message the return message (usually the bytes after the frame)
* @returns true(1) on success, false(0) otherwise
*/
int yamux_decode(struct YamuxChannelContext* channel, const uint8_t* incoming, size_t incoming_size);
int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message);

View File

@ -7,6 +7,9 @@
#include "libp2p/conn/session.h"
#include "libp2p/yamux/yamux.h"
// forward declarations
struct YamuxChannelContext;
// NOTE: 'data' is not guaranteed to be preserved when the read_fn
// handler exists (read: it will be freed).
struct yamux_stream;
@ -35,7 +38,7 @@ struct yamux_stream
yamux_stream_rst_fn rst_fn ;
yamux_stream_free_fn free_fn;
void* userdata;
struct Stream* stream;
enum yamux_stream_state state;
@ -44,16 +47,26 @@ struct yamux_stream
uint32_t window_size;
};
// does not init the stream
struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_streamid id, void* userdata);
/**
* Create a new stream that has a YamuxChannelContext
* @param context the Yamux context
* @param id the stream id
* @param msg the incoming message
* @returns a stream that is a Yamux channel
*/
struct Stream* yamux_channel_new(struct YamuxContext* context, yamux_streamid id, struct StreamMessage* msg);
// not obligatory, SYN is sent by yamux_stream_write when the stream
// isn't initialised anyway
ssize_t yamux_stream_init (struct YamuxChannelContext* channel_ctx);
// doesn't free the stream
// uses FIN
ssize_t yamux_stream_close(struct YamuxChannelContext* channel_ctx);
/***
* Closes the stream
* NOTE: doesn't free the stream, uses FIN
* @param context the YamuxContext or YamuxChannelContext
* @returns number of bytes sent
*/
ssize_t yamux_stream_close(void* context);
// uses RST
ssize_t yamux_stream_reset(struct YamuxChannelContext* stream);
@ -68,9 +81,14 @@ ssize_t yamux_stream_write(struct YamuxChannelContext* ctx, uint32_t data_length
* @param frame the frame
* @param incoming the stream bytes (after the frame)
* @param incoming_size the size of incoming
* @param session_context the SessionContext
* @returns the number of bytes processed (can be zero) or negative number on error
*/
ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context);
ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size);
/**
* Retrieve the flags for this context
* @param context the context
* @returns the correct flag
*/
enum yamux_frame_flags get_flags(void* context);

View File

@ -21,6 +21,9 @@ struct YamuxContext {
struct Stream* stream;
struct yamux_session* session;
struct Libp2pVector* channels;
int am_server;
int state; // the state of the connection
struct Libp2pVector* protocol_handlers;
};
struct YamuxChannelContext {
@ -60,7 +63,14 @@ int yamux_send_protocol(struct Stream* stream);
*/
int yamux_receive_protocol(struct YamuxContext* context);
struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream);
/***
* Negotiate the Yamux protocol
* @param parent_stream the parent stream
* @param am_server true(1) if we are considered the server, false(0) if we are the client.
* @param protocol_handlers the protocol handlers (used when a new protocol is requested)
* @returns a Stream initialized and ready for yamux
*/
struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_server, struct Libp2pVector* protocol_handlers);
void libp2p_yamux_stream_free(struct Stream* stream);
@ -74,9 +84,22 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream);
/**
* Create a stream that has a "YamuxChannelContext" related to this yamux protocol
* @param parent_stream the parent yamux stream
* @returns a new Stream that is a YamuxChannelContext
* NOTE: If incoming_stream is not of the Yamux protocol, this "wraps" the incoming
* stream, so that the returned stream is the parent of the incoming_stream. If the
* incoming stream is of the yamux protocol, the YamuxChannelContext.child_stream
* will be NULL, awaiting an upgrade to fill it in.
* @param incoming_stream the stream of the new protocol
* @param channelNumber the channel number (0 if unknown)
* @returns a new Stream that has a YamuxChannelContext
*/
struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream);
struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, int channelNumber);
void libp2p_yamux_channel_free(struct YamuxChannelContext* ctx);
/***
* Prepare a new Yamux StreamMessage based on another StreamMessage
* NOTE: This is here for testing. This should normally not be used.
* @param incoming the incoming message
* @returns a new StreamMessage that has a yamux_frame
*/
struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incoming);

View File

@ -117,6 +117,7 @@ int libp2p_net_connection_write(void* stream_context, struct StreamMessage* msg)
struct Stream* libp2p_net_connection_new(int fd, char* ip, int port, struct SessionContext* session_context) {
struct Stream* out = (struct Stream*) malloc(sizeof(struct Stream));
if (out != NULL) {
out->stream_type = STREAM_TYPE_RAW;
out->close = libp2p_net_connection_close;
out->peek = libp2p_net_connection_peek;
out->read = libp2p_net_connection_read;

View File

@ -131,6 +131,31 @@ int libp2p_net_multistream_peek(void* stream_context) {
return parent_stream->peek(parent_stream->stream_context);
}
/**
* Add the transmission size to the front of a StreamMessage.
* NOTE: This is used internally by multistream. It is accessible to help
* with testing.
* @param incoming the incoming message
* @returns a new StreamMessage, in the format of a MessageStream buffer
*/
struct StreamMessage* libp2p_net_multistream_prepare_to_send(struct StreamMessage* incoming) {
struct StreamMessage* out = libp2p_stream_message_new();
if (out != NULL) {
unsigned char varint[12];
size_t varint_size = 0;
varint_encode(incoming->data_size, &varint[0], 12, &varint_size);
out->data_size = varint_size + incoming->data_size;
out->data = malloc(out->data_size);
if (out->data == NULL) {
libp2p_stream_message_free(out);
return NULL;
}
memcpy(&out->data[0], varint, varint_size);
memcpy(&out->data[varint_size], incoming->data, incoming->data_size);
}
return out;
}
/**
* Write to an open multistream host
* @param stream_context the session context
@ -143,27 +168,14 @@ int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* inc
int num_bytes = 0;
if (incoming->data_size > 0) { // only do this is if there is something to send
// first get the size as a varint
unsigned char varint[12];
size_t varint_size = 0;
varint_encode(incoming->data_size, &varint[0], 12, &varint_size);
// now put the size with the data
struct StreamMessage outgoing;
outgoing.data_size = varint_size + incoming->data_size;
outgoing.data = (uint8_t*) malloc(outgoing.data_size);
if (outgoing.data == NULL) {
return 0;
}
memset(outgoing.data, 0, outgoing.data_size);
memcpy(outgoing.data, varint, varint_size);
memcpy(&outgoing.data[varint_size], incoming->data, incoming->data_size);
struct StreamMessage* out = libp2p_net_multistream_prepare_to_send(incoming);
// now ship it
libp2p_logger_debug("multistream", "Attempting write %d bytes.\n", (int)outgoing.data_size);
num_bytes = parent_stream->write(parent_stream->stream_context, &outgoing);
libp2p_logger_debug("multistream", "Attempting write %d bytes.\n", (int)out->data_size);
num_bytes = parent_stream->write(parent_stream->stream_context, out);
// subtract the varint if all went well
if (num_bytes == outgoing.data_size)
if (num_bytes == out->data_size)
num_bytes = incoming->data_size;
free(outgoing.data);
libp2p_stream_message_free(out);
}
return num_bytes;
@ -349,7 +361,7 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx) {
void libp2p_net_multistream_stream_free(struct Stream* stream) {
if (stream != NULL) {
stream->parent_stream->close(stream->parent_stream->stream_context);
stream->parent_stream->close(stream->parent_stream);
// TODO: free memory allocations
}
}
@ -370,6 +382,7 @@ int libp2p_net_multistream_read_raw(void* stream_context, uint8_t* buffer, int b
struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream) {
struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream));
if (out != NULL) {
out->stream_type = STREAM_TYPE_MULTISTREAM;
out->parent_stream = parent_stream;
out->close = libp2p_net_multistream_close;
out->read = libp2p_net_multistream_read;

View File

@ -42,7 +42,7 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new() {
/***
* Handle an incoming message
* @param message the incoming message
* @param session the SessionContext of the incoming connection
* @param stream the stream the message came in on
* @param handlers a Vector of protocol handlers
* @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success
*/

View File

@ -1353,6 +1353,7 @@ int libp2p_secio_close(struct Stream* stream) {
struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp2pPeer* remote_peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key) {
struct Stream* new_stream = libp2p_stream_new();
if (new_stream != NULL) {
new_stream->stream_type = STREAM_TYPE_SECIO;
struct SecioContext* ctx = (struct SecioContext*) malloc(sizeof(struct SecioContext));
if (ctx == NULL) {
libp2p_stream_free(new_stream);

View File

@ -3,6 +3,9 @@
#include <unistd.h>
#include "libp2p/net/stream.h"
struct StreamMessage* mock_message = NULL;
int mock_message_position = 0;
void mock_stream_free(struct Stream* stream);
int mock_stream_close(struct Stream* stream) {
@ -14,19 +17,27 @@ int mock_stream_close(struct Stream* stream) {
}
int mock_stream_peek(void* context) {
return 1;
if (mock_message == NULL)
return 0;
return mock_message->data_size;
}
int mock_stream_read(void* context, struct StreamMessage** msg, int timeout_secs) {
*msg = libp2p_stream_message_copy(mock_message);
return 1;
}
int mock_stream_read_raw(void* context, uint8_t* buffer, int buffer_size, int timeout_secs) {
return 1;
if (mock_message == NULL)
return 0;
int min = buffer_size > mock_message->data_size - mock_message_position ? mock_message->data_size - mock_message_position : buffer_size;
memcpy(buffer, mock_message->data, min);
mock_message_position += min;
return min;
}
int mock_stream_write(void* context, struct StreamMessage* msg) {
return 1;
return msg->data_size;
}
struct Stream* mock_stream_new() {

View File

@ -3,11 +3,22 @@
#include "libp2p/identify/identify.h"
#include "mock_stream.h"
#include "libp2p/utils/logger.h"
#include "libp2p/net/stream.h"
#include "libp2p/net/multistream.h"
/***
* Helpers
*/
struct StreamMessage* build_message(const char* data) {
struct StreamMessage* out = libp2p_stream_message_new();
if (out != NULL) {
out->data_size = strlen(data);
out->data = (uint8_t*) malloc(out->data_size);
memcpy(out->data, data, out->data_size);
}
return out;
}
/***
* Sends back the yamux protocol to fake negotiation
*/
@ -22,18 +33,49 @@ int mock_yamux_read_protocol(void* context, struct StreamMessage** msg, int netw
}
/***
* Sends back the yamux protocol to fake negotiation
* Sends back the identify protocol (in a yamux wrapper) to fake negotiation
*/
int mock_identify_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) {
*msg = libp2p_stream_message_new();
struct StreamMessage* message = *msg;
struct StreamMessage message;
const char* id = "/ipfs/id/1.0.0\n";
message->data_size = strlen(id);
message->data = malloc(message->data_size);
memcpy(message->data, id, message->data_size);
message.data_size = strlen(id);
message.data = (uint8_t*)id;
*msg = libp2p_yamux_prepare_to_send(&message);
// adjust the frame
struct yamux_frame* frame = (struct yamux_frame*)(*msg)->data;
frame->streamid = 1;
frame->flags = yamux_frame_syn;
encode_frame(frame);
return 1;
}
int mock_counter = 0;
/***
* Sends back the yamux protocol to fake negotiation
*/
int mock_multistream_then_identify_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) {
// prepare the message
*msg = libp2p_stream_message_new();
struct StreamMessage* message = *msg;
message->data_size = mock_message->data_size - mock_message_position;
message->data = malloc(message->data_size);
memcpy(message->data, &mock_message->data[mock_message_position], message->data_size);
if (mock_counter == 0) {
// this is the first time through. Set mock_message to the identify protocol
libp2p_stream_message_free(mock_message);
mock_message = libp2p_net_multistream_prepare_to_send(build_message("/ipfs/id/1.0.0\n"));
mock_message_position = 0;
} else {
libp2p_stream_message_free(mock_message);
mock_message = NULL;
mock_message_position = 0;
}
return (*msg != NULL);
}
/***
* Tests
*/
@ -43,10 +85,11 @@ int mock_identify_read_protocol(void* context, struct StreamMessage** msg, int n
*/
int test_yamux_stream_new() {
int retVal = 0;
const char* yamux_id = "/yamux/1.0.0\n";
// setup
struct Stream* mock_stream = mock_stream_new();
mock_stream->read = mock_yamux_read_protocol;
struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream);
mock_message = build_message(yamux_id);
struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream, 0, NULL);
if (yamux_stream == NULL)
goto exit;
// tear down
@ -54,6 +97,8 @@ int test_yamux_stream_new() {
exit:
if (yamux_stream != NULL)
yamux_stream->close(yamux_stream);
if (mock_message != NULL)
libp2p_stream_message_free(mock_message);
return retVal;
}
@ -63,9 +108,15 @@ int test_yamux_stream_new() {
int test_yamux_identify() {
int retVal = 0;
// setup
// mock
struct Stream* mock_stream = mock_stream_new();
mock_stream->read = mock_yamux_read_protocol;
struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream);
// protocol handlers
struct Libp2pVector* protocol_handlers = libp2p_utils_vector_new(1);
struct Libp2pProtocolHandler* handler = libp2p_identify_build_protocol_handler(protocol_handlers);
libp2p_utils_vector_add(protocol_handlers, handler);
// yamux
struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream, 0, protocol_handlers);
if (yamux_stream == NULL)
goto exit;
// Now add in another protocol
@ -78,12 +129,91 @@ int test_yamux_identify() {
exit:
if (yamux_stream != NULL)
yamux_stream->close(yamux_stream);
libp2p_protocol_handlers_shutdown(protocol_handlers);
if (mock_message != NULL) {
libp2p_stream_message_free(mock_message);
mock_message = NULL;
}
return retVal;
}
int test_yamux_incoming_protocol_request() {
int retVal = 0;
// setup
// build the protocol handler that can handle yamux, multistream, and identify protocol
struct Libp2pVector* protocol_handlers = libp2p_utils_vector_new(1);
struct Libp2pProtocolHandler* handler = libp2p_identify_build_protocol_handler(protocol_handlers);
libp2p_utils_vector_add(protocol_handlers, handler);
handler = libp2p_yamux_build_protocol_handler(protocol_handlers);
libp2p_utils_vector_add(protocol_handlers, handler);
handler = libp2p_net_multistream_build_protocol_handler(protocol_handlers);
libp2p_utils_vector_add(protocol_handlers, handler);
// set up basic streams
struct Stream* mock_stream = mock_stream_new();
struct SessionContext* session_context = ((struct ConnectionContext*)mock_stream->stream_context)->session_context;
mock_message = build_message("/yamux/1.0.0\n");
struct StreamMessage* result_message = NULL;
if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) {
libp2p_logger_error("test_yamux", "Unable to read Yamux protocol from mock stream.\n");
goto exit;
}
if (libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers) < 0) {
libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol unsuccessful.\n");
goto exit;
}
// now we should have upgraded to the yamux protocol
libp2p_stream_message_free(result_message);
result_message = NULL;
if (session_context->default_stream->parent_stream == NULL) {
libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol appeared susccessful, but was not.\n");
goto exit;
}
// Someone is requesting the multistream protocol
libp2p_stream_message_free(mock_message);
mock_message = libp2p_yamux_prepare_to_send(libp2p_net_multistream_prepare_to_send(build_message("/multistream/1.0.0\n")));
// act like this is new
struct yamux_frame* frame = (struct yamux_frame*)mock_message->data;
frame->streamid = (uint32_t)1;
frame->flags = yamux_frame_syn;
encode_frame(frame);
mock_stream->read = mock_stream_read;
if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) {
libp2p_logger_error("test_yamux", "Unable to read multistream protocol.\n");
goto exit;
}
// handle the marshaling of the multistream protocol
libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers);
libp2p_stream_message_free(result_message);
result_message = NULL;
// now verify the results
if (session_context->default_stream->stream_type != STREAM_TYPE_YAMUX) {
libp2p_logger_error("test_yamux", "Expected stream type of %d, but received %d.\n", STREAM_TYPE_YAMUX, session_context->default_stream->stream_type);
goto exit;
}
struct YamuxContext* yamux_context = (struct YamuxContext*)session_context->default_stream->stream_context;
if (yamux_context->channels->total != 2) {
libp2p_logger_error("test_yamux", "Identify protocol was not found.\n");
goto exit;
}
// tear down
retVal = 1;
exit:
if (session_context->default_stream != NULL)
session_context->default_stream->close(session_context->default_stream);
libp2p_protocol_handlers_shutdown(protocol_handlers);
return retVal;
}
/**
* Attempt to negotiate the identity protocol, then use it.
* This makes sure the framing is working correctly betwee identity
* and yamux
*/
int test_yamux_identity_frame() {
int retVal = 0;
// setup
// build the protocol handler that can handle yamux and identify protocol
struct Libp2pVector* protocol_handlers = libp2p_utils_vector_new(1);
@ -111,7 +241,7 @@ int test_yamux_incoming_protocol_request() {
goto exit;
}
// Someone is requesting the identity protocol
mock_stream->read = mock_identify_read_protocol;
mock_stream->read = mock_multistream_then_identify_read_protocol;
if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) {
libp2p_logger_error("test_yamux", "Unable to read identify protocol.\n");
goto exit;
@ -122,11 +252,15 @@ int test_yamux_incoming_protocol_request() {
result_message = NULL;
// now verify the results
struct YamuxContext* yamux_context = (struct YamuxContext*)session_context->default_stream->stream_context;
if (yamux_context->channels->total != 1) {
if (yamux_context->channels->total != 2) {
libp2p_logger_error("test_yamux", "Identify protocol was not found.\n");
goto exit;
}
// prepare a yamux frame that is an identity message
// send the message
// tear down
retVal = 1;
exit:
@ -134,4 +268,5 @@ int test_yamux_incoming_protocol_request() {
session_context->default_stream->close(session_context->default_stream);
libp2p_protocol_handlers_shutdown(protocol_handlers);
return retVal;
}

View File

@ -147,12 +147,27 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po
/**
* Decode an incoming message
* @param session the session
* @param context the YamuxContext or YamuxChannelContext
* @param incoming the incoming bytes
* @param incoming_size the size of the incoming bytes
* @returns true(1) on success, false(0) otherwise
* @param return_message the results (usually the stuff after the frame)
* @returns 0 on success, negative number on error
*/
int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* incoming, size_t incoming_size) {
int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message) {
// retrieve the yamux context
struct yamux_session* yamux_session = NULL;
struct YamuxContext* yamuxContext = NULL;
if (context == NULL)
return 0;
if ( ((char*)context)[0] == YAMUX_CONTEXT) {
yamuxContext = (struct YamuxContext*)context;
yamux_session = yamuxContext->session;
} else if ( ((char*)context)[0] == YAMUX_CHANNEL_CONTEXT) {
struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)context;
yamuxContext = channelContext->yamux_context;
yamux_session = channelContext->yamux_context->session;
}
// decode frame
struct yamux_frame f;
@ -173,14 +188,14 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco
case yamux_frame_ping: // ping
if (f.flags & yamux_frame_syn)
{
yamux_session_ping(channelContext->yamux_context->session, f.length, 1);
yamux_session_ping(yamux_session, f.length, 1);
if (channelContext->yamux_context->session->ping_fn)
channelContext->yamux_context->session->ping_fn(channelContext->yamux_context->session, f.length);
if (yamux_session->ping_fn)
yamux_session->ping_fn(yamux_session, f.length);
}
else if ((f.flags & yamux_frame_ack) && channelContext->yamux_context->session->pong_fn)
else if ((f.flags & yamux_frame_ack) && yamux_session->pong_fn)
{
struct timespec now, dt, last = channelContext->yamux_context->session->since_ping;
struct timespec now, dt, last = yamux_session->since_ping;
if (!timespec_get(&now, TIME_UTC))
return -EACCES;
@ -193,32 +208,33 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco
else
dt.tv_nsec = now.tv_nsec - last.tv_nsec;
channelContext->yamux_context->session->pong_fn(channelContext->yamux_context->session, f.length, dt);
yamux_session->pong_fn(yamux_session, f.length, dt);
}
else
return -EPROTO;
break;
case yamux_frame_go_away: // go away (hanging up)
channelContext->yamux_context->session->closed = 1;
if (channelContext->yamux_context->session->go_away_fn)
channelContext->yamux_context->session->go_away_fn(channelContext->yamux_context->session, (enum yamux_error)f.length);
yamux_session->closed = 1;
if (yamux_session->go_away_fn)
yamux_session->go_away_fn(yamux_session, (enum yamux_error)f.length);
break;
default:
return -EPROTO;
}
else { // we're handling a stream, not something at the yamux protocol level
for (size_t i = 0; i < channelContext->yamux_context->session->cap_streams; ++i)
for (size_t i = 0; i < yamux_session->cap_streams; ++i)
{
struct yamux_session_stream* ss = &channelContext->yamux_context->session->streams[i];
struct yamux_session_stream* ss = &yamux_session->streams[i];
struct yamux_stream* s = ss->stream;
if (!ss->alive || s->state == yamux_stream_closed)
if (!ss->alive || s->state == yamux_stream_closed) // skip dead or closed streams
continue;
if (s->id == f.streamid)
if (s->id == f.streamid) // we have a match between the stored stream and the current stream
{
if (f.flags & yamux_frame_rst)
{
// close the stream
s->state = yamux_stream_closed;
if (s->rst_fn)
@ -228,7 +244,7 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco
{
// local stream didn't initiate FIN
if (s->state != yamux_stream_closing)
yamux_stream_close(channelContext);
yamux_stream_close(context);
s->state = yamux_stream_closed;
@ -237,6 +253,7 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco
}
else if (f.flags & yamux_frame_ack)
{
// acknowldegement
if (s->state != yamux_stream_syn_sent)
return -EPROTO;
@ -246,7 +263,7 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco
return -EPROTO;
int sz = sizeof(struct yamux_frame);
ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, channelContext->yamux_context->stream->parent_stream->stream_context);
ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz);
return (re < 0) ? re : (re + incoming_size);
}
}
@ -255,17 +272,22 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco
// It must not exist yet, so let's try to make it
if (f.flags & yamux_frame_syn)
{
void* ud = NULL; // user data
struct StreamMessage* msg = libp2p_stream_message_new();
if (channelContext->yamux_context->session->get_str_ud_fn)
ud = channelContext->yamux_context->session->get_str_ud_fn(channelContext->yamux_context->session, f.streamid);
if (incoming_size > sizeof(struct yamux_frame)) {
msg->data_size = incoming_size - sizeof(struct yamux_frame);
msg->data = malloc(msg->data_size);
memcpy(msg->data, &incoming[sizeof(struct yamux_frame)], msg->data_size);
}
struct yamux_stream* st = yamux_stream_new(channelContext->yamux_context->session, f.streamid, ud);
struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, msg);
struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context;
if (channelContext->yamux_context->session->new_stream_fn)
channelContext->yamux_context->session->new_stream_fn(channelContext->yamux_context->session, st);
if (yamux_session->new_stream_fn)
yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, msg);
st->state = yamux_stream_syn_recv;
channelContext->state = yamux_stream_syn_recv;
*return_message = msg;
}
else
return -EPROTO;

View File

@ -9,57 +9,62 @@
#include "libp2p/net/stream.h"
#include "libp2p/yamux/frame.h"
#include "libp2p/yamux/stream.h"
#include "libp2p/yamux/yamux.h"
#define MIN(x,y) (y^((x^y)&-(x<y)))
#define MAX(x,y) (x^((x^y)&-(x<y)))
/***
* Create a new stream
* @param session the session
* @param context the yamux context
* @param id the id (0 to set it to the next id)
* @Param userdata the user data
* @Param msg the message (probably the protocol id)
* @returns a new yamux_stream struct
*/
struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_streamid id, void* userdata)
struct Stream* yamux_channel_new(struct YamuxContext* context, yamux_streamid id, struct StreamMessage* msg)
{
if (!session)
if (!context)
return NULL;
struct yamux_session* session = context->session;
if (!id)
{
id = session->nextid;
session->nextid += 2;
}
struct yamux_stream* st = NULL;
struct yamux_session_stream* ss;
struct yamux_stream* y_stream = NULL;
struct yamux_session_stream* session_stream = NULL;
if (session->num_streams != session->cap_streams)
if (session->num_streams != session->cap_streams) {
// attempt to reuse dead streams
for (size_t i = 0; i < session->cap_streams; ++i)
{
ss = &session->streams[i];
session_stream = &session->streams[i];
if (!ss->alive)
if (!session_stream->alive)
{
st = ss->stream;
ss->alive = 1;
y_stream = session_stream->stream;
session_stream->alive = 1;
goto FOUND;
}
}
}
if (session->cap_streams == session->config->accept_backlog)
return NULL;
ss = &session->streams[session->cap_streams];
// we didn't find a dead stream, so create a new one
session_stream = &session->streams[session->cap_streams];
if (ss->alive)
if (session_stream->alive)
return NULL;
session->cap_streams++;
ss->alive = 1;
st = ss->stream = malloc(sizeof(struct yamux_stream));
session_stream->alive = 1;
y_stream = session_stream->stream = malloc(sizeof(struct yamux_stream));
FOUND:;
@ -72,12 +77,24 @@ FOUND:;
.read_fn = NULL,
.fin_fn = NULL,
.rst_fn = NULL,
.userdata = userdata
.stream = libp2p_yamux_channel_stream_new(context->stream, id)
};
*st = nst;
*y_stream = nst;
return st;
if (libp2p_protocol_marshal(msg, nst.stream, context->protocol_handlers) >= 0) {
// success
}
/*
struct Stream* channelStream = libp2p_yamux_channel_stream_new(context->stream);
struct YamuxChannelContext* channel = (struct YamuxChannelContext*)channelStream->stream_context;
channel->channel = id;
channel->child_stream = NULL;
channel->state = yamux_stream_inited;
return channelStream;
*/
return 0;
}
/**
@ -121,25 +138,40 @@ ssize_t yamux_stream_init(struct YamuxChannelContext* channel_ctx)
/***
* Close a stream
* @param stream the stream
* @param context the YamuxChannelContext or YamuxContext
* @returns the number of bytes sent
*/
ssize_t yamux_stream_close(struct YamuxChannelContext* channel_ctx)
ssize_t yamux_stream_close(void* context)
{
if (!channel_ctx || channel_ctx->state != yamux_stream_est || channel_ctx->closed)
return -EINVAL;
if ( ((char*)context)[0] == YAMUX_CHANNEL_CONTEXT) {
struct YamuxChannelContext* channel_ctx = (struct YamuxChannelContext*) context;
if (!channel_ctx || channel_ctx->state != yamux_stream_est || channel_ctx->closed)
return -EINVAL;
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION,
.type = yamux_frame_window_update,
.flags = yamux_frame_fin,
.streamid = channel_ctx->channel,
.length = 0
};
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION,
.type = yamux_frame_window_update,
.flags = yamux_frame_fin,
.streamid = channel_ctx->channel,
.length = 0
};
channel_ctx->state = yamux_stream_closing;
channel_ctx->state = yamux_stream_closing;
return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f);
return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f);
} else if ( ((char*)context)[0] == YAMUX_CONTEXT) {
struct YamuxContext* ctx = (struct YamuxContext*)context;
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION,
.type = yamux_frame_window_update,
.flags = yamux_frame_fin,
.streamid = 0,
.length = 0
};
return yamux_write_frame(ctx, &f);
}
return 0;
}
/**
@ -165,19 +197,42 @@ ssize_t yamux_stream_reset(struct YamuxChannelContext* channel_ctx)
return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f);
}
static enum yamux_frame_flags get_flags(struct YamuxChannelContext* ctx)
{
switch (ctx->state)
{
case yamux_stream_inited:
ctx->state = yamux_stream_syn_sent;
return yamux_frame_syn;
case yamux_stream_syn_recv:
ctx->state = yamux_stream_est;
return yamux_frame_ack;
default:
return 0;
}
/**
* Retrieve the flags for this context
* @param context the context
* @returns the correct flag
*/
enum yamux_frame_flags get_flags(void* context) {
if (context == NULL)
return 0;
if ( ((char*)context)[0] == YAMUX_CHANNEL_CONTEXT) {
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context;
switch (ctx->state)
{
case yamux_stream_inited:
ctx->state = yamux_stream_syn_sent;
return yamux_frame_syn;
case yamux_stream_syn_recv:
ctx->state = yamux_stream_est;
return yamux_frame_ack;
default:
return 0;
}
} else if ( ((char*)context)[0] == YAMUX_CONTEXT) {
struct YamuxContext* ctx = (struct YamuxContext*)context;
switch (ctx->state)
{
case yamux_stream_inited:
ctx->state = yamux_stream_syn_sent;
return yamux_frame_syn;
case yamux_stream_syn_recv:
ctx->state = yamux_stream_est;
return yamux_frame_ack;
default:
return 0;
}
}
return 0;
}
/**
@ -300,10 +355,9 @@ void yamux_stream_free(struct yamux_stream* stream)
* @param frame the frame
* @param incoming the stream bytes (after the frame)
* @param incoming_size the size of incoming
* @param session_context the SessionContext
* @returns the number of bytes processed (can be zero) or negative number on error
*/
ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context)
ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size)
{
struct yamux_frame f = *frame;

View File

@ -153,7 +153,7 @@ int do_client() {
}
sess->new_stream_fn = on_new;
struct yamux_stream* strm = yamux_stream_new(sess, 0, NULL);
struct yamux_stream* strm = yamux_channel_new(sess, 0, NULL);
if (!strm)
{
printf("yamux_new_stream() failed\n");

View File

@ -105,7 +105,7 @@ int yamux_receive_protocol(struct YamuxContext* context) {
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/
int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
struct Stream* new_stream = libp2p_yamux_stream_new(stream);
struct Stream* new_stream = libp2p_yamux_stream_new(stream, 1, protocol_context);
if (new_stream == NULL)
return -1;
// upgrade
@ -127,7 +127,7 @@ int yamux_shutdown(void* protocol_context) {
struct Libp2pProtocolHandler* libp2p_yamux_build_protocol_handler(struct Libp2pVector* handlers) {
struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new();
if (handler != NULL) {
handler->context = handler;
handler->context = handlers;
handler->CanHandle = yamux_can_handle;
handler->HandleMessage = yamux_handle_message;
handler->Shutdown = yamux_shutdown;
@ -146,8 +146,11 @@ int libp2p_yamux_close(struct Stream* stream) {
return 0;
if (stream->stream_context == NULL)
return 0;
if (stream->parent_stream->close(stream->parent_stream))
libp2p_yamux_stream_free(stream);
struct Stream* parent_stream = stream->parent_stream;
// this should close everything above yamux (i.e. the protocols that are riding on top of yamux)
libp2p_yamux_stream_free(stream);
// and this should close everything below
parent_stream->close(parent_stream);
return 1;
}
@ -180,14 +183,50 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int
return 0;
// TODO: This is not right. It must be sorted out.
struct StreamMessage* msg = *message;
return yamux_decode(channel, msg->data, msg->data_size);
if (yamux_decode(channel, msg->data, msg->data_size, message) == 0)
return 1;
} else if (ctx != NULL) {
// We are still negotiating...
return ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, yamux_default_timeout);
// We are still negotiating. They are probably attempting to negotiate a new protocol
struct StreamMessage* incoming = NULL;
if (ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &incoming, yamux_default_timeout)) {
// parse the frame
if (yamux_decode(ctx, incoming->data, incoming->data_size, message) == 0) {
libp2p_stream_message_free(incoming);
return 1;
}
libp2p_stream_message_free(incoming);
}
}
return 0;
}
/***
* Prepare a new Yamux StreamMessage based on another StreamMessage
* NOTE: The frame is not encoded yet
* @param incoming the incoming message
* @returns a new StreamMessage that has a yamux_frame
*/
struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incoming) {
struct StreamMessage* out = libp2p_stream_message_new();
if (out != NULL) {
out->data_size = sizeof(struct yamux_frame) + incoming->data_size;
out->data = (uint8_t*) malloc(out->data_size);
if (out->data == NULL) {
libp2p_stream_message_free(out);
return NULL;
}
memset(out->data, 0, out->data_size);
// the first part of the data is the yamux frame
// Set values in the frame, which is the first part of the outgoing message data
struct yamux_frame* frame = (struct yamux_frame*)out->data;
frame->length = incoming->data_size;
frame->type = yamux_frame_data;
frame->version = YAMUX_VERSION;
// the last part of the data is the original data
memcpy(&out->data[sizeof(struct yamux_frame)], incoming->data, incoming->data_size);
}
return out;
}
/***
* Write to the remote
* @param stream_context the context. Could be a YamuxContext or YamuxChannelContext
@ -209,14 +248,28 @@ int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) {
ctx = (struct YamuxContext*)stream_context;
}
if (ctx == NULL && channel == NULL)
return 0;
struct StreamMessage* outgoing_message = libp2p_yamux_prepare_to_send(message);
// now convert fame for network use
struct yamux_frame* frame = (struct yamux_frame*)outgoing_message->data;
// set a few more flags
frame->flags = get_flags(stream_context);
if (channel != NULL)
frame->streamid = channel->channel;
encode_frame(frame);
int retVal = 0;
if (channel != NULL && channel->channel != 0) {
// we have an established channel. Use it.
return yamux_stream_write(channel, message->data_size, message->data);
retVal = channel->stream->write(channel->stream->stream_context, outgoing_message);
} else if (ctx != NULL) {
// We are still negotiating...
return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, message);
retVal = ctx->stream->parent_stream->write(ctx->stream->parent_stream, outgoing_message);
}
return 0;
libp2p_stream_message_free(outgoing_message);
return retVal;
}
/***
@ -241,12 +294,20 @@ int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size
return -1;
}
struct YamuxContext* libp2p_yamux_context_new() {
/**
* Create a new YamuxContext struct
* @param stream the parent stream
* @returns a YamuxContext
*/
struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream) {
struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext));
if (ctx != NULL) {
ctx->type = YAMUX_CONTEXT;
ctx->stream = NULL;
ctx->channels = libp2p_utils_vector_new(1);
ctx->session = yamux_session_new(NULL, stream, yamux_session_server, NULL);
ctx->am_server = 0;
ctx->state = 0;
}
return ctx;
}
@ -324,14 +385,31 @@ int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_
return libp2p_yamux_stream_add(yamux_context, new_stream);
}
void libp2p_yamux_read_from_yamux_session(struct yamux_stream* stream, uint32_t data_len, void* data) {
}
/***
* Internal yamux code calls this when a new stream is created
* @param context the context
* @param stream the new stream
*/
void libp2p_yamux_new_stream(struct YamuxContext* context, struct Stream* stream, struct StreamMessage* msg) {
// ok, we have the new stream structure. We now need to read what was sent.
libp2p_protocol_marshal(msg, stream, context->protocol_handlers);
}
/***
* Negotiate the Yamux protocol
* @param parent_stream the parent stream
* @param am_server true(1) if we are considered the server, false(0) if we are the client.
* @param protocol_handlers the protocol handlers
* @returns a Stream initialized and ready for yamux
*/
struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) {
struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_server, struct Libp2pVector* protocol_handlers) {
struct Stream* out = libp2p_stream_new();
if (out != NULL) {
out->stream_type = STREAM_TYPE_YAMUX;
out->parent_stream = parent_stream;
out->close = libp2p_yamux_close;
out->read = libp2p_yamux_read;
@ -341,13 +419,16 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) {
out->handle_upgrade = libp2p_yamux_handle_upgrade;
out->address = parent_stream->address;
// build YamuxContext
struct YamuxContext* ctx = libp2p_yamux_context_new();
struct YamuxContext* ctx = libp2p_yamux_context_new(out);
if (ctx == NULL) {
libp2p_yamux_stream_free(out);
return NULL;
}
ctx->session->new_stream_fn = libp2p_yamux_new_stream;
out->stream_context = ctx;
ctx->stream = out;
ctx->am_server = am_server;
ctx->protocol_handlers = protocol_handlers;
// attempt to negotiate yamux protocol
if (!libp2p_yamux_negotiate(ctx)) {
libp2p_yamux_stream_free(out);
@ -391,6 +472,8 @@ void libp2p_yamux_context_free(struct YamuxContext* ctx) {
}
libp2p_utils_vector_free(ctx->channels);
}
if (ctx->session != NULL)
yamux_session_free(ctx->session);
free(ctx);
return;
}
@ -419,34 +502,54 @@ int libp2p_yamux_channel_null_close(struct Stream* stream) {
/**
* Create a stream that has a "YamuxChannelContext" related to this yamux protocol
* NOTE: This "wraps" the incoming stream, so that the returned stream is the parent
* of the incoming_stream
* NOTE: If incoming_stream is not of the Yamux protocol, this "wraps" the incoming
* stream, so that the returned stream is the parent of the incoming_stream. If the
* incoming stream is of the yamux protocol, the YamuxChannelContext.child_stream
* will be NULL, awaiting an upgrade to fill it in.
* @param incoming_stream the stream of the new protocol
* @returns a new Stream that has a YamuxChannelContext, and incoming_stream->parent_stream is set to this stream
* @param channelNumber the channel number (0 if unknown)
* @returns a new Stream that has a YamuxChannelContext
*/
struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream) {
struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, int channelNumber) {
struct Stream* out = libp2p_stream_new();
if (out != NULL) {
int isYamux = 0;
char first_char = ((uint8_t*)incoming_stream->stream_context)[0];
if (first_char == YAMUX_CONTEXT)
isYamux = 1;
out->stream_type = STREAM_TYPE_YAMUX;
out->address = incoming_stream->address;
// don't allow the incoming_stream to close the channel
out->close = libp2p_yamux_channel_null_close;
out->parent_stream = incoming_stream->parent_stream;
out->peek = incoming_stream->parent_stream->peek;
out->read = incoming_stream->parent_stream->read;
out->read_raw = incoming_stream->parent_stream->read_raw;
out->socket_mutex = incoming_stream->parent_stream->socket_mutex;
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext));
ctx->channel = 0;
if (!isYamux) {
out->parent_stream = incoming_stream->parent_stream;
out->peek = incoming_stream->parent_stream->peek;
out->read = incoming_stream->parent_stream->read;
out->read_raw = incoming_stream->parent_stream->read_raw;
out->write = incoming_stream->parent_stream->write;
out->socket_mutex = incoming_stream->parent_stream->socket_mutex;
ctx->yamux_context = incoming_stream->parent_stream->stream_context;
ctx->child_stream = incoming_stream;
// this does the wrap
incoming_stream->parent_stream = out;
} else {
out->parent_stream = incoming_stream;
out->peek = incoming_stream->peek;
out->read = incoming_stream->read;
out->read_raw = incoming_stream->read_raw;
out->write = incoming_stream->write;
out->socket_mutex = incoming_stream->socket_mutex;
ctx->yamux_context = incoming_stream->stream_context;
ctx->child_stream = NULL;
}
ctx->channel = channelNumber;
ctx->closed = 0;
ctx->state = 0;
ctx->window_size = 0;
ctx->type = YAMUX_CHANNEL_CONTEXT;
ctx->yamux_context = incoming_stream->parent_stream->stream_context;
ctx->stream = out;
ctx->child_stream = incoming_stream;
out->stream_context = ctx;
out->write = incoming_stream->parent_stream->write;
incoming_stream->parent_stream = out;
}
return out;
}
@ -461,12 +564,22 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) {
if (stream == NULL)
return 0;
// wrap the new stream in a YamuxChannelContext
struct Stream* channel_stream = libp2p_yamux_channel_stream_new(stream);
struct Stream* channel_stream = libp2p_yamux_channel_stream_new(stream, 0);
if (channel_stream == NULL)
return 0;
struct YamuxChannelContext* channel_context = (struct YamuxChannelContext*)channel_stream->stream_context;
// the negotiation was successful. Add it to the list of channels that we have
int itemNo = libp2p_utils_vector_add(ctx->channels, channel_stream);
// There are 2 streams for each protocol. A server has the even numbered streams, the
// client the odd number streams. If we are the server, we need to kick off the
// process to add a stream of the same type.
channel_context->channel = itemNo;
if (ctx->am_server && itemNo % 2 != 0) {
// we're the server, and they have a negotiated a new protocol.
// negotiate a stream for us to talk to them.
struct Stream* yamux_stream = stream->parent_stream->parent_stream;
struct Stream* server_to_client_stream = stream->negotiate(yamux_stream);
libp2p_yamux_stream_add(ctx, server_to_client_stream);
}
return 1;
}