diff --git a/include/libp2p/yamux/frame.h b/include/libp2p/yamux/frame.h index 6990c0b..a31b027 100644 --- a/include/libp2p/yamux/frame.h +++ b/include/libp2p/yamux/frame.h @@ -37,7 +37,16 @@ struct yamux_frame }; #pragma pack(pop) +/*** + * convert the frame so it can be sent over the network (makes the endienness correct) + * @param frame the frame to encode + */ void encode_frame(struct yamux_frame* frame); + +/*** + * Convert the frame from the network format to the local format (corrects endienness) + * @param frame the frame to decode + */ void decode_frame(struct yamux_frame* frame); diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h index 5f1fa85..d15c8ee 100644 --- a/include/libp2p/yamux/yamux.h +++ b/include/libp2p/yamux/yamux.h @@ -7,6 +7,16 @@ * Declarations for the Yamux protocol */ +static const int yamux_default_timeout = 10; + +/*** + * Context struct for Yamux + */ +struct YamuxContext { + struct Stream* stream; + struct yamux_session* session; +}; + /** * Build a handler that can handle the yamux protocol */ diff --git a/yamux/frame.c b/yamux/frame.c index 7c3a05f..0582a60 100644 --- a/yamux/frame.c +++ b/yamux/frame.c @@ -22,6 +22,10 @@ static void set_eness() eness = big; } +/*** + * convert the frame so it can be sent over the network (makes the endienness correct) + * @param frame the frame to encode + */ void encode_frame(struct yamux_frame* frame) { if (eness == unk) @@ -31,6 +35,11 @@ void encode_frame(struct yamux_frame* frame) frame->streamid = htonl(frame->streamid); frame->length = htonl(frame->length ); } + +/*** + * Convert the frame from the network format to the local format (corrects endienness) + * @param frame the frame to decode + */ void decode_frame(struct yamux_frame* frame) { if (eness == unk) diff --git a/yamux/yamux.c b/yamux/yamux.c index f9f0355..f594faa 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -2,6 +2,7 @@ #include #include "varint.h" #include "libp2p/yamux/session.h" +#include "libp2p/yamux/yamux.h" #include "libp2p/net/protocol.h" #include "libp2p/net/stream.h" #include "libp2p/conn/session.h" @@ -171,14 +172,130 @@ struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* return handler; } +int libp2p_yamux_close(void* stream_context) { + //TODO: Implement + return 0; +} + +/** + * Read from the network, expecting a yamux frame. + * NOTE: This will also dispatch the frame to the correct protocol + * @param stream_context the YamuxContext + * @param message the resultant message + * @param timeout_secs when to give up + * @returns true(1) on success, false(0) on failure + */ +int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int timeout_secs) { + struct YamuxContext* ctx = (struct YamuxContext*)stream_context; + struct Stream* parent_stream = ctx->stream->parent_stream; + + struct StreamMessage* incoming; + if (!parent_stream->read(parent_stream->stream_context, &incoming, timeout_secs)) + return 0; + + // we've got bytes from the network. process them as a yamux frame + return yamux_decode(ctx->session, incoming->data, incoming->data_size); +} + +int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) { + //TODO: Implement + return 0; +} + +/*** + * Check to see if there is anything waiting on the network. + * @param stream_context the YamuxContext + * @returns the number of bytes waiting, or -1 on error + */ +int libp2p_yamux_peek(void* stream_context) { + if (stream_context == NULL) + return -1; + + struct YamuxContext* ctx = (struct YamuxContext*)stream_context; + struct Stream* parent_stream = ctx->stream->parent_stream; + if (parent_stream == NULL) + return -1; + + return parent_stream->peek(parent_stream->stream_context); +} + +int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size, int timeout_secs) { + //TODO: Implement + return -1; +} + +struct YamuxContext* libp2p_yamux_context_new() { + struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext)); + if (ctx != NULL) { + ctx->stream = NULL; + } + return ctx; +} + +void libp2p_yamux_stream_free(struct Stream* yamux_stream) { + //TODO: Implement +} + +int libp2p_yamux_negotiate(struct YamuxContext* ctx) { + const char* protocolID = "/yamux/1.0.0\n"; + struct StreamMessage outgoing; + struct StreamMessage* results = NULL; + int retVal = 0; + int haveTheirs = 0; + int peek_result = 0; + + // see if they're trying to send something first + peek_result = libp2p_yamux_peek(ctx); + if (peek_result > 0) { + libp2p_logger_debug("yamux", "There is %d bytes waiting for us. Perhaps it is the yamux header we're expecting.\n", peek_result); + // get the protocol + ctx->stream->parent_stream->read(ctx->stream->parent_stream, &results, yamux_default_timeout); + if (results == NULL || results->data_size == 0) { + libp2p_logger_error("yamux", "We thought we had a yamux header, but we got nothing.\n"); + goto exit; + } + if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) { + libp2p_logger_error("yamux", "We thought we had a yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data); + goto exit; + } + haveTheirs = 1; + } + + // send the protocol id + outgoing.data = (uint8_t*)protocolID; + outgoing.data_size = strlen(protocolID); + if (!ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing)) { + libp2p_logger_error("yamux", "We attempted to write the yamux protocol id, but the write call failed.\n"); + goto exit; + } + + // wait for them to send the protocol id back + if (!haveTheirs) { + // expect the same back + ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &results, yamux_default_timeout); + if (results == NULL || results->data_size == 0) { + libp2p_logger_error("yamux", "We tried to retrieve the yamux header, but we got nothing.\n"); + goto exit; + } + if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) { + libp2p_logger_error("yamux", "We tried to retrieve the yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data); + goto exit; + } + } + + retVal = 1; + exit: + if (results != NULL) + free(results); + return retVal; +} + /*** * Negotiate the Yamux protocol * @param parent_stream the parent stream * @returns a Stream initialized and ready for yamux */ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { - struct Stream* out = NULL; - /* struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream)); if (out != NULL) { out->parent_stream = parent_stream; @@ -189,21 +306,18 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { out->read_raw = libp2p_yamux_read_raw; out->address = parent_stream->address; // build YamuxContext - struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext)); + struct YamuxContext* ctx = libp2p_yamux_context_new(); if (ctx == NULL) { - libp2p_net_multistream_stream_free(out); + libp2p_yamux_stream_free(out); return NULL; } out->stream_context = ctx; ctx->stream = out; - ctx->handlers = NULL; - ctx->session_context = NULL; // attempt to negotiate yamux protocol if (!libp2p_yamux_negotiate(ctx)) { libp2p_yamux_stream_free(out); return NULL; } } - */ return out; }