diff --git a/identify/identify.c b/identify/identify.c index 1aa1365..affb7f5 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -40,7 +40,7 @@ int libp2p_identify_send_protocol(struct IdentifyContext *context) { struct StreamMessage msg; msg.data = (uint8_t*) protocol; msg.data_size = strlen(protocol); - if (!context->parent_stream->write(context, &msg)) { + if (!context->parent_stream->write(context->parent_stream->stream_context, &msg)) { libp2p_logger_error("identify", "send_protocol: Unable to send identify protocol header.\n"); return 0; } @@ -56,21 +56,34 @@ int libp2p_identify_send_protocol(struct IdentifyContext *context) { int libp2p_identify_receive_protocol(struct IdentifyContext* context) { const char *protocol = "/ipfs/id/1.0.0\n"; struct StreamMessage* results = NULL; - if (!context->parent_stream->read(context, &results, 30)) { + if (!context->parent_stream->read(context->parent_stream->stream_context, &results, 30)) { libp2p_logger_error("identify", "receive_protocol: Unable to read results.\n"); return 0; } - // the first byte is the size, so skip it - char* ptr = strstr((char*)&results[1], protocol); + // the first byte may be the size, so skip it + int start = 0; + if (results->data[0] != '/') + start = 1; + char* ptr = strstr((char*)&results->data[start], protocol); if (ptr == NULL || ptr - (char*)results > 1) { return 0; } return 1; } +/** + * A remote node is attempting to send us an Identify message + * @param msg the message sent + * @param context the SessionContext + * @param protocol_context the identify protocol context + * @returns <0 on error, 0 if loop should not continue, >0 on success + */ int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context) { - //TODO: Implement - return 0; + if (protocol_context == NULL) + return -1; + //struct IdentifyContext* ctx = (struct IdentifyContext*) protocol_context; + // TODO: Do something with the incoming msg + return 1; } /** @@ -108,6 +121,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { return NULL; struct Stream* out = libp2p_stream_new(); if (out != NULL) { + out->parent_stream = parent_stream; struct IdentifyContext* ctx = (struct IdentifyContext*) malloc(sizeof(struct IdentifyContext)); if (ctx == NULL) { libp2p_stream_free(out); diff --git a/include/libp2p/identify/identify.h b/include/libp2p/identify/identify.h index 940fcd4..a149f4d 100644 --- a/include/libp2p/identify/identify.h +++ b/include/libp2p/identify/identify.h @@ -1,5 +1,7 @@ #pragma once +#include "libp2p/utils/vector.h" + typedef struct { // publicKey is this node's public key (which also gives its node.ID) // - may not need to be sent, as secure channel implies it has been sent. diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index bf20699..a6a920a 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -46,7 +46,6 @@ struct Stream { struct MultiAddress* address; // helps identify who is on the other end pthread_mutex_t* socket_mutex; // only 1 transmission at a time struct Stream* parent_stream; // what stream wraps this stream - int channel; // the channel this stream uses, for multiplexing protocols such as yamux /** * A generic place to store implementation-specific context items */ diff --git a/include/libp2p/yamux/session.h b/include/libp2p/yamux/session.h index d81b6e8..31681ee 100644 --- a/include/libp2p/yamux/session.h +++ b/include/libp2p/yamux/session.h @@ -3,11 +3,12 @@ #include #include #include +#include #include "config.h" #include "frame.h" #include "stream.h" -#include "libp2p/conn/session.h" +#include "libp2p/net/stream.h" enum yamux_session_type { @@ -57,7 +58,7 @@ struct yamux_session enum yamux_session_type type; - struct SessionContext* session_context; + struct Stream* parent_stream; yamux_streamid nextid; @@ -72,13 +73,14 @@ struct yamux_session * @param userdata user data * @returns the yamux_session struct */ -struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata); +struct yamux_session* yamux_session_new(struct yamux_config* config, struct Stream* parent_stream, enum yamux_session_type type, void* userdata); // does not close the socket, but does close the session -void yamux_session_free(struct yamux_session* session); +void yamux_session_free(struct yamux_session* session); // does not free used memory ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err); + inline ssize_t yamux_session_go_away(struct yamux_session* session, enum yamux_error err) { return yamux_session_close(session, err); @@ -96,5 +98,3 @@ ssize_t yamux_session_read(struct yamux_session* session); * @returns true(1) on success, false(0) otherwise */ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size); - - diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index 1f2e37a..0b30d0a 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -2,6 +2,7 @@ #include "libp2p/net/protocol.h" #include "libp2p/net/stream.h" +#include "libp2p/yamux/stream.h" /*** * Declarations for the Yamux protocol @@ -9,15 +10,25 @@ static const int yamux_default_timeout = 10; +static const char YAMUX_CONTEXT = 'Y'; +static const char YAMUX_CHANNEL_CONTEXT = 'C'; + /*** * Context struct for Yamux */ struct YamuxContext { + char type; struct Stream* stream; struct yamux_session* session; struct Libp2pVector* channels; }; +struct YamuxChannelContext { + char type; + struct YamuxContext* yamux_context; + struct yamux_stream* channel; +}; + /** * Build a handler that can handle the yamux protocol */ @@ -40,6 +51,8 @@ int yamux_receive_protocol(struct SessionContext* context); struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream); +void libp2p_yamux_stream_free(struct Stream* stream); + /**** * Add a stream "channel" to the yamux handler * @param ctx the context @@ -47,3 +60,10 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream); * @returns true(1) on success, false(0) otherwise */ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream); + +/** + * Create a stream that has a "YamuxChannelContext" related to this yamux protocol + * @param parent_stream the parent yamux stream + * @returns a new Stream that is a YamuxChannelContext + */ +struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream); diff --git a/net/stream.c b/net/stream.c index 2f7948d..6bd5590 100644 --- a/net/stream.c +++ b/net/stream.c @@ -15,7 +15,6 @@ struct Stream* libp2p_stream_new() { stream->socket_mutex = NULL; stream->stream_context = NULL; stream->write = NULL; - stream->channel = 0; } return stream; } diff --git a/peer/Makefile b/peer/Makefile index 0ff9452..c41dcbb 100644 --- a/peer/Makefile +++ b/peer/Makefile @@ -1,5 +1,5 @@ CC = gcc -CFLAGS = -O0 -I../include -I../../c-protobuf -I../../c-multihash/include -I../../c-multiaddr/include -g3 -std=c99 +CFLAGS = -O0 -I../include -I../../c-protobuf -I../../c-multihash/include -I../../c-multiaddr/include -g3 -std=c11 LFLAGS = DEPS = OBJS = peer.o peerstore.o providerstore.o diff --git a/test/Makefile b/test/Makefile index 4eac3d7..c53b6d0 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,5 +1,5 @@ CC = gcc -CFLAGS = -O0 -I../include -I. -I../../c-multihash/include -I../../c-multiaddr/include -std=c99 +CFLAGS = -O0 -I../include -I. -I../../c-multihash/include -I../../c-multiaddr/include -std=c11 ifdef DEBUG CFLAGS += -g3 diff --git a/test/mock_stream.h b/test/mock_stream.h new file mode 100644 index 0000000..605cdb9 --- /dev/null +++ b/test/mock_stream.h @@ -0,0 +1,57 @@ +#pragma once + +#include +#include "libp2p/net/stream.h" + +struct MockContext { + struct Stream* stream; +}; + +void mock_stream_free(struct Stream* stream); + +int mock_stream_close(void* context) { + if (context == NULL) + return 1; + struct MockContext* ctx = (struct MockContext*)context; + mock_stream_free(ctx->stream); + return 1; +} + +int mock_stream_peek(void* context) { + return 1; +} + +int mock_stream_read(void* context, struct StreamMessage** msg, int timeout_secs) { + return 1; +} + +int mock_stream_read_raw(void* context, uint8_t* buffer, int buffer_size, int timeout_secs) { + return 1; +} + +int mock_stream_write(void* context, struct StreamMessage* msg) { + return 1; +} + +struct Stream* mock_stream_new() { + struct Stream* out = libp2p_stream_new(); + if (out != NULL) { + out->close = mock_stream_close; + out->peek = mock_stream_peek; + out->read = mock_stream_read; + out->read_raw = mock_stream_read_raw; + out->write = mock_stream_write; + struct MockContext* ctx = malloc(sizeof(struct MockContext)); + ctx->stream = out; + out->stream_context = ctx; + } + return out; +} + +void mock_stream_free(struct Stream* stream) { + if (stream == NULL) + return; + if (stream->stream_context != NULL) + free(stream->stream_context); + free(stream); +} diff --git a/test/test_yamux.h b/test/test_yamux.h new file mode 100644 index 0000000..99430a6 --- /dev/null +++ b/test/test_yamux.h @@ -0,0 +1,79 @@ +#pragma once +#include "libp2p/yamux/yamux.h" +#include "libp2p/identify/identify.h" +#include "mock_stream.h" + +/*** + * Helpers + */ + +/*** + * Sends back the yamux protocol to fake negotiation + */ +int mock_yamux_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) { + *msg = libp2p_stream_message_new(); + struct StreamMessage* message = *msg; + message->data = "/yamux/1.0.0\n"; + message->data_size = strlen(message->data); + return 1; +} + +/*** + * Sends back the yamux protocol to fake negotiation + */ +int mock_identify_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) { + *msg = libp2p_stream_message_new(); + struct StreamMessage* message = *msg; + message->data = "/ipfs/id/1.0.0\n"; + message->data_size = strlen(message->data); + return 1; +} + +/*** + * Tests + */ + +/*** + * Verify that we can initiate a yamux session + */ +int test_yamux_stream_new() { + int retVal = 0; + // setup + struct Stream* mock_stream = mock_stream_new(); + mock_stream->read = mock_yamux_read_protocol; + struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream); + if (yamux_stream == NULL) + goto exit; + // tear down + retVal = 1; + exit: + if (yamux_stream != NULL) + yamux_stream->close(yamux_stream->stream_context); + mock_stream->close(mock_stream->stream_context); + return retVal; +} + +/*** + * Attempt to add a protocol to the Yamux protocol + */ +int test_yamux_identify() { + int retVal = 0; + // setup + struct Stream* mock_stream = mock_stream_new(); + mock_stream->read = mock_yamux_read_protocol; + struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream); + if (yamux_stream == NULL) + goto exit; + // TODO: Now add in another protocol + mock_stream->read = mock_identify_read_protocol; + if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(libp2p_yamux_channel_new(yamux_stream)))) { + goto exit; + } + // tear down + retVal = 1; + exit: + if (yamux_stream != NULL) + yamux_stream->close(yamux_stream->stream_context); + mock_stream->close(mock_stream->stream_context); + return retVal; +} diff --git a/test/testit.c b/test/testit.c index f167d07..6e4a0df 100644 --- a/test/testit.c +++ b/test/testit.c @@ -13,6 +13,7 @@ #include "test_conn.h" #include "test_record.h" #include "test_peer.h" +#include "test_yamux.h" #include "libp2p/utils/logger.h" struct test { @@ -113,6 +114,8 @@ int build_test_collection() { add_test("test_peer_protobuf", test_peer_protobuf,1); add_test("test_peerstore", test_peerstore,1); add_test("test_aes", test_aes, 1); + add_test("test_yamux_stream_new", test_yamux_stream_new, 1); + add_test("test_yamux_identify", test_yamux_identify, 1); return 1; }; diff --git a/yamux/session.c b/yamux/session.c index 61aaa31..d97afaa 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -22,9 +22,9 @@ static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG; * @param userdata user data * @returns the yamux_session struct */ -struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata) +struct yamux_session* yamux_session_new(struct yamux_config* config, struct Stream* parent_stream, enum yamux_session_type type, void* userdata) { - if (!session_context) + if (!parent_stream) return NULL; if (!config) @@ -42,7 +42,7 @@ struct yamux_session* yamux_session_new(struct yamux_config* config, struct Sess if (sess != NULL) { sess->config = config; sess->type = type; - sess->session_context = session_context; + sess->parent_stream = parent_stream; sess->closed = 0; sess->nextid = 1 + (type == yamux_session_server); sess->num_streams = 0; @@ -108,7 +108,7 @@ ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err) outgoing.data = (uint8_t*)&f; outgoing.data_size = sizeof(struct yamux_frame); - if (!session->session_context->default_stream->write(session->session_context, &outgoing)) + if (!session->parent_stream->write(session->parent_stream->stream_context, &outgoing)) return 0; return outgoing.data_size; } @@ -139,13 +139,14 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po struct StreamMessage outgoing; outgoing.data = (uint8_t*)&f; outgoing.data_size = sizeof(struct yamux_frame); - if (!session->session_context->default_stream->write(session->session_context, &outgoing)) + if (!session->parent_stream->write(session->parent_stream->stream_context, &outgoing)) return 0; return outgoing.data_size; } /** * Decode an incoming message + * @param session the session * @param incoming the incoming bytes * @param incoming_size the size of the incoming bytes * @returns true(1) on success, false(0) otherwise @@ -157,13 +158,15 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t if (incoming_size < sizeof(struct yamux_frame)) { return 0; } + memcpy(f, incoming, sizeof(struct yamux_frame)); + decode_frame(&f); // check yamux version if (f.version != YAMUX_VERSION) return 0; - if (!f.streamid) // we're not dealing with a stream + if (!f.streamid) // we're not dealing with a stream, we're dealing with something at the yamux protocol level switch (f.type) { case yamux_frame_ping: // ping @@ -202,7 +205,7 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t default: return -EPROTO; } - else { // we're handling a stream + else { // we're handling a stream, not something at the yamux protocol level for (size_t i = 0; i < session->cap_streams; ++i) { struct yamux_session_stream* ss = &session->streams[i]; @@ -242,15 +245,16 @@ int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t return -EPROTO; int sz = sizeof(struct yamux_frame); - ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, session->session_context); + ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, session->parent_stream->stream_context); return (re < 0) ? re : (re + incoming_size); } } - // stream doesn't exist yet + // This stream is not in my list of streams. + // It must not exist yet, so let's try to make it if (f.flags & yamux_frame_syn) { - void* ud = NULL; + void* ud = NULL; // user data if (session->get_str_ud_fn) ud = session->get_str_ud_fn(session, f.streamid); diff --git a/yamux/stream.c b/yamux/stream.c index 5ceb321..98747d1 100644 --- a/yamux/stream.c +++ b/yamux/stream.c @@ -13,6 +13,14 @@ #define MIN(x,y) (y^((x^y)&-(xsession->session_context->default_stream->write(yamux_stream->session->session_context, &outgoing)) + if (!yamux_stream->session->parent_stream->write(yamux_stream->session->parent_stream->stream_context, &outgoing)) return 0; return outgoing.data_size; } @@ -196,32 +204,31 @@ ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta) } /*** - * Write data to the stream - * @param stream the stream + * Write data to the stream. + * @param stream the stream (includes the "channel") * @param data_length the length of the data to be sent * @param data_ the data to be sent * @return the number of bytes sent */ ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data_) { + // validate parameters if (!((size_t)stream | (size_t)data_) || stream->state == yamux_stream_closed || stream->state == yamux_stream_closing || stream->session->closed) return -EINVAL; + // gather details char* data = (char*)data_; - struct yamux_session* s = stream->session; - char* data_end = data + data_length; - uint32_t ws = stream->window_size; yamux_streamid id = stream->id; char sendd[ws + sizeof(struct yamux_frame)]; + // Send the data, breaking it up into pieces if it is too large while (data < data_end) { - uint32_t - dr = (uint32_t)(data_end - data), - adv = MIN(dr, ws); + uint32_t dr = (uint32_t)(data_end - data); // length of the data for this round + uint32_t adv = MIN(dr, ws); // the size of the data we will send this round struct yamux_frame f = (struct yamux_frame){ .version = YAMUX_VERSION , @@ -232,15 +239,19 @@ ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, vo }; encode_frame(&f); + // put the frame into the buffer memcpy(sendd, &f, sizeof(struct yamux_frame)); + // put the data into the buffer memcpy(sendd + sizeof(struct yamux_frame), data, (size_t)adv); + // send the buffer through the network struct StreamMessage outgoing; outgoing.data = (uint8_t*)sendd; outgoing.data_size = adv + sizeof(struct yamux_frame); - if (!s->session_context->default_stream->write(s->session_context, &outgoing)) + if (!s->parent_stream->write(s->parent_stream->stream_context, &outgoing)) return adv; + // prepare to loop again data += adv; } diff --git a/yamux/yamux.c b/yamux/yamux.c index e27ff19..6833bed 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -35,6 +35,7 @@ int yamux_can_handle(const struct StreamMessage* msg) { * @param msg the message * @param incoming the stream buffer */ +/* void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { struct Libp2pVector* handlers = stream->userdata; int retVal = libp2p_protocol_marshal(msg, stream->session->session_context, handlers); @@ -50,6 +51,7 @@ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { } return; } +*/ /*** * Send the yamux protocol out the default stream @@ -94,7 +96,7 @@ int yamux_receive_protocol(struct SessionContext* context) { } /*** - * Handles the message + * The remote is attempting to negotiate yamux * @param msg the incoming message * @param incoming_size the size of the incoming data buffer * @param session_context the information about the incoming connection @@ -102,8 +104,7 @@ int yamux_receive_protocol(struct SessionContext* context) { * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { - // they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream - struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context); + struct yamux_session* yamux = yamux_session_new(NULL, session_context->default_stream, yamux_session_server, protocol_context); uint8_t* buf = (uint8_t*) malloc(msg->data_size); if (buf == NULL) return -1; @@ -119,37 +120,7 @@ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* } } - /* - struct Libp2pVector* handlers = (struct Libp2pVector*)protocol_context; - uint8_t* results = NULL; - size_t bytes_read = 0; - int numRetries = 0; - int retVal = 0; - int max_retries = 100; // try for 5 minutes - for(;;) { - // try to read for 5 seconds - if (session_context->default_stream->read(session_context, &results, &bytes_read, 5)) { - // we read something from the network. Process it. - // NOTE: If it is a multistream protocol that we are receiving, ignore it. - if (yamux_can_handle(results, bytes_read)) - continue; - numRetries = 0; - retVal = libp2p_protocol_marshal(results, bytes_read, session_context, handlers); - if (results != NULL) - free(results); - // exit the loop on error (or if they ask us to no longer loop by returning 0) - if (retVal <= 0) - break; - } else { - // we were unable to read from the network. - // if it timed out, we should try again (if we're not out of retries) - if (numRetries >= max_retries) - break; - numRetries++; - } - } - */ - return 0; + return 1; } /** @@ -173,7 +144,10 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* } int libp2p_yamux_close(void* stream_context) { - //TODO: Implement + if (stream_context == NULL) + return 0; + struct YamuxContext* ctx = (struct YamuxContext*)stream_context; + libp2p_yamux_stream_free(ctx->stream); return 0; } @@ -186,19 +160,62 @@ int libp2p_yamux_close(void* stream_context) { * @returns true(1) on success, false(0) on failure */ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int timeout_secs) { - struct YamuxContext* ctx = (struct YamuxContext*)stream_context; - struct Stream* parent_stream = ctx->stream->parent_stream; - - struct StreamMessage* incoming; - if (!parent_stream->read(parent_stream->stream_context, &incoming, timeout_secs)) + if (stream_context == NULL) return 0; + // look at the first byte of the context to determine if this is a YamuxContext (we're negotiating) + // or a YamuxChannelContext (we're talking to an established channel) + struct YamuxContext* ctx = NULL; + struct YamuxChannelContext* channel = NULL; + char proto = ((uint8_t*)stream_context)[0]; + if (proto == YAMUX_CHANNEL_CONTEXT) { + channel = (struct YamuxChannelContext*)stream_context; + ctx = channel->yamux_context; + } else if (proto == YAMUX_CONTEXT) { + ctx = (struct YamuxContext*)stream_context; + } - // we've got bytes from the network. process them as a yamux frame - return yamux_decode(ctx->session, incoming->data, incoming->data_size); + if (channel != NULL && channel->channel != NULL) { + // we have an established channel. Use it. + if (!channel->yamux_context->stream->parent_stream->read(channel->yamux_context->stream->parent_stream->stream_context, message, yamux_default_timeout)) + return 0; + // TODO: This is not right. It must be sorted out. + struct StreamMessage* msg = *message; + return yamux_decode(channel->channel->session, msg->data, msg->data_size); + } else if (ctx != NULL) { + // We are still negotiating... + return ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, yamux_default_timeout); + } + return 0; } +/*** + * Write to the remote + * @param stream_context the context. Could be a YamuxContext or YamuxChannelContext + * @param message the message to write + * @returns the number of bytes written + */ int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) { - //TODO: Implement + if (stream_context == NULL) + return 0; + // look at the first byte of the context to determine if this is a YamuxContext (we're negotiating) + // or a YamuxChannelContext (we're talking to an established channel) + struct YamuxContext* ctx = NULL; + struct YamuxChannelContext* channel = NULL; + char proto = ((uint8_t*)stream_context)[0]; + if (proto == YAMUX_CHANNEL_CONTEXT) { + channel = (struct YamuxChannelContext*)stream_context; + ctx = channel->yamux_context; + } else if (proto == YAMUX_CONTEXT) { + ctx = (struct YamuxContext*)stream_context; + } + + if (channel != NULL && channel->channel != NULL) { + // we have an established channel. Use it. + return yamux_stream_write(channel->channel, message->data_size, message->data); + } else if (ctx != NULL) { + // We are still negotiating... + return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, message); + } return 0; } @@ -227,14 +244,19 @@ int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size struct YamuxContext* libp2p_yamux_context_new() { struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext)); if (ctx != NULL) { + ctx->type = YAMUX_CONTEXT; ctx->stream = NULL; ctx->channels = libp2p_utils_vector_new(1); } return ctx; } -void libp2p_yamux_stream_free(struct Stream* yamux_stream) { - //TODO: Implement +void libp2p_yamux_context_free(struct YamuxContext* ctx) { + if (ctx == NULL) + return; + libp2p_utils_vector_free(ctx->channels); + free(ctx); + return; } int libp2p_yamux_negotiate(struct YamuxContext* ctx) { @@ -302,7 +324,7 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) { * @returns a Stream initialized and ready for yamux */ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { - struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream)); + struct Stream* out = libp2p_stream_new(); if (out != NULL) { out->parent_stream = parent_stream; out->close = libp2p_yamux_close; @@ -328,6 +350,18 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { return out; } +/** + * Frees resources held by the stream + * @param yamux_stream the stream + */ +void libp2p_yamux_stream_free(struct Stream* yamux_stream) { + if (yamux_stream == NULL) + return; + struct YamuxContext* ctx = (struct YamuxContext*)yamux_stream->stream_context; + libp2p_yamux_context_free(ctx); + libp2p_stream_free(yamux_stream); +} + /**** * Add a stream "channel" to the yamux handler * @param ctx the context @@ -335,8 +369,46 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { * @returns true(1) on success, false(0) otherwise */ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { - int itemNo = libp2p_utils_vector_add(ctx->channels, stream); - stream->channel = itemNo; - return 1; + if (stream == NULL) + return 0; + // the stream's parent should have a YamuxChannelContext + char proto = ((uint8_t*)stream->parent_stream->stream_context)[0]; + if (proto == YAMUX_CHANNEL_CONTEXT) { + // the negotiation was successful. Add it to the list of channels that we have + int itemNo = libp2p_utils_vector_add(ctx->channels, stream); + struct YamuxChannelContext* incoming = (struct YamuxChannelContext*)stream->parent_stream->stream_context; + if (incoming->channel == NULL) { + // this is wrong. There should have been a yamux_stream there + return 0; + } + incoming->channel->id = itemNo; + return 1; + } + return 0; } +/** + * Create a stream that has a "YamuxChannelContext" related to this yamux protocol + * @param parent_stream the parent yamux stream + * @returns a new Stream that is a YamuxChannelContext + */ +struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream) { + struct Stream* out = libp2p_stream_new(); + if (out != NULL) { + out->address = parent_stream->address; + out->close = parent_stream->close; + out->parent_stream = parent_stream; + out->peek = parent_stream->peek; + out->read = parent_stream->read; + out->read_raw = parent_stream->read_raw; + out->socket_mutex = parent_stream->socket_mutex; + struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext)); + ctx->type = YAMUX_CHANNEL_CONTEXT; + ctx->yamux_context = parent_stream->stream_context; + out->stream_context = ctx; + out->write = parent_stream->write; + } + return out; +} + +