squashed some memory leak bugs in yamux

yamux
John Jones 2017-11-06 16:38:55 -05:00
parent 852629a4f8
commit 18b0139b81
9 changed files with 167 additions and 83 deletions

View File

@ -65,9 +65,11 @@ int libp2p_identify_receive_protocol(struct IdentifyContext* context) {
if (results->data[0] != '/') if (results->data[0] != '/')
start = 1; start = 1;
char* ptr = strstr((char*)&results->data[start], protocol); char* ptr = strstr((char*)&results->data[start], protocol);
if (ptr == NULL || ptr - (char*)results > 1) { if (ptr == NULL || ptr - (char*)&results->data[start] > 1) {
libp2p_stream_message_free(results);
return 0; return 0;
} }
libp2p_stream_message_free(results);
return 1; return 1;
} }
@ -106,6 +108,16 @@ struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp
return handler; return handler;
} }
int libp2p_identify_close(void* stream_context) {
if (stream_context == NULL)
return 0;
struct IdentifyContext* ctx = (struct IdentifyContext*)stream_context;
ctx->parent_stream->close(ctx->parent_stream->stream_context);
free(ctx->stream);
free(ctx);
return 1;
}
/*** /***
* Create a new stream that negotiates the identify protocol * Create a new stream that negotiates the identify protocol
* *
@ -128,7 +140,9 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) {
return NULL; return NULL;
} }
ctx->parent_stream = parent_stream; ctx->parent_stream = parent_stream;
ctx->stream = out;
out->stream_context = ctx; out->stream_context = ctx;
out->close = libp2p_identify_close;
if (!libp2p_identify_send_protocol(ctx) || !libp2p_identify_receive_protocol(ctx)) { if (!libp2p_identify_send_protocol(ctx) || !libp2p_identify_receive_protocol(ctx)) {
libp2p_stream_free(out); libp2p_stream_free(out);
free(ctx); free(ctx);

View File

@ -25,6 +25,7 @@ typedef struct {
struct IdentifyContext { struct IdentifyContext {
struct Stream* parent_stream; struct Stream* parent_stream;
struct Stream* stream;
}; };
int libp2p_identify_can_handle(const struct StreamMessage* msg); int libp2p_identify_can_handle(const struct StreamMessage* msg);

View File

@ -9,6 +9,7 @@
#include "frame.h" #include "frame.h"
#include "stream.h" #include "stream.h"
#include "libp2p/net/stream.h" #include "libp2p/net/stream.h"
//#include "libp2p/yamux/yamux.h"
enum yamux_session_type enum yamux_session_type
{ {
@ -91,10 +92,12 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po
// defers to stream read handlers // defers to stream read handlers
ssize_t yamux_session_read(struct yamux_session* session); ssize_t yamux_session_read(struct yamux_session* session);
struct YamuxChannelContext;
/** /**
* Decode an incoming message * Decode an incoming message
* @param channel the channel
* @param incoming the incoming bytes * @param incoming the incoming bytes
* @param incoming_size the size of the incoming bytes * @param incoming_size the size of the incoming bytes
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size); int yamux_decode(struct YamuxChannelContext* channel, const uint8_t* incoming, size_t incoming_size);

View File

@ -5,6 +5,7 @@
#include "session.h" #include "session.h"
#include "libp2p/conn/session.h" #include "libp2p/conn/session.h"
#include "libp2p/yamux/yamux.h"
// NOTE: 'data' is not guaranteed to be preserved when the read_fn // NOTE: 'data' is not guaranteed to be preserved when the read_fn
// handler exists (read: it will be freed). // handler exists (read: it will be freed).
@ -48,18 +49,18 @@ struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_strea
// not obligatory, SYN is sent by yamux_stream_write when the stream // not obligatory, SYN is sent by yamux_stream_write when the stream
// isn't initialised anyway // isn't initialised anyway
ssize_t yamux_stream_init (struct yamux_stream* stream); ssize_t yamux_stream_init (struct YamuxChannelContext* channel_ctx);
// doesn't free the stream // doesn't free the stream
// uses FIN // uses FIN
ssize_t yamux_stream_close(struct yamux_stream* stream); ssize_t yamux_stream_close(struct YamuxChannelContext* channel_ctx);
// uses RST // uses RST
ssize_t yamux_stream_reset(struct yamux_stream* stream); ssize_t yamux_stream_reset(struct YamuxChannelContext* stream);
void yamux_stream_free(struct yamux_stream* stream); void yamux_stream_free(struct yamux_stream* stream);
ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta); ssize_t yamux_stream_window_update(struct YamuxChannelContext* ctx, int32_t delta);
ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data); ssize_t yamux_stream_write(struct YamuxChannelContext* ctx, uint32_t data_length, void* data);
/*** /***
* process stream * process stream

View File

@ -26,7 +26,15 @@ struct YamuxContext {
struct YamuxChannelContext { struct YamuxChannelContext {
char type; char type;
struct YamuxContext* yamux_context; struct YamuxContext* yamux_context;
struct yamux_stream* channel; struct Stream* stream;
// the channel number
int channel;
// the window size for this channel
int window_size;
// the state of the connection
int state;
// whether or not the connection is closed
int closed;
}; };
/** /**
@ -39,7 +47,7 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler();
* @param context the SessionContext * @param context the SessionContext
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int yamux_send_protocol(struct SessionContext* context); int yamux_send_protocol(struct YamuxContext* context);
/*** /***
* Check to see if the reply is the yamux protocol header we expect * Check to see if the reply is the yamux protocol header we expect
@ -47,7 +55,7 @@ int yamux_send_protocol(struct SessionContext* context);
* @param context the SessionContext * @param context the SessionContext
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int yamux_receive_protocol(struct SessionContext* context); int yamux_receive_protocol(struct YamuxContext* context);
struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream); struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream);
@ -67,3 +75,5 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream);
* @returns a new Stream that is a YamuxChannelContext * @returns a new Stream that is a YamuxChannelContext
*/ */
struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream); struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream);
void libp2p_yamux_channel_free(struct YamuxChannelContext* ctx);

View File

@ -13,8 +13,10 @@
int mock_yamux_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) { int mock_yamux_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) {
*msg = libp2p_stream_message_new(); *msg = libp2p_stream_message_new();
struct StreamMessage* message = *msg; struct StreamMessage* message = *msg;
message->data = "/yamux/1.0.0\n"; const char* id = "/yamux/1.0.0\n";
message->data_size = strlen(message->data); message->data_size = strlen(id);
message->data = malloc(message->data_size);
memcpy(message->data, id, message->data_size);
return 1; return 1;
} }
@ -24,8 +26,10 @@ int mock_yamux_read_protocol(void* context, struct StreamMessage** msg, int netw
int mock_identify_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) { int mock_identify_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) {
*msg = libp2p_stream_message_new(); *msg = libp2p_stream_message_new();
struct StreamMessage* message = *msg; struct StreamMessage* message = *msg;
message->data = "/ipfs/id/1.0.0\n"; const char* id = "/ipfs/id/1.0.0\n";
message->data_size = strlen(message->data); message->data_size = strlen(id);
message->data = malloc(message->data_size);
memcpy(message->data, id, message->data_size);
return 1; return 1;
} }

View File

@ -11,6 +11,7 @@
#include "libp2p/os/timespec.h" #include "libp2p/os/timespec.h"
#include "libp2p/yamux/session.h" #include "libp2p/yamux/session.h"
#include "libp2p/yamux/stream.h" #include "libp2p/yamux/stream.h"
#include "libp2p/yamux/yamux.h"
static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG; static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG;
@ -151,14 +152,14 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po
* @param incoming_size the size of the incoming bytes * @param incoming_size the size of the incoming bytes
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size) { int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* incoming, size_t incoming_size) {
// decode frame // decode frame
struct yamux_frame f; struct yamux_frame f;
if (incoming_size < sizeof(struct yamux_frame)) { if (incoming_size < sizeof(struct yamux_frame)) {
return 0; return 0;
} }
memcpy(f, incoming, sizeof(struct yamux_frame)); memcpy((void*)&f, incoming, sizeof(struct yamux_frame));
decode_frame(&f); decode_frame(&f);
@ -172,14 +173,14 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t
case yamux_frame_ping: // ping case yamux_frame_ping: // ping
if (f.flags & yamux_frame_syn) if (f.flags & yamux_frame_syn)
{ {
yamux_session_ping(session, f.length, 1); yamux_session_ping(channelContext->yamux_context->session, f.length, 1);
if (session->ping_fn) if (channelContext->yamux_context->session->ping_fn)
session->ping_fn(session, f.length); channelContext->yamux_context->session->ping_fn(channelContext->yamux_context->session, f.length);
} }
else if ((f.flags & yamux_frame_ack) && session->pong_fn) else if ((f.flags & yamux_frame_ack) && channelContext->yamux_context->session->pong_fn)
{ {
struct timespec now, dt, last = session->since_ping; struct timespec now, dt, last = channelContext->yamux_context->session->since_ping;
if (!timespec_get(&now, TIME_UTC)) if (!timespec_get(&now, TIME_UTC))
return -EACCES; return -EACCES;
@ -192,23 +193,23 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t
else else
dt.tv_nsec = now.tv_nsec - last.tv_nsec; dt.tv_nsec = now.tv_nsec - last.tv_nsec;
session->pong_fn(session, f.length, dt); channelContext->yamux_context->session->pong_fn(channelContext->yamux_context->session, f.length, dt);
} }
else else
return -EPROTO; return -EPROTO;
break; break;
case yamux_frame_go_away: // go away (hanging up) case yamux_frame_go_away: // go away (hanging up)
session->closed = 1; channelContext->yamux_context->session->closed = 1;
if (session->go_away_fn) if (channelContext->yamux_context->session->go_away_fn)
session->go_away_fn(session, (enum yamux_error)f.length); channelContext->yamux_context->session->go_away_fn(channelContext->yamux_context->session, (enum yamux_error)f.length);
break; break;
default: default:
return -EPROTO; return -EPROTO;
} }
else { // we're handling a stream, not something at the yamux protocol level else { // we're handling a stream, not something at the yamux protocol level
for (size_t i = 0; i < session->cap_streams; ++i) for (size_t i = 0; i < channelContext->yamux_context->session->cap_streams; ++i)
{ {
struct yamux_session_stream* ss = &session->streams[i]; struct yamux_session_stream* ss = &channelContext->yamux_context->session->streams[i];
struct yamux_stream* s = ss->stream; struct yamux_stream* s = ss->stream;
if (!ss->alive || s->state == yamux_stream_closed) if (!ss->alive || s->state == yamux_stream_closed)
@ -227,7 +228,7 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t
{ {
// local stream didn't initiate FIN // local stream didn't initiate FIN
if (s->state != yamux_stream_closing) if (s->state != yamux_stream_closing)
yamux_stream_close(s); yamux_stream_close(channelContext);
s->state = yamux_stream_closed; s->state = yamux_stream_closed;
@ -245,7 +246,7 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t
return -EPROTO; return -EPROTO;
int sz = sizeof(struct yamux_frame); int sz = sizeof(struct yamux_frame);
ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, session->parent_stream->stream_context); ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, channelContext->yamux_context->stream->parent_stream->stream_context);
return (re < 0) ? re : (re + incoming_size); return (re < 0) ? re : (re + incoming_size);
} }
} }
@ -256,13 +257,13 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t
{ {
void* ud = NULL; // user data void* ud = NULL; // user data
if (session->get_str_ud_fn) if (channelContext->yamux_context->session->get_str_ud_fn)
ud = session->get_str_ud_fn(session, f.streamid); ud = channelContext->yamux_context->session->get_str_ud_fn(channelContext->yamux_context->session, f.streamid);
struct yamux_stream* st = yamux_stream_new(session, f.streamid, ud); struct yamux_stream* st = yamux_stream_new(channelContext->yamux_context->session, f.streamid, ud);
if (session->new_stream_fn) if (channelContext->yamux_context->session->new_stream_fn)
session->new_stream_fn(session, st); channelContext->yamux_context->session->new_stream_fn(channelContext->yamux_context->session, st);
st->state = yamux_stream_syn_recv; st->state = yamux_stream_syn_recv;
} }

View File

@ -85,12 +85,12 @@ FOUND:;
* @param yamux_stream the stream context * @param yamux_stream the stream context
* @param f the frame * @param f the frame
*/ */
int yamux_write_frame(struct yamux_stream* yamux_stream, struct yamux_frame* f) { int yamux_write_frame(struct YamuxContext* ctx, struct yamux_frame* f) {
encode_frame(f); encode_frame(f);
struct StreamMessage outgoing; struct StreamMessage outgoing;
outgoing.data = (uint8_t*)f; outgoing.data = (uint8_t*)f;
outgoing.data_size = sizeof(struct yamux_frame); outgoing.data_size = sizeof(struct yamux_frame);
if (!yamux_stream->session->parent_stream->write(yamux_stream->session->parent_stream->stream_context, &outgoing)) if (!ctx->stream->write(ctx->stream->stream_context, &outgoing))
return 0; return 0;
return outgoing.data_size; return outgoing.data_size;
} }
@ -100,9 +100,9 @@ int yamux_write_frame(struct yamux_stream* yamux_stream, struct yamux_frame* f)
* @param stream the stream to initialize * @param stream the stream to initialize
* @returns the number of bytes sent * @returns the number of bytes sent
*/ */
ssize_t yamux_stream_init(struct yamux_stream* stream) ssize_t yamux_stream_init(struct YamuxChannelContext* channel_ctx)
{ {
if (!stream || stream->state != yamux_stream_inited || stream->session->closed) { if (!channel_ctx || channel_ctx->state != yamux_stream_inited || channel_ctx->closed) {
return -EINVAL; return -EINVAL;
} }
@ -110,13 +110,13 @@ ssize_t yamux_stream_init(struct yamux_stream* stream)
.version = YAMUX_VERSION, .version = YAMUX_VERSION,
.type = yamux_frame_window_update, .type = yamux_frame_window_update,
.flags = yamux_frame_syn, .flags = yamux_frame_syn,
.streamid = stream->id, .streamid = channel_ctx->channel,
.length = 0 .length = 0
}; };
stream->state = yamux_stream_syn_sent; channel_ctx->state = yamux_stream_syn_sent;
return yamux_write_frame(stream, &f); return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f);
} }
/*** /***
@ -124,22 +124,22 @@ ssize_t yamux_stream_init(struct yamux_stream* stream)
* @param stream the stream * @param stream the stream
* @returns the number of bytes sent * @returns the number of bytes sent
*/ */
ssize_t yamux_stream_close(struct yamux_stream* stream) ssize_t yamux_stream_close(struct YamuxChannelContext* channel_ctx)
{ {
if (!stream || stream->state != yamux_stream_est || stream->session->closed) if (!channel_ctx || channel_ctx->state != yamux_stream_est || channel_ctx->closed)
return -EINVAL; return -EINVAL;
struct yamux_frame f = (struct yamux_frame){ struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION, .version = YAMUX_VERSION,
.type = yamux_frame_window_update, .type = yamux_frame_window_update,
.flags = yamux_frame_fin, .flags = yamux_frame_fin,
.streamid = stream->id, .streamid = channel_ctx->channel,
.length = 0 .length = 0
}; };
stream->state = yamux_stream_closing; channel_ctx->state = yamux_stream_closing;
return yamux_write_frame(stream, &f); return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f);
} }
/** /**
@ -147,33 +147,33 @@ ssize_t yamux_stream_close(struct yamux_stream* stream)
* @param stream the stream * @param stream the stream
* @returns the number of bytes sent * @returns the number of bytes sent
*/ */
ssize_t yamux_stream_reset(struct yamux_stream* stream) ssize_t yamux_stream_reset(struct YamuxChannelContext* channel_ctx)
{ {
if (!stream || stream->session->closed) if (!channel_ctx || channel_ctx->closed)
return -EINVAL; return -EINVAL;
struct yamux_frame f = (struct yamux_frame){ struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION, .version = YAMUX_VERSION,
.type = yamux_frame_window_update, .type = yamux_frame_window_update,
.flags = yamux_frame_rst, .flags = yamux_frame_rst,
.streamid = stream->id, .streamid = channel_ctx->channel,
.length = 0 .length = 0
}; };
stream->state = yamux_stream_closed; channel_ctx->state = yamux_stream_closed;
return yamux_write_frame(stream, &f); return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f);
} }
static enum yamux_frame_flags get_flags(struct yamux_stream* stream) static enum yamux_frame_flags get_flags(struct YamuxChannelContext* ctx)
{ {
switch (stream->state) switch (ctx->state)
{ {
case yamux_stream_inited: case yamux_stream_inited:
stream->state = yamux_stream_syn_sent; ctx->state = yamux_stream_syn_sent;
return yamux_frame_syn; return yamux_frame_syn;
case yamux_stream_syn_recv: case yamux_stream_syn_recv:
stream->state = yamux_stream_est; ctx->state = yamux_stream_est;
return yamux_frame_ack; return yamux_frame_ack;
default: default:
return 0; return 0;
@ -186,21 +186,21 @@ static enum yamux_frame_flags get_flags(struct yamux_stream* stream)
* @param delta the new window size * @param delta the new window size
* @returns number of bytes sent * @returns number of bytes sent
*/ */
ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta) ssize_t yamux_stream_window_update(struct YamuxChannelContext* channel_ctx, int32_t delta)
{ {
if (!stream || stream->state == yamux_stream_closed if (!channel_ctx || channel_ctx->state == yamux_stream_closed
|| stream->state == yamux_stream_closing || stream->session->closed) || channel_ctx->state == yamux_stream_closing || channel_ctx->closed)
return -EINVAL; return -EINVAL;
struct yamux_frame f = (struct yamux_frame){ struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION, .version = YAMUX_VERSION,
.type = yamux_frame_window_update, .type = yamux_frame_window_update,
.flags = get_flags(stream), .flags = get_flags(channel_ctx),
.streamid = stream->id, .streamid = channel_ctx->channel,
.length = (uint32_t)delta .length = (uint32_t)delta
}; };
return yamux_write_frame(stream, &f); return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f);
} }
/*** /***
@ -210,19 +210,23 @@ ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta)
* @param data_ the data to be sent * @param data_ the data to be sent
* @return the number of bytes sent * @return the number of bytes sent
*/ */
ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data_) ssize_t yamux_stream_write(struct YamuxChannelContext* channel_ctx, uint32_t data_length, void* data_)
{ {
// validate parameters // validate parameters
if (channel_ctx == NULL || data_ == NULL || data_length == 0)
return -EINVAL;
/*
if (!((size_t)stream | (size_t)data_) || stream->state == yamux_stream_closed if (!((size_t)stream | (size_t)data_) || stream->state == yamux_stream_closed
|| stream->state == yamux_stream_closing || stream->session->closed) || stream->state == yamux_stream_closing || stream->session->closed)
return -EINVAL; return -EINVAL;
*/
// gather details // gather details
char* data = (char*)data_; char* data = (char*)data_;
struct yamux_session* s = stream->session;
char* data_end = data + data_length; char* data_end = data + data_length;
uint32_t ws = stream->window_size; uint32_t ws = channel_ctx->window_size;
yamux_streamid id = stream->id; int id = channel_ctx->channel;
char sendd[ws + sizeof(struct yamux_frame)]; char sendd[ws + sizeof(struct yamux_frame)];
// Send the data, breaking it up into pieces if it is too large // Send the data, breaking it up into pieces if it is too large
@ -233,7 +237,7 @@ ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, vo
struct yamux_frame f = (struct yamux_frame){ struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION , .version = YAMUX_VERSION ,
.type = yamux_frame_data, .type = yamux_frame_data,
.flags = get_flags(stream), .flags = get_flags(channel_ctx),
.streamid = id, .streamid = id,
.length = adv .length = adv
}; };
@ -248,7 +252,7 @@ ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, vo
struct StreamMessage outgoing; struct StreamMessage outgoing;
outgoing.data = (uint8_t*)sendd; outgoing.data = (uint8_t*)sendd;
outgoing.data_size = adv + sizeof(struct yamux_frame); outgoing.data_size = adv + sizeof(struct yamux_frame);
if (!s->parent_stream->write(s->parent_stream->stream_context, &outgoing)) if (!channel_ctx->yamux_context->stream->parent_stream->write(channel_ctx->yamux_context->stream->parent_stream->stream_context, &outgoing))
return adv; return adv;
// prepare to loop again // prepare to loop again

View File

@ -59,12 +59,12 @@ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) {
* @param context the SessionContext * @param context the SessionContext
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int yamux_send_protocol(struct SessionContext* context) { int yamux_send_protocol(struct YamuxContext* context) {
char* protocol = "/yamux/1.0.0\n"; char* protocol = "/yamux/1.0.0\n";
struct StreamMessage outgoing; struct StreamMessage outgoing;
outgoing.data = (uint8_t*)protocol; outgoing.data = (uint8_t*)protocol;
outgoing.data_size = strlen(protocol); outgoing.data_size = strlen(protocol);
if (!context->default_stream->write(context, &outgoing)) if (!context->stream->parent_stream->write(context->stream->parent_stream->stream_context, &outgoing))
return 0; return 0;
return 1; return 1;
} }
@ -75,12 +75,12 @@ int yamux_send_protocol(struct SessionContext* context) {
* @param context the SessionContext * @param context the SessionContext
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int yamux_receive_protocol(struct SessionContext* context) { int yamux_receive_protocol(struct YamuxContext* context) {
char* protocol = "/yamux/1.0.0\n"; char* protocol = "/yamux/1.0.0\n";
struct StreamMessage* results = NULL; struct StreamMessage* results = NULL;
int retVal = 0; int retVal = 0;
if (!context->default_stream->read(context, &results, 30)) { if (!context->stream->parent_stream->read(context->stream->parent_stream->stream_context, &results, 30)) {
libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n"); libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n");
goto exit; goto exit;
} }
@ -104,6 +104,12 @@ int yamux_receive_protocol(struct SessionContext* context) {
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success * @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/ */
int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) {
struct YamuxContext* ctx = (struct YamuxContext*)protocol_context;
// we should have the yamux protocol in msg. Send the protocol back.
if (!yamux_send_protocol(ctx)) {
return 0;
}
/*
struct yamux_session* yamux = yamux_session_new(NULL, session_context->default_stream, yamux_session_server, protocol_context); struct yamux_session* yamux = yamux_session_new(NULL, session_context->default_stream, yamux_session_server, protocol_context);
uint8_t* buf = (uint8_t*) malloc(msg->data_size); uint8_t* buf = (uint8_t*) malloc(msg->data_size);
if (buf == NULL) if (buf == NULL)
@ -119,7 +125,7 @@ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext*
// TODO need more information as to what this loop should do // TODO need more information as to what this loop should do
} }
} }
*/
return 1; return 1;
} }
@ -143,12 +149,18 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector*
return handler; return handler;
} }
/***
* Close the stream and clean up all resources
* NOTE: This also goes through the channels
* @param stream_context the YamuxContext
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_yamux_close(void* stream_context) { int libp2p_yamux_close(void* stream_context) {
if (stream_context == NULL) if (stream_context == NULL)
return 0; return 0;
struct YamuxContext* ctx = (struct YamuxContext*)stream_context; struct YamuxContext* ctx = (struct YamuxContext*)stream_context;
libp2p_yamux_stream_free(ctx->stream); libp2p_yamux_stream_free(ctx->stream);
return 0; return 1;
} }
/** /**
@ -174,13 +186,13 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int
ctx = (struct YamuxContext*)stream_context; ctx = (struct YamuxContext*)stream_context;
} }
if (channel != NULL && channel->channel != NULL) { if (channel != NULL && channel->channel != 0) {
// we have an established channel. Use it. // we have an established channel. Use it.
if (!channel->yamux_context->stream->parent_stream->read(channel->yamux_context->stream->parent_stream->stream_context, message, yamux_default_timeout)) if (!channel->yamux_context->stream->parent_stream->read(channel->yamux_context->stream->parent_stream->stream_context, message, yamux_default_timeout))
return 0; return 0;
// TODO: This is not right. It must be sorted out. // TODO: This is not right. It must be sorted out.
struct StreamMessage* msg = *message; struct StreamMessage* msg = *message;
return yamux_decode(channel->channel->session, msg->data, msg->data_size); return yamux_decode(channel, msg->data, msg->data_size);
} else if (ctx != NULL) { } else if (ctx != NULL) {
// We are still negotiating... // We are still negotiating...
return ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, yamux_default_timeout); return ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, yamux_default_timeout);
@ -209,9 +221,9 @@ int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) {
ctx = (struct YamuxContext*)stream_context; ctx = (struct YamuxContext*)stream_context;
} }
if (channel != NULL && channel->channel != NULL) { if (channel != NULL && channel->channel != 0) {
// we have an established channel. Use it. // we have an established channel. Use it.
return yamux_stream_write(channel->channel, message->data_size, message->data); return yamux_stream_write(channel, message->data_size, message->data);
} else if (ctx != NULL) { } else if (ctx != NULL) {
// We are still negotiating... // We are still negotiating...
return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, message); return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, message);
@ -251,10 +263,21 @@ struct YamuxContext* libp2p_yamux_context_new() {
return ctx; return ctx;
} }
/***
* Free the resources from libp2p_yamux_context_new
* @param ctx the context
*/
void libp2p_yamux_context_free(struct YamuxContext* ctx) { void libp2p_yamux_context_free(struct YamuxContext* ctx) {
if (ctx == NULL) if (ctx == NULL)
return; return;
libp2p_utils_vector_free(ctx->channels); // free all the channels
if (ctx->channels) {
for(int i = 0; i < ctx->channels->total; i++) {
struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i);
curr->close(curr->stream_context);
}
libp2p_utils_vector_free(ctx->channels);
}
free(ctx); free(ctx);
return; return;
} }
@ -281,6 +304,8 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) {
libp2p_logger_error("yamux", "We thought we had a yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data); libp2p_logger_error("yamux", "We thought we had a yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data);
goto exit; goto exit;
} }
libp2p_stream_message_free(results);
results = NULL;
haveTheirs = 1; haveTheirs = 1;
} }
@ -314,7 +339,7 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) {
retVal = 1; retVal = 1;
exit: exit:
if (results != NULL) if (results != NULL)
free(results); libp2p_stream_message_free(results);
return retVal; return retVal;
} }
@ -377,16 +402,32 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) {
// the negotiation was successful. Add it to the list of channels that we have // the negotiation was successful. Add it to the list of channels that we have
int itemNo = libp2p_utils_vector_add(ctx->channels, stream); int itemNo = libp2p_utils_vector_add(ctx->channels, stream);
struct YamuxChannelContext* incoming = (struct YamuxChannelContext*)stream->parent_stream->stream_context; struct YamuxChannelContext* incoming = (struct YamuxChannelContext*)stream->parent_stream->stream_context;
if (incoming->channel == NULL) { if (incoming->channel != 0) {
// this is wrong. There should have been a yamux_stream there // this is wrong. There should have not been a channel number
return 0; return 0;
} }
incoming->channel->id = itemNo; incoming->channel = itemNo;
return 1; return 1;
} }
return 0; return 0;
} }
/**
* Clean up resources from libp2p_yamux_channel_new
* @param ctx the YamuxChannelContext
*/
int libp2p_yamux_channel_close(void* context) {
if (context == NULL)
return 0;
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context;
if (ctx != NULL) {
if (ctx->stream != NULL)
free(ctx->stream);
free(ctx);
}
return 1;
}
/** /**
* Create a stream that has a "YamuxChannelContext" related to this yamux protocol * Create a stream that has a "YamuxChannelContext" related to this yamux protocol
* @param parent_stream the parent yamux stream * @param parent_stream the parent yamux stream
@ -396,15 +437,20 @@ struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream) {
struct Stream* out = libp2p_stream_new(); struct Stream* out = libp2p_stream_new();
if (out != NULL) { if (out != NULL) {
out->address = parent_stream->address; out->address = parent_stream->address;
out->close = parent_stream->close; out->close = libp2p_yamux_channel_close;
out->parent_stream = parent_stream; out->parent_stream = parent_stream;
out->peek = parent_stream->peek; out->peek = parent_stream->peek;
out->read = parent_stream->read; out->read = parent_stream->read;
out->read_raw = parent_stream->read_raw; out->read_raw = parent_stream->read_raw;
out->socket_mutex = parent_stream->socket_mutex; out->socket_mutex = parent_stream->socket_mutex;
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext)); struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext));
ctx->channel = 0;
ctx->closed = 0;
ctx->state = 0;
ctx->window_size = 0;
ctx->type = YAMUX_CHANNEL_CONTEXT; ctx->type = YAMUX_CHANNEL_CONTEXT;
ctx->yamux_context = parent_stream->stream_context; ctx->yamux_context = parent_stream->stream_context;
ctx->stream = out;
out->stream_context = ctx; out->stream_context = ctx;
out->write = parent_stream->write; out->write = parent_stream->write;
} }