More details of the yamux implementation

This commit is contained in:
John Jones 2017-11-06 13:36:11 -05:00
parent d3f740b4e0
commit 852629a4f8
14 changed files with 345 additions and 85 deletions

View file

@ -40,7 +40,7 @@ int libp2p_identify_send_protocol(struct IdentifyContext *context) {
struct StreamMessage msg; struct StreamMessage msg;
msg.data = (uint8_t*) protocol; msg.data = (uint8_t*) protocol;
msg.data_size = strlen(protocol); msg.data_size = strlen(protocol);
if (!context->parent_stream->write(context, &msg)) { if (!context->parent_stream->write(context->parent_stream->stream_context, &msg)) {
libp2p_logger_error("identify", "send_protocol: Unable to send identify protocol header.\n"); libp2p_logger_error("identify", "send_protocol: Unable to send identify protocol header.\n");
return 0; return 0;
} }
@ -56,21 +56,34 @@ int libp2p_identify_send_protocol(struct IdentifyContext *context) {
int libp2p_identify_receive_protocol(struct IdentifyContext* context) { int libp2p_identify_receive_protocol(struct IdentifyContext* context) {
const char *protocol = "/ipfs/id/1.0.0\n"; const char *protocol = "/ipfs/id/1.0.0\n";
struct StreamMessage* results = NULL; struct StreamMessage* results = NULL;
if (!context->parent_stream->read(context, &results, 30)) { if (!context->parent_stream->read(context->parent_stream->stream_context, &results, 30)) {
libp2p_logger_error("identify", "receive_protocol: Unable to read results.\n"); libp2p_logger_error("identify", "receive_protocol: Unable to read results.\n");
return 0; return 0;
} }
// the first byte is the size, so skip it // the first byte may be the size, so skip it
char* ptr = strstr((char*)&results[1], protocol); int start = 0;
if (results->data[0] != '/')
start = 1;
char* ptr = strstr((char*)&results->data[start], protocol);
if (ptr == NULL || ptr - (char*)results > 1) { if (ptr == NULL || ptr - (char*)results > 1) {
return 0; return 0;
} }
return 1; return 1;
} }
/**
* A remote node is attempting to send us an Identify message
* @param msg the message sent
* @param context the SessionContext
* @param protocol_context the identify protocol context
* @returns <0 on error, 0 if loop should not continue, >0 on success
*/
int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) { int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) {
//TODO: Implement if (protocol_context == NULL)
return 0; return -1;
//struct IdentifyContext* ctx = (struct IdentifyContext*) protocol_context;
// TODO: Do something with the incoming msg
return 1;
} }
/** /**
@ -108,6 +121,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) {
return NULL; return NULL;
struct Stream* out = libp2p_stream_new(); struct Stream* out = libp2p_stream_new();
if (out != NULL) { if (out != NULL) {
out->parent_stream = parent_stream;
struct IdentifyContext* ctx = (struct IdentifyContext*) malloc(sizeof(struct IdentifyContext)); struct IdentifyContext* ctx = (struct IdentifyContext*) malloc(sizeof(struct IdentifyContext));
if (ctx == NULL) { if (ctx == NULL) {
libp2p_stream_free(out); libp2p_stream_free(out);

View file

@ -1,5 +1,7 @@
#pragma once #pragma once
#include "libp2p/utils/vector.h"
typedef struct { typedef struct {
// publicKey is this node's public key (which also gives its node.ID) // publicKey is this node's public key (which also gives its node.ID)
// - may not need to be sent, as secure channel implies it has been sent. // - may not need to be sent, as secure channel implies it has been sent.

View file

@ -46,7 +46,6 @@ struct Stream {
struct MultiAddress* address; // helps identify who is on the other end struct MultiAddress* address; // helps identify who is on the other end
pthread_mutex_t* socket_mutex; // only 1 transmission at a time pthread_mutex_t* socket_mutex; // only 1 transmission at a time
struct Stream* parent_stream; // what stream wraps this stream struct Stream* parent_stream; // what stream wraps this stream
int channel; // the channel this stream uses, for multiplexing protocols such as yamux
/** /**
* A generic place to store implementation-specific context items * A generic place to store implementation-specific context items
*/ */

View file

@ -3,11 +3,12 @@
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <time.h> #include <time.h>
#include <sys/types.h>
#include "config.h" #include "config.h"
#include "frame.h" #include "frame.h"
#include "stream.h" #include "stream.h"
#include "libp2p/conn/session.h" #include "libp2p/net/stream.h"
enum yamux_session_type enum yamux_session_type
{ {
@ -57,7 +58,7 @@ struct yamux_session
enum yamux_session_type type; enum yamux_session_type type;
struct SessionContext* session_context; struct Stream* parent_stream;
yamux_streamid nextid; yamux_streamid nextid;
@ -72,13 +73,14 @@ struct yamux_session
* @param userdata user data * @param userdata user data
* @returns the yamux_session struct * @returns the yamux_session struct
*/ */
struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata); struct yamux_session* yamux_session_new(struct yamux_config* config, struct Stream* parent_stream, enum yamux_session_type type, void* userdata);
// does not close the socket, but does close the session // does not close the socket, but does close the session
void yamux_session_free(struct yamux_session* session); void yamux_session_free(struct yamux_session* session);
// does not free used memory // does not free used memory
ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err); ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err);
inline ssize_t yamux_session_go_away(struct yamux_session* session, enum yamux_error err) inline ssize_t yamux_session_go_away(struct yamux_session* session, enum yamux_error err)
{ {
return yamux_session_close(session, err); return yamux_session_close(session, err);
@ -96,5 +98,3 @@ ssize_t yamux_session_read(struct yamux_session* session);
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size); int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size);

View file

@ -2,6 +2,7 @@
#include "libp2p/net/protocol.h" #include "libp2p/net/protocol.h"
#include "libp2p/net/stream.h" #include "libp2p/net/stream.h"
#include "libp2p/yamux/stream.h"
/*** /***
* Declarations for the Yamux protocol * Declarations for the Yamux protocol
@ -9,15 +10,25 @@
static const int yamux_default_timeout = 10; static const int yamux_default_timeout = 10;
static const char YAMUX_CONTEXT = 'Y';
static const char YAMUX_CHANNEL_CONTEXT = 'C';
/*** /***
* Context struct for Yamux * Context struct for Yamux
*/ */
struct YamuxContext { struct YamuxContext {
char type;
struct Stream* stream; struct Stream* stream;
struct yamux_session* session; struct yamux_session* session;
struct Libp2pVector* channels; struct Libp2pVector* channels;
}; };
struct YamuxChannelContext {
char type;
struct YamuxContext* yamux_context;
struct yamux_stream* channel;
};
/** /**
* Build a handler that can handle the yamux protocol * Build a handler that can handle the yamux protocol
*/ */
@ -40,6 +51,8 @@ int yamux_receive_protocol(struct SessionContext* context);
struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream); struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream);
void libp2p_yamux_stream_free(struct Stream* stream);
/**** /****
* Add a stream "channel" to the yamux handler * Add a stream "channel" to the yamux handler
* @param ctx the context * @param ctx the context
@ -47,3 +60,10 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream);
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream); int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream);
/**
* Create a stream that has a "YamuxChannelContext" related to this yamux protocol
* @param parent_stream the parent yamux stream
* @returns a new Stream that is a YamuxChannelContext
*/
struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream);

View file

@ -15,7 +15,6 @@ struct Stream* libp2p_stream_new() {
stream->socket_mutex = NULL; stream->socket_mutex = NULL;
stream->stream_context = NULL; stream->stream_context = NULL;
stream->write = NULL; stream->write = NULL;
stream->channel = 0;
} }
return stream; return stream;
} }

View file

@ -1,5 +1,5 @@
CC = gcc CC = gcc
CFLAGS = -O0 -I../include -I../../c-protobuf -I../../c-multihash/include -I../../c-multiaddr/include -g3 -std=c99 CFLAGS = -O0 -I../include -I../../c-protobuf -I../../c-multihash/include -I../../c-multiaddr/include -g3 -std=c11
LFLAGS = LFLAGS =
DEPS = DEPS =
OBJS = peer.o peerstore.o providerstore.o OBJS = peer.o peerstore.o providerstore.o

View file

@ -1,5 +1,5 @@
CC = gcc CC = gcc
CFLAGS = -O0 -I../include -I. -I../../c-multihash/include -I../../c-multiaddr/include -std=c99 CFLAGS = -O0 -I../include -I. -I../../c-multihash/include -I../../c-multiaddr/include -std=c11
ifdef DEBUG ifdef DEBUG
CFLAGS += -g3 CFLAGS += -g3

57
test/mock_stream.h Normal file
View file

@ -0,0 +1,57 @@
#pragma once
#include <unistd.h>
#include "libp2p/net/stream.h"
struct MockContext {
struct Stream* stream;
};
void mock_stream_free(struct Stream* stream);
int mock_stream_close(void* context) {
if (context == NULL)
return 1;
struct MockContext* ctx = (struct MockContext*)context;
mock_stream_free(ctx->stream);
return 1;
}
int mock_stream_peek(void* context) {
return 1;
}
int mock_stream_read(void* context, struct StreamMessage** msg, int timeout_secs) {
return 1;
}
int mock_stream_read_raw(void* context, uint8_t* buffer, int buffer_size, int timeout_secs) {
return 1;
}
int mock_stream_write(void* context, struct StreamMessage* msg) {
return 1;
}
struct Stream* mock_stream_new() {
struct Stream* out = libp2p_stream_new();
if (out != NULL) {
out->close = mock_stream_close;
out->peek = mock_stream_peek;
out->read = mock_stream_read;
out->read_raw = mock_stream_read_raw;
out->write = mock_stream_write;
struct MockContext* ctx = malloc(sizeof(struct MockContext));
ctx->stream = out;
out->stream_context = ctx;
}
return out;
}
void mock_stream_free(struct Stream* stream) {
if (stream == NULL)
return;
if (stream->stream_context != NULL)
free(stream->stream_context);
free(stream);
}

79
test/test_yamux.h Normal file
View file

@ -0,0 +1,79 @@
#pragma once
#include "libp2p/yamux/yamux.h"
#include "libp2p/identify/identify.h"
#include "mock_stream.h"
/***
* Helpers
*/
/***
* Sends back the yamux protocol to fake negotiation
*/
int mock_yamux_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) {
*msg = libp2p_stream_message_new();
struct StreamMessage* message = *msg;
message->data = "/yamux/1.0.0\n";
message->data_size = strlen(message->data);
return 1;
}
/***
* Sends back the yamux protocol to fake negotiation
*/
int mock_identify_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) {
*msg = libp2p_stream_message_new();
struct StreamMessage* message = *msg;
message->data = "/ipfs/id/1.0.0\n";
message->data_size = strlen(message->data);
return 1;
}
/***
* Tests
*/
/***
* Verify that we can initiate a yamux session
*/
int test_yamux_stream_new() {
int retVal = 0;
// setup
struct Stream* mock_stream = mock_stream_new();
mock_stream->read = mock_yamux_read_protocol;
struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream);
if (yamux_stream == NULL)
goto exit;
// tear down
retVal = 1;
exit:
if (yamux_stream != NULL)
yamux_stream->close(yamux_stream->stream_context);
mock_stream->close(mock_stream->stream_context);
return retVal;
}
/***
* Attempt to add a protocol to the Yamux protocol
*/
int test_yamux_identify() {
int retVal = 0;
// setup
struct Stream* mock_stream = mock_stream_new();
mock_stream->read = mock_yamux_read_protocol;
struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream);
if (yamux_stream == NULL)
goto exit;
// TODO: Now add in another protocol
mock_stream->read = mock_identify_read_protocol;
if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(libp2p_yamux_channel_new(yamux_stream)))) {
goto exit;
}
// tear down
retVal = 1;
exit:
if (yamux_stream != NULL)
yamux_stream->close(yamux_stream->stream_context);
mock_stream->close(mock_stream->stream_context);
return retVal;
}

View file

@ -13,6 +13,7 @@
#include "test_conn.h" #include "test_conn.h"
#include "test_record.h" #include "test_record.h"
#include "test_peer.h" #include "test_peer.h"
#include "test_yamux.h"
#include "libp2p/utils/logger.h" #include "libp2p/utils/logger.h"
struct test { struct test {
@ -113,6 +114,8 @@ int build_test_collection() {
add_test("test_peer_protobuf", test_peer_protobuf,1); add_test("test_peer_protobuf", test_peer_protobuf,1);
add_test("test_peerstore", test_peerstore,1); add_test("test_peerstore", test_peerstore,1);
add_test("test_aes", test_aes, 1); add_test("test_aes", test_aes, 1);
add_test("test_yamux_stream_new", test_yamux_stream_new, 1);
add_test("test_yamux_identify", test_yamux_identify, 1);
return 1; return 1;
}; };

View file

@ -22,9 +22,9 @@ static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG;
* @param userdata user data * @param userdata user data
* @returns the yamux_session struct * @returns the yamux_session struct
*/ */
struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata) struct yamux_session* yamux_session_new(struct yamux_config* config, struct Stream* parent_stream, enum yamux_session_type type, void* userdata)
{ {
if (!session_context) if (!parent_stream)
return NULL; return NULL;
if (!config) if (!config)
@ -42,7 +42,7 @@ struct yamux_session* yamux_session_new(struct yamux_config* config, struct Sess
if (sess != NULL) { if (sess != NULL) {
sess->config = config; sess->config = config;
sess->type = type; sess->type = type;
sess->session_context = session_context; sess->parent_stream = parent_stream;
sess->closed = 0; sess->closed = 0;
sess->nextid = 1 + (type == yamux_session_server); sess->nextid = 1 + (type == yamux_session_server);
sess->num_streams = 0; sess->num_streams = 0;
@ -108,7 +108,7 @@ ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err)
outgoing.data = (uint8_t*)&f; outgoing.data = (uint8_t*)&f;
outgoing.data_size = sizeof(struct yamux_frame); outgoing.data_size = sizeof(struct yamux_frame);
if (!session->session_context->default_stream->write(session->session_context, &outgoing)) if (!session->parent_stream->write(session->parent_stream->stream_context, &outgoing))
return 0; return 0;
return outgoing.data_size; return outgoing.data_size;
} }
@ -139,13 +139,14 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po
struct StreamMessage outgoing; struct StreamMessage outgoing;
outgoing.data = (uint8_t*)&f; outgoing.data = (uint8_t*)&f;
outgoing.data_size = sizeof(struct yamux_frame); outgoing.data_size = sizeof(struct yamux_frame);
if (!session->session_context->default_stream->write(session->session_context, &outgoing)) if (!session->parent_stream->write(session->parent_stream->stream_context, &outgoing))
return 0; return 0;
return outgoing.data_size; return outgoing.data_size;
} }
/** /**
* Decode an incoming message * Decode an incoming message
* @param session the session
* @param incoming the incoming bytes * @param incoming the incoming bytes
* @param incoming_size the size of the incoming bytes * @param incoming_size the size of the incoming bytes
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
@ -157,13 +158,15 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t
if (incoming_size < sizeof(struct yamux_frame)) { if (incoming_size < sizeof(struct yamux_frame)) {
return 0; return 0;
} }
memcpy(f, incoming, sizeof(struct yamux_frame));
decode_frame(&f); decode_frame(&f);
// check yamux version // check yamux version
if (f.version != YAMUX_VERSION) if (f.version != YAMUX_VERSION)
return 0; return 0;
if (!f.streamid) // we're not dealing with a stream if (!f.streamid) // we're not dealing with a stream, we're dealing with something at the yamux protocol level
switch (f.type) switch (f.type)
{ {
case yamux_frame_ping: // ping case yamux_frame_ping: // ping
@ -202,7 +205,7 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t
default: default:
return -EPROTO; return -EPROTO;
} }
else { // we're handling a stream else { // we're handling a stream, not something at the yamux protocol level
for (size_t i = 0; i < session->cap_streams; ++i) for (size_t i = 0; i < session->cap_streams; ++i)
{ {
struct yamux_session_stream* ss = &session->streams[i]; struct yamux_session_stream* ss = &session->streams[i];
@ -242,15 +245,16 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t
return -EPROTO; return -EPROTO;
int sz = sizeof(struct yamux_frame); int sz = sizeof(struct yamux_frame);
ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, session->session_context); ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, session->parent_stream->stream_context);
return (re < 0) ? re : (re + incoming_size); return (re < 0) ? re : (re + incoming_size);
} }
} }
// stream doesn't exist yet // This stream is not in my list of streams.
// It must not exist yet, so let's try to make it
if (f.flags & yamux_frame_syn) if (f.flags & yamux_frame_syn)
{ {
void* ud = NULL; void* ud = NULL; // user data
if (session->get_str_ud_fn) if (session->get_str_ud_fn)
ud = session->get_str_ud_fn(session, f.streamid); ud = session->get_str_ud_fn(session, f.streamid);

View file

@ -13,6 +13,14 @@
#define MIN(x,y) (y^((x^y)&-(x<y))) #define MIN(x,y) (y^((x^y)&-(x<y)))
#define MAX(x,y) (x^((x^y)&-(x<y))) #define MAX(x,y) (x^((x^y)&-(x<y)))
/***
* Create a new stream
* @param session the session
* @param id the id (0 to set it to the next id)
* @Param userdata the user data
* @returns a new yamux_stream struct
*/
struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_streamid id, void* userdata) struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_streamid id, void* userdata)
{ {
if (!session) if (!session)
@ -82,7 +90,7 @@ int yamux_write_frame(struct yamux_stream* yamux_stream, struct yamux_frame* f)
struct StreamMessage outgoing; struct StreamMessage outgoing;
outgoing.data = (uint8_t*)f; outgoing.data = (uint8_t*)f;
outgoing.data_size = sizeof(struct yamux_frame); outgoing.data_size = sizeof(struct yamux_frame);
if (!yamux_stream->session->session_context->default_stream->write(yamux_stream->session->session_context, &outgoing)) if (!yamux_stream->session->parent_stream->write(yamux_stream->session->parent_stream->stream_context, &outgoing))
return 0; return 0;
return outgoing.data_size; return outgoing.data_size;
} }
@ -196,32 +204,31 @@ ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta)
} }
/*** /***
* Write data to the stream * Write data to the stream.
* @param stream the stream * @param stream the stream (includes the "channel")
* @param data_length the length of the data to be sent * @param data_length the length of the data to be sent
* @param data_ the data to be sent * @param data_ the data to be sent
* @return the number of bytes sent * @return the number of bytes sent
*/ */
ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data_) ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data_)
{ {
// validate parameters
if (!((size_t)stream | (size_t)data_) || stream->state == yamux_stream_closed if (!((size_t)stream | (size_t)data_) || stream->state == yamux_stream_closed
|| stream->state == yamux_stream_closing || stream->session->closed) || stream->state == yamux_stream_closing || stream->session->closed)
return -EINVAL; return -EINVAL;
// gather details
char* data = (char*)data_; char* data = (char*)data_;
struct yamux_session* s = stream->session; struct yamux_session* s = stream->session;
char* data_end = data + data_length; char* data_end = data + data_length;
uint32_t ws = stream->window_size; uint32_t ws = stream->window_size;
yamux_streamid id = stream->id; yamux_streamid id = stream->id;
char sendd[ws + sizeof(struct yamux_frame)]; char sendd[ws + sizeof(struct yamux_frame)];
// Send the data, breaking it up into pieces if it is too large
while (data < data_end) { while (data < data_end) {
uint32_t uint32_t dr = (uint32_t)(data_end - data); // length of the data for this round
dr = (uint32_t)(data_end - data), uint32_t adv = MIN(dr, ws); // the size of the data we will send this round
adv = MIN(dr, ws);
struct yamux_frame f = (struct yamux_frame){ struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION , .version = YAMUX_VERSION ,
@ -232,15 +239,19 @@ ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, vo
}; };
encode_frame(&f); encode_frame(&f);
// put the frame into the buffer
memcpy(sendd, &f, sizeof(struct yamux_frame)); memcpy(sendd, &f, sizeof(struct yamux_frame));
// put the data into the buffer
memcpy(sendd + sizeof(struct yamux_frame), data, (size_t)adv); memcpy(sendd + sizeof(struct yamux_frame), data, (size_t)adv);
// send the buffer through the network
struct StreamMessage outgoing; struct StreamMessage outgoing;
outgoing.data = (uint8_t*)sendd; outgoing.data = (uint8_t*)sendd;
outgoing.data_size = adv + sizeof(struct yamux_frame); outgoing.data_size = adv + sizeof(struct yamux_frame);
if (!s->session_context->default_stream->write(s->session_context, &outgoing)) if (!s->parent_stream->write(s->parent_stream->stream_context, &outgoing))
return adv; return adv;
// prepare to loop again
data += adv; data += adv;
} }

View file

@ -35,6 +35,7 @@ int yamux_can_handle(const struct StreamMessage* msg) {
* @param msg the message * @param msg the message
* @param incoming the stream buffer * @param incoming the stream buffer
*/ */
/*
void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) {
struct Libp2pVector* handlers = stream->userdata; struct Libp2pVector* handlers = stream->userdata;
int retVal = libp2p_protocol_marshal(msg, stream->session->session_context, handlers); int retVal = libp2p_protocol_marshal(msg, stream->session->session_context, handlers);
@ -50,6 +51,7 @@ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) {
} }
return; return;
} }
*/
/*** /***
* Send the yamux protocol out the default stream * Send the yamux protocol out the default stream
@ -94,7 +96,7 @@ int yamux_receive_protocol(struct SessionContext* context) {
} }
/*** /***
* Handles the message * The remote is attempting to negotiate yamux
* @param msg the incoming message * @param msg the incoming message
* @param incoming_size the size of the incoming data buffer * @param incoming_size the size of the incoming data buffer
* @param session_context the information about the incoming connection * @param session_context the information about the incoming connection
@ -102,8 +104,7 @@ int yamux_receive_protocol(struct SessionContext* context) {
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success * @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/ */
int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) {
// they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream struct yamux_session* yamux = yamux_session_new(NULL, session_context->default_stream, yamux_session_server, protocol_context);
struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context);
uint8_t* buf = (uint8_t*) malloc(msg->data_size); uint8_t* buf = (uint8_t*) malloc(msg->data_size);
if (buf == NULL) if (buf == NULL)
return -1; return -1;
@ -119,37 +120,7 @@ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext*
} }
} }
/* return 1;
struct Libp2pVector* handlers = (struct Libp2pVector*)protocol_context;
uint8_t* results = NULL;
size_t bytes_read = 0;
int numRetries = 0;
int retVal = 0;
int max_retries = 100; // try for 5 minutes
for(;;) {
// try to read for 5 seconds
if (session_context->default_stream->read(session_context, &results, &bytes_read, 5)) {
// we read something from the network. Process it.
// NOTE: If it is a multistream protocol that we are receiving, ignore it.
if (yamux_can_handle(results, bytes_read))
continue;
numRetries = 0;
retVal = libp2p_protocol_marshal(results, bytes_read, session_context, handlers);
if (results != NULL)
free(results);
// exit the loop on error (or if they ask us to no longer loop by returning 0)
if (retVal <= 0)
break;
} else {
// we were unable to read from the network.
// if it timed out, we should try again (if we're not out of retries)
if (numRetries >= max_retries)
break;
numRetries++;
}
}
*/
return 0;
} }
/** /**
@ -173,7 +144,10 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector*
} }
int libp2p_yamux_close(void* stream_context) { int libp2p_yamux_close(void* stream_context) {
//TODO: Implement if (stream_context == NULL)
return 0;
struct YamuxContext* ctx = (struct YamuxContext*)stream_context;
libp2p_yamux_stream_free(ctx->stream);
return 0; return 0;
} }
@ -186,19 +160,62 @@ int libp2p_yamux_close(void* stream_context) {
* @returns true(1) on success, false(0) on failure * @returns true(1) on success, false(0) on failure
*/ */
int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int timeout_secs) { int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int timeout_secs) {
struct YamuxContext* ctx = (struct YamuxContext*)stream_context; if (stream_context == NULL)
struct Stream* parent_stream = ctx->stream->parent_stream;
struct StreamMessage* incoming;
if (!parent_stream->read(parent_stream->stream_context, &incoming, timeout_secs))
return 0; return 0;
// look at the first byte of the context to determine if this is a YamuxContext (we're negotiating)
// or a YamuxChannelContext (we're talking to an established channel)
struct YamuxContext* ctx = NULL;
struct YamuxChannelContext* channel = NULL;
char proto = ((uint8_t*)stream_context)[0];
if (proto == YAMUX_CHANNEL_CONTEXT) {
channel = (struct YamuxChannelContext*)stream_context;
ctx = channel->yamux_context;
} else if (proto == YAMUX_CONTEXT) {
ctx = (struct YamuxContext*)stream_context;
}
// we've got bytes from the network. process them as a yamux frame if (channel != NULL && channel->channel != NULL) {
return yamux_decode(ctx->session, incoming->data, incoming->data_size); // we have an established channel. Use it.
if (!channel->yamux_context->stream->parent_stream->read(channel->yamux_context->stream->parent_stream->stream_context, message, yamux_default_timeout))
return 0;
// TODO: This is not right. It must be sorted out.
struct StreamMessage* msg = *message;
return yamux_decode(channel->channel->session, msg->data, msg->data_size);
} else if (ctx != NULL) {
// We are still negotiating...
return ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, yamux_default_timeout);
}
return 0;
} }
/***
* Write to the remote
* @param stream_context the context. Could be a YamuxContext or YamuxChannelContext
* @param message the message to write
* @returns the number of bytes written
*/
int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) { int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) {
//TODO: Implement if (stream_context == NULL)
return 0;
// look at the first byte of the context to determine if this is a YamuxContext (we're negotiating)
// or a YamuxChannelContext (we're talking to an established channel)
struct YamuxContext* ctx = NULL;
struct YamuxChannelContext* channel = NULL;
char proto = ((uint8_t*)stream_context)[0];
if (proto == YAMUX_CHANNEL_CONTEXT) {
channel = (struct YamuxChannelContext*)stream_context;
ctx = channel->yamux_context;
} else if (proto == YAMUX_CONTEXT) {
ctx = (struct YamuxContext*)stream_context;
}
if (channel != NULL && channel->channel != NULL) {
// we have an established channel. Use it.
return yamux_stream_write(channel->channel, message->data_size, message->data);
} else if (ctx != NULL) {
// We are still negotiating...
return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, message);
}
return 0; return 0;
} }
@ -227,14 +244,19 @@ int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size
struct YamuxContext* libp2p_yamux_context_new() { struct YamuxContext* libp2p_yamux_context_new() {
struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext)); struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext));
if (ctx != NULL) { if (ctx != NULL) {
ctx->type = YAMUX_CONTEXT;
ctx->stream = NULL; ctx->stream = NULL;
ctx->channels = libp2p_utils_vector_new(1); ctx->channels = libp2p_utils_vector_new(1);
} }
return ctx; return ctx;
} }
void libp2p_yamux_stream_free(struct Stream* yamux_stream) { void libp2p_yamux_context_free(struct YamuxContext* ctx) {
//TODO: Implement if (ctx == NULL)
return;
libp2p_utils_vector_free(ctx->channels);
free(ctx);
return;
} }
int libp2p_yamux_negotiate(struct YamuxContext* ctx) { int libp2p_yamux_negotiate(struct YamuxContext* ctx) {
@ -302,7 +324,7 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) {
* @returns a Stream initialized and ready for yamux * @returns a Stream initialized and ready for yamux
*/ */
struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) {
struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream)); struct Stream* out = libp2p_stream_new();
if (out != NULL) { if (out != NULL) {
out->parent_stream = parent_stream; out->parent_stream = parent_stream;
out->close = libp2p_yamux_close; out->close = libp2p_yamux_close;
@ -328,6 +350,18 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) {
return out; return out;
} }
/**
* Frees resources held by the stream
* @param yamux_stream the stream
*/
void libp2p_yamux_stream_free(struct Stream* yamux_stream) {
if (yamux_stream == NULL)
return;
struct YamuxContext* ctx = (struct YamuxContext*)yamux_stream->stream_context;
libp2p_yamux_context_free(ctx);
libp2p_stream_free(yamux_stream);
}
/**** /****
* Add a stream "channel" to the yamux handler * Add a stream "channel" to the yamux handler
* @param ctx the context * @param ctx the context
@ -335,8 +369,46 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) {
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) {
if (stream == NULL)
return 0;
// the stream's parent should have a YamuxChannelContext
char proto = ((uint8_t*)stream->parent_stream->stream_context)[0];
if (proto == YAMUX_CHANNEL_CONTEXT) {
// the negotiation was successful. Add it to the list of channels that we have
int itemNo = libp2p_utils_vector_add(ctx->channels, stream); int itemNo = libp2p_utils_vector_add(ctx->channels, stream);
stream->channel = itemNo; struct YamuxChannelContext* incoming = (struct YamuxChannelContext*)stream->parent_stream->stream_context;
if (incoming->channel == NULL) {
// this is wrong. There should have been a yamux_stream there
return 0;
}
incoming->channel->id = itemNo;
return 1; return 1;
}
return 0;
} }
/**
* Create a stream that has a "YamuxChannelContext" related to this yamux protocol
* @param parent_stream the parent yamux stream
* @returns a new Stream that is a YamuxChannelContext
*/
struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream) {
struct Stream* out = libp2p_stream_new();
if (out != NULL) {
out->address = parent_stream->address;
out->close = parent_stream->close;
out->parent_stream = parent_stream;
out->peek = parent_stream->peek;
out->read = parent_stream->read;
out->read_raw = parent_stream->read_raw;
out->socket_mutex = parent_stream->socket_mutex;
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext));
ctx->type = YAMUX_CHANNEL_CONTEXT;
ctx->yamux_context = parent_stream->stream_context;
out->stream_context = ctx;
out->write = parent_stream->write;
}
return out;
}