diff --git a/conn/dialer.c b/conn/dialer.c index efb9904..9f34914 100644 --- a/conn/dialer.c +++ b/conn/dialer.c @@ -139,7 +139,7 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer if (new_stream != NULL) { peer->sessionContext->default_stream = new_stream; // yamux over multistream - new_stream = libp2p_yamux_stream_new(new_stream); + new_stream = libp2p_yamux_stream_new(new_stream, 0, NULL); if (new_stream != NULL) { peer->sessionContext->default_stream = new_stream; // we have our swarm connection. Now we ask for some "channels" diff --git a/conn/session.c b/conn/session.c index db8f3eb..b49e8b5 100644 --- a/conn/session.c +++ b/conn/session.c @@ -157,6 +157,26 @@ void libp2p_stream_message_free(struct StreamMessage* msg) { } } +/*** + * Make a copy of a message + * @param original the original message + * @returns a StreamMessage that is a copy of the original + */ +struct StreamMessage* libp2p_stream_message_copy(const struct StreamMessage* original) { + struct StreamMessage* copy = libp2p_stream_message_new(); + if (copy != NULL) { + copy->error_number = original->error_number; + copy->data_size = original->data_size; + copy->data = (uint8_t*) malloc(copy->data_size); + if (copy->data == NULL) { + libp2p_stream_message_free(copy); + return NULL; + } + memcpy(copy->data, original->data, copy->data_size); + } + return copy; +} + /**** * Make a copy of a SessionContext * @param original the original diff --git a/identify/identify.c b/identify/identify.c index 3eafbd8..8d1c7fd 100644 --- a/identify/identify.c +++ b/identify/identify.c @@ -3,6 +3,7 @@ #include "varint.h" #include "libp2p/net/protocol.h" #include "libp2p/net/protocol.h" +#include "libp2p/net/multistream.h" #include "libp2p/utils/vector.h" #include "libp2p/net/stream.h" #include "libp2p/conn/session.h" @@ -50,7 +51,7 @@ int libp2p_identify_send_protocol(struct Stream *stream) { /*** * Check to see if the reply is the identify header we expect * NOTE: if we initiate the connection, we should expect the same back - * @param context the SessionContext + * @param stream the incoming stream of the underlying protocol * @returns true(1) on success, false(0) otherwise */ int libp2p_identify_receive_protocol(struct Stream* stream) { @@ -81,12 +82,11 @@ int libp2p_identify_receive_protocol(struct Stream* stream) { * @returns <0 on error, 0 if loop should not continue, >0 on success */ int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { + // attempt to create a new Identify connection with them. + // send the protocol id back, and set up the channel struct Stream* new_stream = libp2p_identify_stream_new(stream); if (new_stream == NULL) return -1; - // attempt to create a new Identify connection with them. - // send the protocol id back, and set up the channel - //TODO: now add this "channel" return stream->handle_upgrade(stream, new_stream); } @@ -149,6 +149,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { ctx->stream = out; out->stream_context = ctx; out->close = libp2p_identify_close; + out->negotiate = libp2p_identify_stream_new; if (!libp2p_identify_send_protocol(parent_stream) || !libp2p_identify_receive_protocol(parent_stream)) { libp2p_stream_free(out); free(ctx); @@ -158,3 +159,40 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) { return out; } +/*** + * Create a new stream that negotiates the identify protocol + * on top of the multistream protocol + * + * NOTE: This will be sent by our side (us asking them). + * Incoming "Identify" requests should be handled by the + * external protocol handler, not this function. + * + * @param parent_stream the parent stream + * @returns a new Stream that is a multistream, but with "identify" already negotiated + */ +struct Stream* libp2p_identify_stream_new_with_multistream(struct Stream* parent_stream) { + if (parent_stream == NULL) + return NULL; + struct Stream* multistream = libp2p_net_multistream_stream_new(parent_stream); + struct Stream* out = libp2p_stream_new(); + if (out != NULL) { + out->stream_type = STREAM_TYPE_IDENTIFY; + out->parent_stream = multistream; + struct IdentifyContext* ctx = (struct IdentifyContext*) malloc(sizeof(struct IdentifyContext)); + if (ctx == NULL) { + libp2p_stream_free(out); + return NULL; + } + ctx->parent_stream = multistream; + ctx->stream = out; + out->stream_context = ctx; + out->close = libp2p_identify_close; + out->negotiate = libp2p_identify_stream_new_with_multistream; + if (!libp2p_identify_send_protocol(parent_stream) || !libp2p_identify_receive_protocol(parent_stream)) { + libp2p_stream_free(out); + free(ctx); + return NULL; + } + } + return out; +} diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index 6ac018d..4da9f87 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -95,6 +95,15 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx); */ struct KademliaMessage* libp2p_net_multistream_get_message(struct Stream* stream); +/** + * Add the transmission size to the front of a StreamMessage. + * NOTE: This is used internally by multistream. It is accessible to help + * with testing. + * @param incoming the incoming message + * @returns a new StreamMessage, in the format of a MessageStream buffer + */ +struct StreamMessage* libp2p_net_multistream_prepare_to_send(struct StreamMessage* incoming); + struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream); void libp2p_net_multistream_stream_free(struct Stream* stream); diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index 370d372..360e565 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -25,6 +25,12 @@ struct StreamMessage* libp2p_stream_message_new(); */ void libp2p_stream_message_free(struct StreamMessage* msg); +/*** + * Make a copy of a message + * @param original the original message + * @returns a StreamMessage that is a copy of the original + */ +struct StreamMessage* libp2p_stream_message_copy(const struct StreamMessage* original); /** * This is a context struct for a basic IP connection @@ -35,6 +41,19 @@ struct ConnectionContext { struct SessionContext* session_context; }; +/** + * The different types of protocols + */ +enum stream_type { + STREAM_TYPE_UNKNOWN = 0x0, + STREAM_TYPE_MULTISTREAM = 0x1, + STREAM_TYPE_SECIO = 0x2, + STREAM_TYPE_KADEMLIA = 0x3, + STREAM_TYPE_IDENTIFY = 0x4, + STREAM_TYPE_YAMUX = 0x5, + STREAM_TYPE_JOURNAL = 0x6, + STREAM_TYPE_RAW = 0x7 +}; /** * An interface in front of various streams @@ -47,6 +66,7 @@ struct Stream { pthread_mutex_t* socket_mutex; // only 1 transmission at a time struct Stream* parent_stream; // what stream wraps this stream int channel; // the channel (for multiplexing streams) + enum stream_type stream_type; /** * A generic place to store implementation-specific context items @@ -103,6 +123,13 @@ struct Stream { * @param new_stream the newly created stream */ int (*handle_upgrade)(struct Stream* stream, struct Stream* new_stream); + + /*** + * Negotiate this protocol using the parent stream + * @param parent_stream the connection to use + * @returns a new Stream, or NULL on error + */ + struct Stream* (*negotiate)(struct Stream* parent_stream); }; struct Stream* libp2p_stream_new(); diff --git a/include/libp2p/yamux/session.h b/include/libp2p/yamux/session.h index fc116cc..79ad0ec 100644 --- a/include/libp2p/yamux/session.h +++ b/include/libp2p/yamux/session.h @@ -23,14 +23,18 @@ enum yamux_error yamux_error_intern = 0x02 }; +// forward declarations struct yamux_session; struct yamux_stream; +struct YamuxContext; +struct Stream; +struct StreamMessage; typedef void* (*yamux_session_get_str_ud_fn)(struct yamux_session* session, yamux_streamid newid ); typedef void (*yamux_session_ping_fn )(struct yamux_session* session, uint32_t val ); typedef void (*yamux_session_pong_fn )(struct yamux_session* session, uint32_t val, struct timespec dt); typedef void (*yamux_session_go_away_fn )(struct yamux_session* session, enum yamux_error err ); -typedef void (*yamux_session_new_stream_fn)(struct yamux_session* session, struct yamux_stream* stream); +typedef void (*yamux_session_new_stream_fn)(struct YamuxContext* context, struct Stream* stream, struct StreamMessage* msg); typedef void (*yamux_session_free_fn )(struct yamux_session* sesssion ); struct yamux_session_stream @@ -38,31 +42,71 @@ struct yamux_session_stream struct yamux_stream* stream; int alive; }; + +/** + * A yamux session. This keeps all the streams related to a yamux session + */ struct yamux_session { - struct yamux_config* config; + struct yamux_config* config; // configuration of size of windows and max number of streams - size_t num_streams; - size_t cap_streams; - struct yamux_session_stream* streams; + size_t num_streams; // number of streams + size_t cap_streams; // capacity of stream array + struct yamux_session_stream* streams; // array of streams + /** + * Get user data + */ yamux_session_get_str_ud_fn get_str_ud_fn; + /** + * Ping + */ yamux_session_ping_fn ping_fn ; + /** + * Respond to ping + */ yamux_session_pong_fn pong_fn ; + /** + * Hanging up + */ yamux_session_go_away_fn go_away_fn ; + /** + * A new stream is coming in + */ yamux_session_new_stream_fn new_stream_fn; + /** + * Free resources + */ yamux_session_free_fn free_fn ; + /** + * User data + */ void* userdata; + /** + * for heartbeat + */ struct timespec since_ping; + /** + * Session type (client or server) + */ enum yamux_session_type type; + /*** + * The parent stream + */ struct Stream* parent_stream; + /*** + * The next id to use + */ yamux_streamid nextid; + /** + * Determine if this session is closed + */ int closed; }; @@ -92,12 +136,12 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po // defers to stream read handlers ssize_t yamux_session_read(struct yamux_session* session); -struct YamuxChannelContext; /** * Decode an incoming message - * @param channel the channel + * @param context a YamuxChannelContext or YamuxContext * @param incoming the incoming bytes * @param incoming_size the size of the incoming bytes + * @param return_message the return message (usually the bytes after the frame) * @returns true(1) on success, false(0) otherwise */ -int yamux_decode(struct YamuxChannelContext* channel, const uint8_t* incoming, size_t incoming_size); +int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message); diff --git a/include/libp2p/yamux/stream.h b/include/libp2p/yamux/stream.h index 3984313..10db6a3 100644 --- a/include/libp2p/yamux/stream.h +++ b/include/libp2p/yamux/stream.h @@ -7,6 +7,9 @@ #include "libp2p/conn/session.h" #include "libp2p/yamux/yamux.h" +// forward declarations +struct YamuxChannelContext; + // NOTE: 'data' is not guaranteed to be preserved when the read_fn // handler exists (read: it will be freed). struct yamux_stream; @@ -35,7 +38,7 @@ struct yamux_stream yamux_stream_rst_fn rst_fn ; yamux_stream_free_fn free_fn; - void* userdata; + struct Stream* stream; enum yamux_stream_state state; @@ -44,16 +47,26 @@ struct yamux_stream uint32_t window_size; }; -// does not init the stream -struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_streamid id, void* userdata); +/** + * Create a new stream that has a YamuxChannelContext + * @param context the Yamux context + * @param id the stream id + * @param msg the incoming message + * @returns a stream that is a Yamux channel + */ +struct Stream* yamux_channel_new(struct YamuxContext* context, yamux_streamid id, struct StreamMessage* msg); // not obligatory, SYN is sent by yamux_stream_write when the stream // isn't initialised anyway ssize_t yamux_stream_init (struct YamuxChannelContext* channel_ctx); -// doesn't free the stream -// uses FIN -ssize_t yamux_stream_close(struct YamuxChannelContext* channel_ctx); +/*** + * Closes the stream + * NOTE: doesn't free the stream, uses FIN + * @param context the YamuxContext or YamuxChannelContext + * @returns number of bytes sent + */ +ssize_t yamux_stream_close(void* context); // uses RST ssize_t yamux_stream_reset(struct YamuxChannelContext* stream); @@ -68,9 +81,14 @@ ssize_t yamux_stream_write(struct YamuxChannelContext* ctx, uint32_t data_length * @param frame the frame * @param incoming the stream bytes (after the frame) * @param incoming_size the size of incoming - * @param session_context the SessionContext * @returns the number of bytes processed (can be zero) or negative number on error */ -ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context); +ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size); +/** + * Retrieve the flags for this context + * @param context the context + * @returns the correct flag + */ +enum yamux_frame_flags get_flags(void* context); diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index 5c67f8f..6153e75 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -21,6 +21,9 @@ struct YamuxContext { struct Stream* stream; struct yamux_session* session; struct Libp2pVector* channels; + int am_server; + int state; // the state of the connection + struct Libp2pVector* protocol_handlers; }; struct YamuxChannelContext { @@ -60,7 +63,14 @@ int yamux_send_protocol(struct Stream* stream); */ int yamux_receive_protocol(struct YamuxContext* context); -struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream); +/*** + * Negotiate the Yamux protocol + * @param parent_stream the parent stream + * @param am_server true(1) if we are considered the server, false(0) if we are the client. + * @param protocol_handlers the protocol handlers (used when a new protocol is requested) + * @returns a Stream initialized and ready for yamux + */ +struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_server, struct Libp2pVector* protocol_handlers); void libp2p_yamux_stream_free(struct Stream* stream); @@ -74,9 +84,22 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream); /** * Create a stream that has a "YamuxChannelContext" related to this yamux protocol - * @param parent_stream the parent yamux stream - * @returns a new Stream that is a YamuxChannelContext + * NOTE: If incoming_stream is not of the Yamux protocol, this "wraps" the incoming + * stream, so that the returned stream is the parent of the incoming_stream. If the + * incoming stream is of the yamux protocol, the YamuxChannelContext.child_stream + * will be NULL, awaiting an upgrade to fill it in. + * @param incoming_stream the stream of the new protocol + * @param channelNumber the channel number (0 if unknown) + * @returns a new Stream that has a YamuxChannelContext */ -struct Stream* libp2p_yamux_channel_new(struct Stream* parent_stream); +struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, int channelNumber); void libp2p_yamux_channel_free(struct YamuxChannelContext* ctx); + +/*** + * Prepare a new Yamux StreamMessage based on another StreamMessage + * NOTE: This is here for testing. This should normally not be used. + * @param incoming the incoming message + * @returns a new StreamMessage that has a yamux_frame + */ +struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incoming); diff --git a/net/connectionstream.c b/net/connectionstream.c index 639fe27..a73f2d1 100644 --- a/net/connectionstream.c +++ b/net/connectionstream.c @@ -117,6 +117,7 @@ int libp2p_net_connection_write(void* stream_context, struct StreamMessage* msg) struct Stream* libp2p_net_connection_new(int fd, char* ip, int port, struct SessionContext* session_context) { struct Stream* out = (struct Stream*) malloc(sizeof(struct Stream)); if (out != NULL) { + out->stream_type = STREAM_TYPE_RAW; out->close = libp2p_net_connection_close; out->peek = libp2p_net_connection_peek; out->read = libp2p_net_connection_read; diff --git a/net/multistream.c b/net/multistream.c index 532dff5..b209d89 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -131,6 +131,31 @@ int libp2p_net_multistream_peek(void* stream_context) { return parent_stream->peek(parent_stream->stream_context); } +/** + * Add the transmission size to the front of a StreamMessage. + * NOTE: This is used internally by multistream. It is accessible to help + * with testing. + * @param incoming the incoming message + * @returns a new StreamMessage, in the format of a MessageStream buffer + */ +struct StreamMessage* libp2p_net_multistream_prepare_to_send(struct StreamMessage* incoming) { + struct StreamMessage* out = libp2p_stream_message_new(); + if (out != NULL) { + unsigned char varint[12]; + size_t varint_size = 0; + varint_encode(incoming->data_size, &varint[0], 12, &varint_size); + out->data_size = varint_size + incoming->data_size; + out->data = malloc(out->data_size); + if (out->data == NULL) { + libp2p_stream_message_free(out); + return NULL; + } + memcpy(&out->data[0], varint, varint_size); + memcpy(&out->data[varint_size], incoming->data, incoming->data_size); + } + return out; +} + /** * Write to an open multistream host * @param stream_context the session context @@ -143,27 +168,14 @@ int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* inc int num_bytes = 0; if (incoming->data_size > 0) { // only do this is if there is something to send - // first get the size as a varint - unsigned char varint[12]; - size_t varint_size = 0; - varint_encode(incoming->data_size, &varint[0], 12, &varint_size); - // now put the size with the data - struct StreamMessage outgoing; - outgoing.data_size = varint_size + incoming->data_size; - outgoing.data = (uint8_t*) malloc(outgoing.data_size); - if (outgoing.data == NULL) { - return 0; - } - memset(outgoing.data, 0, outgoing.data_size); - memcpy(outgoing.data, varint, varint_size); - memcpy(&outgoing.data[varint_size], incoming->data, incoming->data_size); + struct StreamMessage* out = libp2p_net_multistream_prepare_to_send(incoming); // now ship it - libp2p_logger_debug("multistream", "Attempting write %d bytes.\n", (int)outgoing.data_size); - num_bytes = parent_stream->write(parent_stream->stream_context, &outgoing); + libp2p_logger_debug("multistream", "Attempting write %d bytes.\n", (int)out->data_size); + num_bytes = parent_stream->write(parent_stream->stream_context, out); // subtract the varint if all went well - if (num_bytes == outgoing.data_size) + if (num_bytes == out->data_size) num_bytes = incoming->data_size; - free(outgoing.data); + libp2p_stream_message_free(out); } return num_bytes; @@ -349,7 +361,7 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx) { void libp2p_net_multistream_stream_free(struct Stream* stream) { if (stream != NULL) { - stream->parent_stream->close(stream->parent_stream->stream_context); + stream->parent_stream->close(stream->parent_stream); // TODO: free memory allocations } } @@ -370,6 +382,7 @@ int libp2p_net_multistream_read_raw(void* stream_context, uint8_t* buffer, int b struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream) { struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream)); if (out != NULL) { + out->stream_type = STREAM_TYPE_MULTISTREAM; out->parent_stream = parent_stream; out->close = libp2p_net_multistream_close; out->read = libp2p_net_multistream_read; diff --git a/net/protocol.c b/net/protocol.c index 4ff4956..252d077 100644 --- a/net/protocol.c +++ b/net/protocol.c @@ -42,7 +42,7 @@ struct Libp2pProtocolHandler* libp2p_protocol_handler_new() { /*** * Handle an incoming message * @param message the incoming message - * @param session the SessionContext of the incoming connection + * @param stream the stream the message came in on * @param handlers a Vector of protocol handlers * @returns -1 on error, 0 if everything was okay, but the daemon should no longer handle this connection, 1 on success */ diff --git a/secio/secio.c b/secio/secio.c index e080a45..1c98b08 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -1353,6 +1353,7 @@ int libp2p_secio_close(struct Stream* stream) { struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp2pPeer* remote_peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key) { struct Stream* new_stream = libp2p_stream_new(); if (new_stream != NULL) { + new_stream->stream_type = STREAM_TYPE_SECIO; struct SecioContext* ctx = (struct SecioContext*) malloc(sizeof(struct SecioContext)); if (ctx == NULL) { libp2p_stream_free(new_stream); diff --git a/test/mock_stream.h b/test/mock_stream.h index 8f8d4f0..19f7283 100644 --- a/test/mock_stream.h +++ b/test/mock_stream.h @@ -3,6 +3,9 @@ #include #include "libp2p/net/stream.h" +struct StreamMessage* mock_message = NULL; +int mock_message_position = 0; + void mock_stream_free(struct Stream* stream); int mock_stream_close(struct Stream* stream) { @@ -14,19 +17,27 @@ int mock_stream_close(struct Stream* stream) { } int mock_stream_peek(void* context) { - return 1; + if (mock_message == NULL) + return 0; + return mock_message->data_size; } int mock_stream_read(void* context, struct StreamMessage** msg, int timeout_secs) { + *msg = libp2p_stream_message_copy(mock_message); return 1; } int mock_stream_read_raw(void* context, uint8_t* buffer, int buffer_size, int timeout_secs) { - return 1; + if (mock_message == NULL) + return 0; + int min = buffer_size > mock_message->data_size - mock_message_position ? mock_message->data_size - mock_message_position : buffer_size; + memcpy(buffer, mock_message->data, min); + mock_message_position += min; + return min; } int mock_stream_write(void* context, struct StreamMessage* msg) { - return 1; + return msg->data_size; } struct Stream* mock_stream_new() { diff --git a/test/test_yamux.h b/test/test_yamux.h index 6237c15..aa6da5f 100644 --- a/test/test_yamux.h +++ b/test/test_yamux.h @@ -3,11 +3,22 @@ #include "libp2p/identify/identify.h" #include "mock_stream.h" #include "libp2p/utils/logger.h" +#include "libp2p/net/stream.h" +#include "libp2p/net/multistream.h" /*** * Helpers */ +struct StreamMessage* build_message(const char* data) { + struct StreamMessage* out = libp2p_stream_message_new(); + if (out != NULL) { + out->data_size = strlen(data); + out->data = (uint8_t*) malloc(out->data_size); + memcpy(out->data, data, out->data_size); + } + return out; +} /*** * Sends back the yamux protocol to fake negotiation */ @@ -22,18 +33,49 @@ int mock_yamux_read_protocol(void* context, struct StreamMessage** msg, int netw } /*** - * Sends back the yamux protocol to fake negotiation + * Sends back the identify protocol (in a yamux wrapper) to fake negotiation */ int mock_identify_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) { - *msg = libp2p_stream_message_new(); - struct StreamMessage* message = *msg; + struct StreamMessage message; const char* id = "/ipfs/id/1.0.0\n"; - message->data_size = strlen(id); - message->data = malloc(message->data_size); - memcpy(message->data, id, message->data_size); + message.data_size = strlen(id); + message.data = (uint8_t*)id; + + *msg = libp2p_yamux_prepare_to_send(&message); + // adjust the frame + struct yamux_frame* frame = (struct yamux_frame*)(*msg)->data; + frame->streamid = 1; + frame->flags = yamux_frame_syn; + encode_frame(frame); return 1; } +int mock_counter = 0; + +/*** + * Sends back the yamux protocol to fake negotiation + */ +int mock_multistream_then_identify_read_protocol(void* context, struct StreamMessage** msg, int network_timeout) { + // prepare the message + *msg = libp2p_stream_message_new(); + struct StreamMessage* message = *msg; + message->data_size = mock_message->data_size - mock_message_position; + message->data = malloc(message->data_size); + memcpy(message->data, &mock_message->data[mock_message_position], message->data_size); + if (mock_counter == 0) { + // this is the first time through. Set mock_message to the identify protocol + libp2p_stream_message_free(mock_message); + mock_message = libp2p_net_multistream_prepare_to_send(build_message("/ipfs/id/1.0.0\n")); + mock_message_position = 0; + } else { + libp2p_stream_message_free(mock_message); + mock_message = NULL; + mock_message_position = 0; + } + return (*msg != NULL); +} + + /*** * Tests */ @@ -43,10 +85,11 @@ int mock_identify_read_protocol(void* context, struct StreamMessage** msg, int n */ int test_yamux_stream_new() { int retVal = 0; + const char* yamux_id = "/yamux/1.0.0\n"; // setup struct Stream* mock_stream = mock_stream_new(); - mock_stream->read = mock_yamux_read_protocol; - struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream); + mock_message = build_message(yamux_id); + struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream, 0, NULL); if (yamux_stream == NULL) goto exit; // tear down @@ -54,6 +97,8 @@ int test_yamux_stream_new() { exit: if (yamux_stream != NULL) yamux_stream->close(yamux_stream); + if (mock_message != NULL) + libp2p_stream_message_free(mock_message); return retVal; } @@ -63,9 +108,15 @@ int test_yamux_stream_new() { int test_yamux_identify() { int retVal = 0; // setup + // mock struct Stream* mock_stream = mock_stream_new(); mock_stream->read = mock_yamux_read_protocol; - struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream); + // protocol handlers + struct Libp2pVector* protocol_handlers = libp2p_utils_vector_new(1); + struct Libp2pProtocolHandler* handler = libp2p_identify_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + // yamux + struct Stream* yamux_stream = libp2p_yamux_stream_new(mock_stream, 0, protocol_handlers); if (yamux_stream == NULL) goto exit; // Now add in another protocol @@ -78,12 +129,91 @@ int test_yamux_identify() { exit: if (yamux_stream != NULL) yamux_stream->close(yamux_stream); + libp2p_protocol_handlers_shutdown(protocol_handlers); + if (mock_message != NULL) { + libp2p_stream_message_free(mock_message); + mock_message = NULL; + } return retVal; } int test_yamux_incoming_protocol_request() { int retVal = 0; + // setup + // build the protocol handler that can handle yamux, multistream, and identify protocol + struct Libp2pVector* protocol_handlers = libp2p_utils_vector_new(1); + struct Libp2pProtocolHandler* handler = libp2p_identify_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + handler = libp2p_yamux_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + handler = libp2p_net_multistream_build_protocol_handler(protocol_handlers); + libp2p_utils_vector_add(protocol_handlers, handler); + // set up basic streams + struct Stream* mock_stream = mock_stream_new(); + struct SessionContext* session_context = ((struct ConnectionContext*)mock_stream->stream_context)->session_context; + mock_message = build_message("/yamux/1.0.0\n"); + struct StreamMessage* result_message = NULL; + if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) { + libp2p_logger_error("test_yamux", "Unable to read Yamux protocol from mock stream.\n"); + goto exit; + } + if (libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers) < 0) { + libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol unsuccessful.\n"); + goto exit; + } + // now we should have upgraded to the yamux protocol + libp2p_stream_message_free(result_message); + result_message = NULL; + if (session_context->default_stream->parent_stream == NULL) { + libp2p_logger_error("test_yamux", "Upgrade to Yamux protocol appeared susccessful, but was not.\n"); + goto exit; + } + // Someone is requesting the multistream protocol + libp2p_stream_message_free(mock_message); + mock_message = libp2p_yamux_prepare_to_send(libp2p_net_multistream_prepare_to_send(build_message("/multistream/1.0.0\n"))); + // act like this is new + struct yamux_frame* frame = (struct yamux_frame*)mock_message->data; + frame->streamid = (uint32_t)1; + frame->flags = yamux_frame_syn; + encode_frame(frame); + mock_stream->read = mock_stream_read; + if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) { + libp2p_logger_error("test_yamux", "Unable to read multistream protocol.\n"); + goto exit; + } + // handle the marshaling of the multistream protocol + libp2p_protocol_marshal(result_message, session_context->default_stream, protocol_handlers); + libp2p_stream_message_free(result_message); + result_message = NULL; + // now verify the results + if (session_context->default_stream->stream_type != STREAM_TYPE_YAMUX) { + libp2p_logger_error("test_yamux", "Expected stream type of %d, but received %d.\n", STREAM_TYPE_YAMUX, session_context->default_stream->stream_type); + goto exit; + } + struct YamuxContext* yamux_context = (struct YamuxContext*)session_context->default_stream->stream_context; + if (yamux_context->channels->total != 2) { + libp2p_logger_error("test_yamux", "Identify protocol was not found.\n"); + goto exit; + } + + // tear down + retVal = 1; + exit: + if (session_context->default_stream != NULL) + session_context->default_stream->close(session_context->default_stream); + libp2p_protocol_handlers_shutdown(protocol_handlers); + return retVal; +} + +/** + * Attempt to negotiate the identity protocol, then use it. + * This makes sure the framing is working correctly betwee identity + * and yamux + */ +int test_yamux_identity_frame() { + int retVal = 0; + // setup // build the protocol handler that can handle yamux and identify protocol struct Libp2pVector* protocol_handlers = libp2p_utils_vector_new(1); @@ -111,7 +241,7 @@ int test_yamux_incoming_protocol_request() { goto exit; } // Someone is requesting the identity protocol - mock_stream->read = mock_identify_read_protocol; + mock_stream->read = mock_multistream_then_identify_read_protocol; if (!session_context->default_stream->read(session_context->default_stream->stream_context, &result_message, 10)) { libp2p_logger_error("test_yamux", "Unable to read identify protocol.\n"); goto exit; @@ -122,11 +252,15 @@ int test_yamux_incoming_protocol_request() { result_message = NULL; // now verify the results struct YamuxContext* yamux_context = (struct YamuxContext*)session_context->default_stream->stream_context; - if (yamux_context->channels->total != 1) { + if (yamux_context->channels->total != 2) { libp2p_logger_error("test_yamux", "Identify protocol was not found.\n"); goto exit; } + // prepare a yamux frame that is an identity message + + // send the message + // tear down retVal = 1; exit: @@ -134,4 +268,5 @@ int test_yamux_incoming_protocol_request() { session_context->default_stream->close(session_context->default_stream); libp2p_protocol_handlers_shutdown(protocol_handlers); return retVal; + } diff --git a/yamux/session.c b/yamux/session.c index 1952ea9..322dca1 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -147,12 +147,27 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po /** * Decode an incoming message - * @param session the session + * @param context the YamuxContext or YamuxChannelContext * @param incoming the incoming bytes * @param incoming_size the size of the incoming bytes - * @returns true(1) on success, false(0) otherwise + * @param return_message the results (usually the stuff after the frame) + * @returns 0 on success, negative number on error */ -int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* incoming, size_t incoming_size) { +int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message) { + + // retrieve the yamux context + struct yamux_session* yamux_session = NULL; + struct YamuxContext* yamuxContext = NULL; + if (context == NULL) + return 0; + if ( ((char*)context)[0] == YAMUX_CONTEXT) { + yamuxContext = (struct YamuxContext*)context; + yamux_session = yamuxContext->session; + } else if ( ((char*)context)[0] == YAMUX_CHANNEL_CONTEXT) { + struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)context; + yamuxContext = channelContext->yamux_context; + yamux_session = channelContext->yamux_context->session; + } // decode frame struct yamux_frame f; @@ -173,14 +188,14 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco case yamux_frame_ping: // ping if (f.flags & yamux_frame_syn) { - yamux_session_ping(channelContext->yamux_context->session, f.length, 1); + yamux_session_ping(yamux_session, f.length, 1); - if (channelContext->yamux_context->session->ping_fn) - channelContext->yamux_context->session->ping_fn(channelContext->yamux_context->session, f.length); + if (yamux_session->ping_fn) + yamux_session->ping_fn(yamux_session, f.length); } - else if ((f.flags & yamux_frame_ack) && channelContext->yamux_context->session->pong_fn) + else if ((f.flags & yamux_frame_ack) && yamux_session->pong_fn) { - struct timespec now, dt, last = channelContext->yamux_context->session->since_ping; + struct timespec now, dt, last = yamux_session->since_ping; if (!timespec_get(&now, TIME_UTC)) return -EACCES; @@ -193,32 +208,33 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco else dt.tv_nsec = now.tv_nsec - last.tv_nsec; - channelContext->yamux_context->session->pong_fn(channelContext->yamux_context->session, f.length, dt); + yamux_session->pong_fn(yamux_session, f.length, dt); } else return -EPROTO; break; case yamux_frame_go_away: // go away (hanging up) - channelContext->yamux_context->session->closed = 1; - if (channelContext->yamux_context->session->go_away_fn) - channelContext->yamux_context->session->go_away_fn(channelContext->yamux_context->session, (enum yamux_error)f.length); + yamux_session->closed = 1; + if (yamux_session->go_away_fn) + yamux_session->go_away_fn(yamux_session, (enum yamux_error)f.length); break; default: return -EPROTO; } else { // we're handling a stream, not something at the yamux protocol level - for (size_t i = 0; i < channelContext->yamux_context->session->cap_streams; ++i) + for (size_t i = 0; i < yamux_session->cap_streams; ++i) { - struct yamux_session_stream* ss = &channelContext->yamux_context->session->streams[i]; + struct yamux_session_stream* ss = &yamux_session->streams[i]; struct yamux_stream* s = ss->stream; - if (!ss->alive || s->state == yamux_stream_closed) + if (!ss->alive || s->state == yamux_stream_closed) // skip dead or closed streams continue; - if (s->id == f.streamid) + if (s->id == f.streamid) // we have a match between the stored stream and the current stream { if (f.flags & yamux_frame_rst) { + // close the stream s->state = yamux_stream_closed; if (s->rst_fn) @@ -228,7 +244,7 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco { // local stream didn't initiate FIN if (s->state != yamux_stream_closing) - yamux_stream_close(channelContext); + yamux_stream_close(context); s->state = yamux_stream_closed; @@ -237,6 +253,7 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco } else if (f.flags & yamux_frame_ack) { + // acknowldegement if (s->state != yamux_stream_syn_sent) return -EPROTO; @@ -246,7 +263,7 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco return -EPROTO; int sz = sizeof(struct yamux_frame); - ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, channelContext->yamux_context->stream->parent_stream->stream_context); + ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz); return (re < 0) ? re : (re + incoming_size); } } @@ -255,17 +272,22 @@ int yamux_decode(struct YamuxChannelContext* channelContext, const uint8_t* inco // It must not exist yet, so let's try to make it if (f.flags & yamux_frame_syn) { - void* ud = NULL; // user data + struct StreamMessage* msg = libp2p_stream_message_new(); - if (channelContext->yamux_context->session->get_str_ud_fn) - ud = channelContext->yamux_context->session->get_str_ud_fn(channelContext->yamux_context->session, f.streamid); + if (incoming_size > sizeof(struct yamux_frame)) { + msg->data_size = incoming_size - sizeof(struct yamux_frame); + msg->data = malloc(msg->data_size); + memcpy(msg->data, &incoming[sizeof(struct yamux_frame)], msg->data_size); + } - struct yamux_stream* st = yamux_stream_new(channelContext->yamux_context->session, f.streamid, ud); + struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, msg); + struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context; - if (channelContext->yamux_context->session->new_stream_fn) - channelContext->yamux_context->session->new_stream_fn(channelContext->yamux_context->session, st); + if (yamux_session->new_stream_fn) + yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, msg); - st->state = yamux_stream_syn_recv; + channelContext->state = yamux_stream_syn_recv; + *return_message = msg; } else return -EPROTO; diff --git a/yamux/stream.c b/yamux/stream.c index 52e8825..b8491ac 100644 --- a/yamux/stream.c +++ b/yamux/stream.c @@ -9,57 +9,62 @@ #include "libp2p/net/stream.h" #include "libp2p/yamux/frame.h" #include "libp2p/yamux/stream.h" +#include "libp2p/yamux/yamux.h" #define MIN(x,y) (y^((x^y)&-(xsession; + if (!id) { id = session->nextid; session->nextid += 2; } - struct yamux_stream* st = NULL; - struct yamux_session_stream* ss; + struct yamux_stream* y_stream = NULL; + struct yamux_session_stream* session_stream = NULL; - if (session->num_streams != session->cap_streams) + if (session->num_streams != session->cap_streams) { + // attempt to reuse dead streams for (size_t i = 0; i < session->cap_streams; ++i) { - ss = &session->streams[i]; + session_stream = &session->streams[i]; - if (!ss->alive) + if (!session_stream->alive) { - st = ss->stream; - ss->alive = 1; + y_stream = session_stream->stream; + session_stream->alive = 1; goto FOUND; } } + } if (session->cap_streams == session->config->accept_backlog) return NULL; - ss = &session->streams[session->cap_streams]; + // we didn't find a dead stream, so create a new one + session_stream = &session->streams[session->cap_streams]; - if (ss->alive) + if (session_stream->alive) return NULL; session->cap_streams++; - ss->alive = 1; - st = ss->stream = malloc(sizeof(struct yamux_stream)); + session_stream->alive = 1; + y_stream = session_stream->stream = malloc(sizeof(struct yamux_stream)); FOUND:; @@ -72,12 +77,24 @@ FOUND:; .read_fn = NULL, .fin_fn = NULL, .rst_fn = NULL, - - .userdata = userdata + .stream = libp2p_yamux_channel_stream_new(context->stream, id) }; - *st = nst; + *y_stream = nst; - return st; + if (libp2p_protocol_marshal(msg, nst.stream, context->protocol_handlers) >= 0) { + // success + } + /* + struct Stream* channelStream = libp2p_yamux_channel_stream_new(context->stream); + struct YamuxChannelContext* channel = (struct YamuxChannelContext*)channelStream->stream_context; + channel->channel = id; + channel->child_stream = NULL; + channel->state = yamux_stream_inited; + + + return channelStream; + */ + return 0; } /** @@ -121,25 +138,40 @@ ssize_t yamux_stream_init(struct YamuxChannelContext* channel_ctx) /*** * Close a stream - * @param stream the stream + * @param context the YamuxChannelContext or YamuxContext * @returns the number of bytes sent */ -ssize_t yamux_stream_close(struct YamuxChannelContext* channel_ctx) +ssize_t yamux_stream_close(void* context) { - if (!channel_ctx || channel_ctx->state != yamux_stream_est || channel_ctx->closed) - return -EINVAL; + if ( ((char*)context)[0] == YAMUX_CHANNEL_CONTEXT) { + struct YamuxChannelContext* channel_ctx = (struct YamuxChannelContext*) context; + if (!channel_ctx || channel_ctx->state != yamux_stream_est || channel_ctx->closed) + return -EINVAL; - struct yamux_frame f = (struct yamux_frame){ - .version = YAMUX_VERSION, - .type = yamux_frame_window_update, - .flags = yamux_frame_fin, - .streamid = channel_ctx->channel, - .length = 0 - }; + struct yamux_frame f = (struct yamux_frame){ + .version = YAMUX_VERSION, + .type = yamux_frame_window_update, + .flags = yamux_frame_fin, + .streamid = channel_ctx->channel, + .length = 0 + }; - channel_ctx->state = yamux_stream_closing; + channel_ctx->state = yamux_stream_closing; - return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f); + return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f); + } else if ( ((char*)context)[0] == YAMUX_CONTEXT) { + struct YamuxContext* ctx = (struct YamuxContext*)context; + struct yamux_frame f = (struct yamux_frame){ + .version = YAMUX_VERSION, + .type = yamux_frame_window_update, + .flags = yamux_frame_fin, + .streamid = 0, + .length = 0 + }; + + return yamux_write_frame(ctx, &f); + } + return 0; } /** @@ -165,19 +197,42 @@ ssize_t yamux_stream_reset(struct YamuxChannelContext* channel_ctx) return yamux_write_frame(channel_ctx->yamux_context->stream->stream_context, &f); } -static enum yamux_frame_flags get_flags(struct YamuxChannelContext* ctx) -{ - switch (ctx->state) - { - case yamux_stream_inited: - ctx->state = yamux_stream_syn_sent; - return yamux_frame_syn; - case yamux_stream_syn_recv: - ctx->state = yamux_stream_est; - return yamux_frame_ack; - default: - return 0; - } +/** + * Retrieve the flags for this context + * @param context the context + * @returns the correct flag + */ +enum yamux_frame_flags get_flags(void* context) { + if (context == NULL) + return 0; + if ( ((char*)context)[0] == YAMUX_CHANNEL_CONTEXT) { + struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)context; + switch (ctx->state) + { + case yamux_stream_inited: + ctx->state = yamux_stream_syn_sent; + return yamux_frame_syn; + case yamux_stream_syn_recv: + ctx->state = yamux_stream_est; + return yamux_frame_ack; + default: + return 0; + } + } else if ( ((char*)context)[0] == YAMUX_CONTEXT) { + struct YamuxContext* ctx = (struct YamuxContext*)context; + switch (ctx->state) + { + case yamux_stream_inited: + ctx->state = yamux_stream_syn_sent; + return yamux_frame_syn; + case yamux_stream_syn_recv: + ctx->state = yamux_stream_est; + return yamux_frame_ack; + default: + return 0; + } + } + return 0; } /** @@ -300,10 +355,9 @@ void yamux_stream_free(struct yamux_stream* stream) * @param frame the frame * @param incoming the stream bytes (after the frame) * @param incoming_size the size of incoming - * @param session_context the SessionContext * @returns the number of bytes processed (can be zero) or negative number on error */ -ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context) +ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size) { struct yamux_frame f = *frame; diff --git a/yamux/test.c b/yamux/test.c index 7d6a82f..1d8c0b6 100644 --- a/yamux/test.c +++ b/yamux/test.c @@ -153,7 +153,7 @@ int do_client() { } sess->new_stream_fn = on_new; - struct yamux_stream* strm = yamux_stream_new(sess, 0, NULL); + struct yamux_stream* strm = yamux_channel_new(sess, 0, NULL); if (!strm) { printf("yamux_new_stream() failed\n"); diff --git a/yamux/yamux.c b/yamux/yamux.c index 451bd07..3cd6bb2 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -105,7 +105,7 @@ int yamux_receive_protocol(struct YamuxContext* context) { * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { - struct Stream* new_stream = libp2p_yamux_stream_new(stream); + struct Stream* new_stream = libp2p_yamux_stream_new(stream, 1, protocol_context); if (new_stream == NULL) return -1; // upgrade @@ -127,7 +127,7 @@ int yamux_shutdown(void* protocol_context) { struct Libp2pProtocolHandler* libp2p_yamux_build_protocol_handler(struct Libp2pVector* handlers) { struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new(); if (handler != NULL) { - handler->context = handler; + handler->context = handlers; handler->CanHandle = yamux_can_handle; handler->HandleMessage = yamux_handle_message; handler->Shutdown = yamux_shutdown; @@ -146,8 +146,11 @@ int libp2p_yamux_close(struct Stream* stream) { return 0; if (stream->stream_context == NULL) return 0; - if (stream->parent_stream->close(stream->parent_stream)) - libp2p_yamux_stream_free(stream); + struct Stream* parent_stream = stream->parent_stream; + // this should close everything above yamux (i.e. the protocols that are riding on top of yamux) + libp2p_yamux_stream_free(stream); + // and this should close everything below + parent_stream->close(parent_stream); return 1; } @@ -180,14 +183,50 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int return 0; // TODO: This is not right. It must be sorted out. struct StreamMessage* msg = *message; - return yamux_decode(channel, msg->data, msg->data_size); + if (yamux_decode(channel, msg->data, msg->data_size, message) == 0) + return 1; } else if (ctx != NULL) { - // We are still negotiating... - return ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, yamux_default_timeout); + // We are still negotiating. They are probably attempting to negotiate a new protocol + struct StreamMessage* incoming = NULL; + if (ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &incoming, yamux_default_timeout)) { + // parse the frame + if (yamux_decode(ctx, incoming->data, incoming->data_size, message) == 0) { + libp2p_stream_message_free(incoming); + return 1; + } + libp2p_stream_message_free(incoming); + } } return 0; } +/*** + * Prepare a new Yamux StreamMessage based on another StreamMessage + * NOTE: The frame is not encoded yet + * @param incoming the incoming message + * @returns a new StreamMessage that has a yamux_frame + */ +struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incoming) { + struct StreamMessage* out = libp2p_stream_message_new(); + if (out != NULL) { + out->data_size = sizeof(struct yamux_frame) + incoming->data_size; + out->data = (uint8_t*) malloc(out->data_size); + if (out->data == NULL) { + libp2p_stream_message_free(out); + return NULL; + } + memset(out->data, 0, out->data_size); + // the first part of the data is the yamux frame + // Set values in the frame, which is the first part of the outgoing message data + struct yamux_frame* frame = (struct yamux_frame*)out->data; + frame->length = incoming->data_size; + frame->type = yamux_frame_data; + frame->version = YAMUX_VERSION; + // the last part of the data is the original data + memcpy(&out->data[sizeof(struct yamux_frame)], incoming->data, incoming->data_size); + } + return out; +} /*** * Write to the remote * @param stream_context the context. Could be a YamuxContext or YamuxChannelContext @@ -209,14 +248,28 @@ int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) { ctx = (struct YamuxContext*)stream_context; } + if (ctx == NULL && channel == NULL) + return 0; + + struct StreamMessage* outgoing_message = libp2p_yamux_prepare_to_send(message); + // now convert fame for network use + struct yamux_frame* frame = (struct yamux_frame*)outgoing_message->data; + // set a few more flags + frame->flags = get_flags(stream_context); + if (channel != NULL) + frame->streamid = channel->channel; + encode_frame(frame); + + int retVal = 0; if (channel != NULL && channel->channel != 0) { // we have an established channel. Use it. - return yamux_stream_write(channel, message->data_size, message->data); + retVal = channel->stream->write(channel->stream->stream_context, outgoing_message); } else if (ctx != NULL) { - // We are still negotiating... - return ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, message); + retVal = ctx->stream->parent_stream->write(ctx->stream->parent_stream, outgoing_message); } - return 0; + libp2p_stream_message_free(outgoing_message); + + return retVal; } /*** @@ -241,12 +294,20 @@ int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size return -1; } -struct YamuxContext* libp2p_yamux_context_new() { +/** + * Create a new YamuxContext struct + * @param stream the parent stream + * @returns a YamuxContext + */ +struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream) { struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext)); if (ctx != NULL) { ctx->type = YAMUX_CONTEXT; ctx->stream = NULL; ctx->channels = libp2p_utils_vector_new(1); + ctx->session = yamux_session_new(NULL, stream, yamux_session_server, NULL); + ctx->am_server = 0; + ctx->state = 0; } return ctx; } @@ -324,14 +385,31 @@ int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_ return libp2p_yamux_stream_add(yamux_context, new_stream); } +void libp2p_yamux_read_from_yamux_session(struct yamux_stream* stream, uint32_t data_len, void* data) { + +} + +/*** + * Internal yamux code calls this when a new stream is created + * @param context the context + * @param stream the new stream + */ +void libp2p_yamux_new_stream(struct YamuxContext* context, struct Stream* stream, struct StreamMessage* msg) { + // ok, we have the new stream structure. We now need to read what was sent. + libp2p_protocol_marshal(msg, stream, context->protocol_handlers); +} + /*** * Negotiate the Yamux protocol * @param parent_stream the parent stream + * @param am_server true(1) if we are considered the server, false(0) if we are the client. + * @param protocol_handlers the protocol handlers * @returns a Stream initialized and ready for yamux */ -struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { +struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_server, struct Libp2pVector* protocol_handlers) { struct Stream* out = libp2p_stream_new(); if (out != NULL) { + out->stream_type = STREAM_TYPE_YAMUX; out->parent_stream = parent_stream; out->close = libp2p_yamux_close; out->read = libp2p_yamux_read; @@ -341,13 +419,16 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { out->handle_upgrade = libp2p_yamux_handle_upgrade; out->address = parent_stream->address; // build YamuxContext - struct YamuxContext* ctx = libp2p_yamux_context_new(); + struct YamuxContext* ctx = libp2p_yamux_context_new(out); if (ctx == NULL) { libp2p_yamux_stream_free(out); return NULL; } + ctx->session->new_stream_fn = libp2p_yamux_new_stream; out->stream_context = ctx; ctx->stream = out; + ctx->am_server = am_server; + ctx->protocol_handlers = protocol_handlers; // attempt to negotiate yamux protocol if (!libp2p_yamux_negotiate(ctx)) { libp2p_yamux_stream_free(out); @@ -391,6 +472,8 @@ void libp2p_yamux_context_free(struct YamuxContext* ctx) { } libp2p_utils_vector_free(ctx->channels); } + if (ctx->session != NULL) + yamux_session_free(ctx->session); free(ctx); return; } @@ -419,34 +502,54 @@ int libp2p_yamux_channel_null_close(struct Stream* stream) { /** * Create a stream that has a "YamuxChannelContext" related to this yamux protocol - * NOTE: This "wraps" the incoming stream, so that the returned stream is the parent - * of the incoming_stream + * NOTE: If incoming_stream is not of the Yamux protocol, this "wraps" the incoming + * stream, so that the returned stream is the parent of the incoming_stream. If the + * incoming stream is of the yamux protocol, the YamuxChannelContext.child_stream + * will be NULL, awaiting an upgrade to fill it in. * @param incoming_stream the stream of the new protocol - * @returns a new Stream that has a YamuxChannelContext, and incoming_stream->parent_stream is set to this stream + * @param channelNumber the channel number (0 if unknown) + * @returns a new Stream that has a YamuxChannelContext */ -struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream) { +struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, int channelNumber) { struct Stream* out = libp2p_stream_new(); if (out != NULL) { + int isYamux = 0; + char first_char = ((uint8_t*)incoming_stream->stream_context)[0]; + if (first_char == YAMUX_CONTEXT) + isYamux = 1; + out->stream_type = STREAM_TYPE_YAMUX; out->address = incoming_stream->address; // don't allow the incoming_stream to close the channel out->close = libp2p_yamux_channel_null_close; - out->parent_stream = incoming_stream->parent_stream; - out->peek = incoming_stream->parent_stream->peek; - out->read = incoming_stream->parent_stream->read; - out->read_raw = incoming_stream->parent_stream->read_raw; - out->socket_mutex = incoming_stream->parent_stream->socket_mutex; struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext)); - ctx->channel = 0; + if (!isYamux) { + out->parent_stream = incoming_stream->parent_stream; + out->peek = incoming_stream->parent_stream->peek; + out->read = incoming_stream->parent_stream->read; + out->read_raw = incoming_stream->parent_stream->read_raw; + out->write = incoming_stream->parent_stream->write; + out->socket_mutex = incoming_stream->parent_stream->socket_mutex; + ctx->yamux_context = incoming_stream->parent_stream->stream_context; + ctx->child_stream = incoming_stream; + // this does the wrap + incoming_stream->parent_stream = out; + } else { + out->parent_stream = incoming_stream; + out->peek = incoming_stream->peek; + out->read = incoming_stream->read; + out->read_raw = incoming_stream->read_raw; + out->write = incoming_stream->write; + out->socket_mutex = incoming_stream->socket_mutex; + ctx->yamux_context = incoming_stream->stream_context; + ctx->child_stream = NULL; + } + ctx->channel = channelNumber; ctx->closed = 0; ctx->state = 0; ctx->window_size = 0; ctx->type = YAMUX_CHANNEL_CONTEXT; - ctx->yamux_context = incoming_stream->parent_stream->stream_context; ctx->stream = out; - ctx->child_stream = incoming_stream; out->stream_context = ctx; - out->write = incoming_stream->parent_stream->write; - incoming_stream->parent_stream = out; } return out; } @@ -461,12 +564,22 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) { if (stream == NULL) return 0; // wrap the new stream in a YamuxChannelContext - struct Stream* channel_stream = libp2p_yamux_channel_stream_new(stream); + struct Stream* channel_stream = libp2p_yamux_channel_stream_new(stream, 0); if (channel_stream == NULL) return 0; struct YamuxChannelContext* channel_context = (struct YamuxChannelContext*)channel_stream->stream_context; // the negotiation was successful. Add it to the list of channels that we have int itemNo = libp2p_utils_vector_add(ctx->channels, channel_stream); + // There are 2 streams for each protocol. A server has the even numbered streams, the + // client the odd number streams. If we are the server, we need to kick off the + // process to add a stream of the same type. channel_context->channel = itemNo; + if (ctx->am_server && itemNo % 2 != 0) { + // we're the server, and they have a negotiated a new protocol. + // negotiate a stream for us to talk to them. + struct Stream* yamux_stream = stream->parent_stream->parent_stream; + struct Stream* server_to_client_stream = stream->negotiate(yamux_stream); + libp2p_yamux_stream_add(ctx, server_to_client_stream); + } return 1; }