From 18b0139b81637be36d5172d01d2a9d3eeff10a8f Mon Sep 17 00:00:00 2001 From: John Jones Date: Mon, 6 Nov 2017 16:38:55 -0500 Subject: [PATCH] squashed some memory leak bugs in yamux --- identify/identify.c | 16 +++++- include/libp2p/identify/identify.h | 1 + include/libp2p/yamux/session.h | 5 +- include/libp2p/yamux/stream.h | 11 +++-- include/libp2p/yamux/yamux.h | 16 ++++-- test/test_yamux.h | 12 +++-- yamux/session.c | 41 ++++++++-------- yamux/stream.c | 70 ++++++++++++++------------- yamux/yamux.c | 78 ++++++++++++++++++++++++------ 9 files changed, 167 insertions(+), 83 deletions(-) diff --git a/identify/identify.c b/identify/identify.c index affb7f5..cb69d4b 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -65,9 +65,11 @@ int libp2p_identify_receive_protocol(struct IdentifyContext* context) { if (results->data[0] != '/') start = 1; char* ptr = strstr((char*)&results->data[start], protocol); - if (ptr == NULL || ptr - (char*)results > 1) { + if (ptr == NULL || ptr - (char*)&results->data[start] > 1) { + libp2p_stream_message_free(results); return 0; } + libp2p_stream_message_free(results); return 1; } @@ -106,6 +108,16 @@ struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp return handler; } +int libp2p_identify_close(void* stream_context) { + if (stream_context == NULL) + return 0; + struct IdentifyContext* ctx = (struct IdentifyContext*)stream_context; + ctx->parent_stream->close(ctx->parent_stream->stream_context); + free(ctx->stream); + free(ctx); + return 1; +} + /*** * Create a new stream that negotiates the identify protocol * @@ -128,7 +140,9 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { return NULL; } ctx->parent_stream = parent_stream; + ctx->stream = out; out->stream_context = ctx; + out->close = libp2p_identify_close; if (!libp2p_identify_send_protocol(ctx) || !libp2p_identify_receive_protocol(ctx)) { libp2p_stream_free(out); free(ctx); diff --git a/include/libp2p/identify/identify.h b/include/libp2p/identify/identify.h index a149f4d..2311d25 100644 --- a/include/libp2p/identify/identify.h +++ b/include/libp2p/identify/identify.h @@ -25,6 +25,7 @@ typedef struct { struct IdentifyContext { struct Stream* parent_stream; + struct Stream* stream; }; int libp2p_identify_can_handle(const struct StreamMessage* msg); diff --git a/include/libp2p/yamux/session.h b/include/libp2p/yamux/session.h index 31681ee..fc116cc 100644 --- a/include/libp2p/yamux/session.h +++ b/include/libp2p/yamux/session.h @@ -9,6 +9,7 @@ #include "frame.h" #include "stream.h" #include "libp2p/net/stream.h" +//#include "libp2p/yamux/yamux.h" enum yamux_session_type { @@ -91,10 +92,12 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po // defers to stream read handlers ssize_t yamux_session_read(struct yamux_session* session); +struct YamuxChannelContext; /** * Decode an incoming message + * @param channel the channel * @param incoming the incoming bytes * @param incoming_size the size of the incoming bytes * @returns true(1) on success, false(0) otherwise */ -int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size); +int yamux_decode(struct YamuxChannelContext* channel, const uint8_t* incoming, size_t incoming_size); diff --git a/include/libp2p/yamux/stream.h b/include/libp2p/yamux/stream.h index d7cbe3d..3984313 100644 --- a/include/libp2p/yamux/stream.h +++ b/include/libp2p/yamux/stream.h @@ -5,6 +5,7 @@ #include "session.h" #include "libp2p/conn/session.h" +#include "libp2p/yamux/yamux.h" // NOTE: 'data' is not guaranteed to be preserved when the read_fn // handler exists (read: it will be freed). @@ -48,18 +49,18 @@ struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_strea // not obligatory, SYN is sent by yamux_stream_write when the stream // isn't initialised anyway -ssize_t yamux_stream_init (struct yamux_stream* stream); +ssize_t yamux_stream_init (struct YamuxChannelContext* channel_ctx); // doesn't free the stream // uses FIN -ssize_t yamux_stream_close(struct yamux_stream* stream); +ssize_t yamux_stream_close(struct YamuxChannelContext* channel_ctx); // uses RST -ssize_t yamux_stream_reset(struct yamux_stream* stream); +ssize_t yamux_stream_reset(struct YamuxChannelContext* stream); void yamux_stream_free(struct yamux_stream* stream); -ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta); -ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data); +ssize_t yamux_stream_window_update(struct YamuxChannelContext* ctx, int32_t delta); +ssize_t yamux_stream_write(struct YamuxChannelContext* ctx, uint32_t data_length, void* data); /*** * process stream diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index 0b30d0a..27218eb 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -26,7 +26,15 @@ struct YamuxContext { struct YamuxChannelContext { char type; struct YamuxContext* yamux_context; - struct yamux_stream* channel; + struct Stream* stream; + // the channel number + int channel; + // the window size for this channel + int window_size; + // the state of the connection + int state; + // whether or not the connection is closed + int closed; }; /** @@ -39,7 +47,7 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(); * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ -int yamux_send_protocol(struct SessionContext* context); +int yamux_send_protocol(struct YamuxContext* context); /*** * Check to see if the reply is the yamux protocol header we expect @@ -47,7 +55,7 @@ int yamux_send_protocol(struct SessionContext* context); * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ -int yamux_receive_protocol(struct SessionContext* context); +int yamux_receive_protocol(struct YamuxContext* context); struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream); @@ -67,3 +75,5 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream); * @returns a new Stream that is a YamuxChannelContext */ struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream); + +void libp2p_yamux_channel_free(struct YamuxChannelContext* ctx); diff --git a/test/test_yamux.h b/test/test_yamux.h index 99430a6..5d5a90c 100644 --- a/test/test_yamux.h +++ b/test/test_yamux.h @@ -13,8 +13,10 @@ int mock_yamux_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) { *msg = libp2p_stream_message_new(); struct StreamMessage* message = *msg; - message->data = "/yamux/1.0.0\n"; - message->data_size = strlen(message->data); + const char* id = "/yamux/1.0.0\n"; + message->data_size = strlen(id); + message->data = malloc(message->data_size); + memcpy(message->data, id, message->data_size); return 1; } @@ -24,8 +26,10 @@ int mock_yamux_read_protocol(void* context, struct StreamMessage** msg, int netw int mock_identify_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) { *msg = libp2p_stream_message_new(); struct StreamMessage* message = *msg; - message->data = "/ipfs/id/1.0.0\n"; - message->data_size = strlen(message->data); + const char* id = "/ipfs/id/1.0.0\n"; + message->data_size = strlen(id); + message->data = malloc(message->data_size); + memcpy(message->data, id, message->data_size); return 1; } diff --git a/yamux/session.c b/yamux/session.c index d97afaa..1952ea9 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -11,6 +11,7 @@ #include "libp2p/os/timespec.h" #include "libp2p/yamux/session.h" #include "libp2p/yamux/stream.h" +#include "libp2p/yamux/yamux.h" static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG; @@ -151,14 +152,14 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po * @param incoming_size the size of the incoming bytes * @returns true(1) on success, false(0) otherwise */ -int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size) { +int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* incoming, size_t incoming_size) { // decode frame struct yamux_frame f; if (incoming_size < sizeof(struct yamux_frame)) { return 0; } - memcpy(f, incoming, sizeof(struct yamux_frame)); + memcpy((void*)&f, incoming, sizeof(struct yamux_frame)); decode_frame(&f); @@ -172,14 +173,14 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t case yamux_frame_ping: // ping if (f.flags & yamux_frame_syn) { - yamux_session_ping(session, f.length, 1); + yamux_session_ping(channelContext->yamux_context->session, f.length, 1); - if (session->ping_fn) - session->ping_fn(session, f.length); + if (channelContext->yamux_context->session->ping_fn) + channelContext->yamux_context->session->ping_fn(channelContext->yamux_context->session, f.length); } - else if ((f.flags & yamux_frame_ack) && session->pong_fn) + else if ((f.flags & yamux_frame_ack) && channelContext->yamux_context->session->pong_fn) { - struct timespec now, dt, last = session->since_ping; + struct timespec now, dt, last = channelContext->yamux_context->session->since_ping; if (!timespec_get(&now, TIME_UTC)) return -EACCES; @@ -192,23 +193,23 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t else dt.tv_nsec = now.tv_nsec - last.tv_nsec; - session->pong_fn(session, f.length, dt); + channelContext->yamux_context->session->pong_fn(channelContext->yamux_context->session, f.length, dt); } else return -EPROTO; break; case yamux_frame_go_away: // go away (hanging up) - session->closed = 1; - if (session->go_away_fn) - session->go_away_fn(session, (enum yamux_error)f.length); + channelContext->yamux_context->session->closed = 1; + if (channelContext->yamux_context->session->go_away_fn) + channelContext->yamux_context->session->go_away_fn(channelContext->yamux_context->session, (enum yamux_error)f.length); break; default: return -EPROTO; } else { // we're handling a stream, not something at the yamux protocol level - for (size_t i = 0; i < session->cap_streams; ++i) + for (size_t i = 0; i < channelContext->yamux_context->session->cap_streams; ++i) { - struct yamux_session_stream* ss = &session->streams[i]; + struct yamux_session_stream* ss = &channelContext->yamux_context->session->streams[i]; struct yamux_stream* s = ss->stream; if (!ss->alive || s->state == yamux_stream_closed) @@ -227,7 +228,7 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t { // local stream didn't initiate FIN if (s->state != yamux_stream_closing) - yamux_stream_close(s); + yamux_stream_close(channelContext); s->state = yamux_stream_closed; @@ -245,7 +246,7 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t return -EPROTO; int sz = sizeof(struct yamux_frame); - ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, session->parent_stream->stream_context); + ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, channelContext->yamux_context->stream->parent_stream->stream_context); return (re < 0) ? re : (re + incoming_size); } } @@ -256,13 +257,13 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t { void* ud = NULL; // user data - if (session->get_str_ud_fn) - ud = session->get_str_ud_fn(session, f.streamid); + if (channelContext->yamux_context->session->get_str_ud_fn) + ud = channelContext->yamux_context->session->get_str_ud_fn(channelContext->yamux_context->session, f.streamid); - struct yamux_stream* st = yamux_stream_new(session, f.streamid, ud); + struct yamux_stream* st = yamux_stream_new(channelContext->yamux_context->session, f.streamid, ud); - if (session->new_stream_fn) - session->new_stream_fn(session, st); + if (channelContext->yamux_context->session->new_stream_fn) + channelContext->yamux_context->session->new_stream_fn(channelContext->yamux_context->session, st); st->state = yamux_stream_syn_recv; } diff --git a/yamux/stream.c b/yamux/stream.c index 98747d1..52e8825 100644 --- a/yamux/stream.c +++ b/yamux/stream.c @@ -85,12 +85,12 @@ FOUND:; * @param yamux_stream the stream context * @param f the frame */ -int yamux_write_frame(struct yamux_stream* yamux_stream, struct yamux_frame* f) { +int yamux_write_frame(struct YamuxContext* ctx, struct yamux_frame* f) { encode_frame(f); struct StreamMessage outgoing; outgoing.data = (uint8_t*)f; outgoing.data_size = sizeof(struct yamux_frame); - if (!yamux_stream->session->parent_stream->write(yamux_stream->session->parent_stream->stream_context, &outgoing)) + if (!ctx->stream->write(ctx->stream->stream_context, &outgoing)) return 0; return outgoing.data_size; } @@ -100,9 +100,9 @@ int yamux_write_frame(struct yamux_stream* yamux_stream, struct yamux_frame* f) * @param stream the stream to initialize * @returns the number of bytes sent */ -ssize_t yamux_stream_init(struct yamux_stream* stream) +ssize_t yamux_stream_init(struct YamuxChannelContext* channel_ctx) { - if (!stream || stream->state != yamux_stream_inited || stream->session->closed) { + if (!channel_ctx || channel_ctx->state != yamux_stream_inited || channel_ctx->closed) { return -EINVAL; } @@ -110,13 +110,13 @@ ssize_t yamux_stream_init(struct yamux_stream* stream) .version = YAMUX_VERSION, .type = yamux_frame_window_update, .flags = yamux_frame_syn, - .streamid = stream->id, + .streamid = channel_ctx->channel, .length = 0 }; - stream->state = yamux_stream_syn_sent; + channel_ctx->state = yamux_stream_syn_sent; - return yamux_write_frame(stream, &f); + return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f); } /*** @@ -124,22 +124,22 @@ ssize_t yamux_stream_init(struct yamux_stream* stream) * @param stream the stream * @returns the number of bytes sent */ -ssize_t yamux_stream_close(struct yamux_stream* stream) +ssize_t yamux_stream_close(struct YamuxChannelContext* channel_ctx) { - if (!stream || stream->state != yamux_stream_est || stream->session->closed) + if (!channel_ctx || channel_ctx->state != yamux_stream_est || channel_ctx->closed) return -EINVAL; struct yamux_frame f = (struct yamux_frame){ .version = YAMUX_VERSION, .type = yamux_frame_window_update, .flags = yamux_frame_fin, - .streamid = stream->id, + .streamid = channel_ctx->channel, .length = 0 }; - stream->state = yamux_stream_closing; + channel_ctx->state = yamux_stream_closing; - return yamux_write_frame(stream, &f); + return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f); } /** @@ -147,33 +147,33 @@ ssize_t yamux_stream_close(struct yamux_stream* stream) * @param stream the stream * @returns the number of bytes sent */ -ssize_t yamux_stream_reset(struct yamux_stream* stream) +ssize_t yamux_stream_reset(struct YamuxChannelContext* channel_ctx) { - if (!stream || stream->session->closed) + if (!channel_ctx || channel_ctx->closed) return -EINVAL; struct yamux_frame f = (struct yamux_frame){ .version = YAMUX_VERSION, .type = yamux_frame_window_update, .flags = yamux_frame_rst, - .streamid = stream->id, + .streamid = channel_ctx->channel, .length = 0 }; - stream->state = yamux_stream_closed; + channel_ctx->state = yamux_stream_closed; - return yamux_write_frame(stream, &f); + return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f); } -static enum yamux_frame_flags get_flags(struct yamux_stream* stream) +static enum yamux_frame_flags get_flags(struct YamuxChannelContext* ctx) { - switch (stream->state) + switch (ctx->state) { case yamux_stream_inited: - stream->state = yamux_stream_syn_sent; + ctx->state = yamux_stream_syn_sent; return yamux_frame_syn; case yamux_stream_syn_recv: - stream->state = yamux_stream_est; + ctx->state = yamux_stream_est; return yamux_frame_ack; default: return 0; @@ -186,21 +186,21 @@ static enum yamux_frame_flags get_flags(struct yamux_stream* stream) * @param delta the new window size * @returns number of bytes sent */ -ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta) +ssize_t yamux_stream_window_update(struct YamuxChannelContext* channel_ctx, int32_t delta) { - if (!stream || stream->state == yamux_stream_closed - || stream->state == yamux_stream_closing || stream->session->closed) + if (!channel_ctx || channel_ctx->state == yamux_stream_closed + || channel_ctx->state == yamux_stream_closing || channel_ctx->closed) return -EINVAL; struct yamux_frame f = (struct yamux_frame){ .version = YAMUX_VERSION, .type = yamux_frame_window_update, - .flags = get_flags(stream), - .streamid = stream->id, + .flags = get_flags(channel_ctx), + .streamid = channel_ctx->channel, .length = (uint32_t)delta }; - return yamux_write_frame(stream, &f); + return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f); } /*** @@ -210,19 +210,23 @@ ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta) * @param data_ the data to be sent * @return the number of bytes sent */ -ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data_) +ssize_t yamux_stream_write(struct YamuxChannelContext* channel_ctx, uint32_t data_length, void* data_) { // validate parameters + if (channel_ctx == NULL || data_ == NULL || data_length == 0) + return -EINVAL; + /* if (!((size_t)stream | (size_t)data_) || stream->state == yamux_stream_closed || stream->state == yamux_stream_closing || stream->session->closed) return -EINVAL; + */ // gather details char* data = (char*)data_; - struct yamux_session* s = stream->session; char* data_end = data + data_length; - uint32_t ws = stream->window_size; - yamux_streamid id = stream->id; + uint32_t ws = channel_ctx->window_size; + int id = channel_ctx->channel; + char sendd[ws + sizeof(struct yamux_frame)]; // Send the data, breaking it up into pieces if it is too large @@ -233,7 +237,7 @@ ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, vo struct yamux_frame f = (struct yamux_frame){ .version = YAMUX_VERSION , .type = yamux_frame_data, - .flags = get_flags(stream), + .flags = get_flags(channel_ctx), .streamid = id, .length = adv }; @@ -248,7 +252,7 @@ ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, vo struct StreamMessage outgoing; outgoing.data = (uint8_t*)sendd; outgoing.data_size = adv + sizeof(struct yamux_frame); - if (!s->parent_stream->write(s->parent_stream->stream_context, &outgoing)) + if (!channel_ctx->yamux_context->stream->parent_stream->write(channel_ctx->yamux_context->stream->parent_stream->stream_context, &outgoing)) return adv; // prepare to loop again diff --git a/yamux/yamux.c b/yamux/yamux.c index 6833bed..5a322ad 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -59,12 +59,12 @@ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ -int yamux_send_protocol(struct SessionContext* context) { +int yamux_send_protocol(struct YamuxContext* context) { char* protocol = "/yamux/1.0.0\n"; struct StreamMessage outgoing; outgoing.data = (uint8_t*)protocol; outgoing.data_size = strlen(protocol); - if (!context->default_stream->write(context, &outgoing)) + if (!context->stream->parent_stream->write(context->stream->parent_stream->stream_context, &outgoing)) return 0; return 1; } @@ -75,12 +75,12 @@ int yamux_send_protocol(struct SessionContext* context) { * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ -int yamux_receive_protocol(struct SessionContext* context) { +int yamux_receive_protocol(struct YamuxContext* context) { char* protocol = "/yamux/1.0.0\n"; struct StreamMessage* results = NULL; int retVal = 0; - if (!context->default_stream->read(context, &results, 30)) { + if (!context->stream->parent_stream->read(context->stream->parent_stream->stream_context, &results, 30)) { libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n"); goto exit; } @@ -104,6 +104,12 @@ int yamux_receive_protocol(struct SessionContext* context) { * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { + struct YamuxContext* ctx = (struct YamuxContext*)protocol_context; + // we should have the yamux protocol in msg. Send the protocol back. + if (!yamux_send_protocol(ctx)) { + return 0; + } + /* struct yamux_session* yamux = yamux_session_new(NULL, session_context->default_stream, yamux_session_server, protocol_context); uint8_t* buf = (uint8_t*) malloc(msg->data_size); if (buf == NULL) @@ -119,7 +125,7 @@ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* // TODO need more information as to what this loop should do } } - + */ return 1; } @@ -143,12 +149,18 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* return handler; } +/*** + * Close the stream and clean up all resources + * NOTE: This also goes through the channels + * @param stream_context the YamuxContext + * @returns true(1) on success, false(0) otherwise + */ int libp2p_yamux_close(void* stream_context) { if (stream_context == NULL) return 0; struct YamuxContext* ctx = (struct YamuxContext*)stream_context; libp2p_yamux_stream_free(ctx->stream); - return 0; + return 1; } /** @@ -174,13 +186,13 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int ctx = (struct YamuxContext*)stream_context; } - if (channel != NULL && channel->channel != NULL) { + if (channel != NULL && channel->channel != 0) { // we have an established channel. Use it. if (!channel->yamux_context->stream->parent_stream->read(channel->yamux_context->stream->parent_stream->stream_context, message, yamux_default_timeout)) return 0; // TODO: This is not right. It must be sorted out. struct StreamMessage* msg = *message; - return yamux_decode(channel->channel->session, msg->data, msg->data_size); + return yamux_decode(channel, msg->data, msg->data_size); } else if (ctx != NULL) { // We are still negotiating... return ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, yamux_default_timeout); @@ -209,9 +221,9 @@ int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) { ctx = (struct YamuxContext*)stream_context; } - if (channel != NULL && channel->channel != NULL) { + if (channel != NULL && channel->channel != 0) { // we have an established channel. Use it. - return yamux_stream_write(channel->channel, message->data_size, message->data); + return yamux_stream_write(channel, message->data_size, message->data); } else if (ctx != NULL) { // We are still negotiating... return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, message); @@ -251,10 +263,21 @@ struct YamuxContext* libp2p_yamux_context_new() { return ctx; } +/*** + * Free the resources from libp2p_yamux_context_new + * @param ctx the context + */ void libp2p_yamux_context_free(struct YamuxContext* ctx) { if (ctx == NULL) return; - libp2p_utils_vector_free(ctx->channels); + // free all the channels + if (ctx->channels) { + for(int i = 0; i < ctx->channels->total; i++) { + struct Stream* curr = (struct Stream*) libp2p_utils_vector_get(ctx->channels, i); + curr->close(curr->stream_context); + } + libp2p_utils_vector_free(ctx->channels); + } free(ctx); return; } @@ -281,6 +304,8 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) { libp2p_logger_error("yamux", "We thought we had a yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data); goto exit; } + libp2p_stream_message_free(results); + results = NULL; haveTheirs = 1; } @@ -314,7 +339,7 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) { retVal = 1; exit: if (results != NULL) - free(results); + libp2p_stream_message_free(results); return retVal; } @@ -377,16 +402,32 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { // the negotiation was successful. Add it to the list of channels that we have int itemNo = libp2p_utils_vector_add(ctx->channels, stream); struct YamuxChannelContext* incoming = (struct YamuxChannelContext*)stream->parent_stream->stream_context; - if (incoming->channel == NULL) { - // this is wrong. There should have been a yamux_stream there + if (incoming->channel != 0) { + // this is wrong. There should have not been a channel number return 0; } - incoming->channel->id = itemNo; + incoming->channel = itemNo; return 1; } return 0; } +/** + * Clean up resources from libp2p_yamux_channel_new + * @param ctx the YamuxChannelContext + */ +int libp2p_yamux_channel_close(void* context) { + if (context == NULL) + return 0; + struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context; + if (ctx != NULL) { + if (ctx->stream != NULL) + free(ctx->stream); + free(ctx); + } + return 1; +} + /** * Create a stream that has a "YamuxChannelContext" related to this yamux protocol * @param parent_stream the parent yamux stream @@ -396,15 +437,20 @@ struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream) { struct Stream* out = libp2p_stream_new(); if (out != NULL) { out->address = parent_stream->address; - out->close = parent_stream->close; + out->close = libp2p_yamux_channel_close; out->parent_stream = parent_stream; out->peek = parent_stream->peek; out->read = parent_stream->read; out->read_raw = parent_stream->read_raw; out->socket_mutex = parent_stream->socket_mutex; struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext)); + ctx->channel = 0; + ctx->closed = 0; + ctx->state = 0; + ctx->window_size = 0; ctx->type = YAMUX_CHANNEL_CONTEXT; ctx->yamux_context = parent_stream->stream_context; + ctx->stream = out; out->stream_context = ctx; out->write = parent_stream->write; }