From 6b185e31bdb8b5fb1dbd58d03898c97d51eff5f6 Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Wed, 11 Oct 2017 11:23:25 -0500 Subject: [PATCH] First commit of yamux protocol --- .cproject | 28 ++- Makefile | 5 +- include/libp2p/os/timespec.h | 11 ++ include/libp2p/yamux/config.h | 19 ++ include/libp2p/yamux/frame.h | 43 +++++ include/libp2p/yamux/session.h | 100 +++++++++++ include/libp2p/yamux/stream.h | 75 ++++++++ include/libp2p/yamux/yamux.h | 8 + net/multistream.c | 20 +-- os/timespec.c | 25 +++ yamux/Makefile | 22 +++ yamux/frame.c | 42 +++++ yamux/session.c | 265 +++++++++++++++++++++++++++ yamux/stream.c | 319 +++++++++++++++++++++++++++++++++ yamux/test.c | 235 ++++++++++++++++++++++++ yamux/yamux.c | 129 +++++++++++++ 16 files changed, 1332 insertions(+), 14 deletions(-) create mode 100644 include/libp2p/os/timespec.h create mode 100644 include/libp2p/yamux/config.h create mode 100644 include/libp2p/yamux/frame.h create mode 100644 include/libp2p/yamux/session.h create mode 100644 include/libp2p/yamux/stream.h create mode 100644 include/libp2p/yamux/yamux.h create mode 100644 os/timespec.c create mode 100644 yamux/Makefile create mode 100644 yamux/frame.c create mode 100644 yamux/session.c create mode 100644 yamux/stream.c create mode 100644 yamux/test.c create mode 100644 yamux/yamux.c diff --git a/.cproject b/.cproject index 542ec82..e3a1378 100644 --- a/.cproject +++ b/.cproject @@ -65,7 +65,6 @@ make - all true false @@ -73,7 +72,6 @@ make - clean true false @@ -81,6 +79,7 @@ make + test true false @@ -88,12 +87,35 @@ make - rebuild true true true + + make + + test + true + true + true + + + make + + clean + true + true + true + + + make + + all + true + true + true + diff --git a/Makefile b/Makefile index c9615ec..55105bc 100644 --- a/Makefile +++ b/Makefile @@ -15,7 +15,8 @@ OBJS = \ record/*.o \ routing/*.o \ secio/*.o \ - utils/*.o + utils/*.o \ + yamux/*.o link: ar rcs libp2p.a $(OBJS) @@ -33,6 +34,7 @@ compile: cd routing; make all; cd secio; make all; cd utils; make all; + cd yamux; make all; test: compile link cd test; make all; @@ -55,5 +57,6 @@ clean: cd secio; make clean; cd utils; make clean; cd test; make clean; + cd yamux; make clean; rm -rf libp2p.a diff --git a/include/libp2p/os/timespec.h b/include/libp2p/os/timespec.h new file mode 100644 index 0000000..735b715 --- /dev/null +++ b/include/libp2p/os/timespec.h @@ -0,0 +1,11 @@ +#pragma once + +/** + * mac doesn't have timespec_get + */ + +#ifdef __MACH__ +#include +#define TIME_UTC 1 +int timespec_get(struct timespec *ts, int base); +#endif diff --git a/include/libp2p/yamux/config.h b/include/libp2p/yamux/config.h new file mode 100644 index 0000000..bb6eb80 --- /dev/null +++ b/include/libp2p/yamux/config.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include +#include + +struct yamux_config +{ + size_t accept_backlog ; + uint32_t max_stream_window_size; +}; + +#define YAMUX_DEFAULT_WINDOW (0x100*0x400) + +#define YAMUX_DEFAULT_CONFIG ((struct yamux_config)\ +{\ + .accept_backlog=0x100,\ + .max_stream_window_size=YAMUX_DEFAULT_WINDOW\ +})\ diff --git a/include/libp2p/yamux/frame.h b/include/libp2p/yamux/frame.h new file mode 100644 index 0000000..6990c0b --- /dev/null +++ b/include/libp2p/yamux/frame.h @@ -0,0 +1,43 @@ +#pragma once + +#include +#include + +typedef uint8_t yamux_version ; +typedef uint32_t yamux_streamid; + +#define YAMUX_VERSION (0x00) +#define YAMUX_STREAMID_SESSION (0) + +enum yamux_frame_type +{ + yamux_frame_data = 0x00, + yamux_frame_window_update = 0x01, + yamux_frame_ping = 0x02, + yamux_frame_go_away = 0x03 +}; +enum yamux_frame_flags +{ + yamux_frame_nil = 0x0000, + + yamux_frame_syn = 0x0001, + yamux_frame_ack = 0x0002, + yamux_frame_fin = 0x0004, + yamux_frame_rst = 0x0008 +}; + +#pragma pack(push,1) +struct yamux_frame +{ + yamux_version version ; + uint8_t type ; + uint16_t flags ; + yamux_streamid streamid; + uint32_t length ; +}; +#pragma pack(pop) + +void encode_frame(struct yamux_frame* frame); +void decode_frame(struct yamux_frame* frame); + + diff --git a/include/libp2p/yamux/session.h b/include/libp2p/yamux/session.h new file mode 100644 index 0000000..d81b6e8 --- /dev/null +++ b/include/libp2p/yamux/session.h @@ -0,0 +1,100 @@ +#pragma once + +#include +#include +#include + +#include "config.h" +#include "frame.h" +#include "stream.h" +#include "libp2p/conn/session.h" + +enum yamux_session_type +{ + yamux_session_client, + yamux_session_server +}; +enum yamux_error +{ + yamux_error_normal = 0x00, + yamux_error_protoc = 0x01, + yamux_error_intern = 0x02 +}; + +struct yamux_session; +struct yamux_stream; + +typedef void* (*yamux_session_get_str_ud_fn)(struct yamux_session* session, yamux_streamid newid ); +typedef void (*yamux_session_ping_fn )(struct yamux_session* session, uint32_t val ); +typedef void (*yamux_session_pong_fn )(struct yamux_session* session, uint32_t val, struct timespec dt); +typedef void (*yamux_session_go_away_fn )(struct yamux_session* session, enum yamux_error err ); +typedef void (*yamux_session_new_stream_fn)(struct yamux_session* session, struct yamux_stream* stream); +typedef void (*yamux_session_free_fn )(struct yamux_session* sesssion ); + +struct yamux_session_stream +{ + struct yamux_stream* stream; + int alive; +}; +struct yamux_session +{ + struct yamux_config* config; + + size_t num_streams; + size_t cap_streams; + struct yamux_session_stream* streams; + + yamux_session_get_str_ud_fn get_str_ud_fn; + yamux_session_ping_fn ping_fn ; + yamux_session_pong_fn pong_fn ; + yamux_session_go_away_fn go_away_fn ; + yamux_session_new_stream_fn new_stream_fn; + yamux_session_free_fn free_fn ; + + void* userdata; + + struct timespec since_ping; + + enum yamux_session_type type; + + struct SessionContext* session_context; + + yamux_streamid nextid; + + int closed; +}; + +/*** + * Create a new yamux session + * @param config the configuration + * @param sock the socket + * @param type session type (yamux_session_server or yamux_session_client) + * @param userdata user data + * @returns the yamux_session struct + */ +struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata); + +// does not close the socket, but does close the session +void yamux_session_free(struct yamux_session* session); + +// does not free used memory +ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err); +inline ssize_t yamux_session_go_away(struct yamux_session* session, enum yamux_error err) +{ + return yamux_session_close(session, err); +} + +ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int pong); + +// defers to stream read handlers +ssize_t yamux_session_read(struct yamux_session* session); + +/** + * Decode an incoming message + * @param incoming the incoming bytes + * @param incoming_size the size of the incoming bytes + * @returns true(1) on success, false(0) otherwise + */ +int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size); + + diff --git a/include/libp2p/yamux/stream.h b/include/libp2p/yamux/stream.h new file mode 100644 index 0000000..d7cbe3d --- /dev/null +++ b/include/libp2p/yamux/stream.h @@ -0,0 +1,75 @@ +#pragma once + +#include +#include + +#include "session.h" +#include "libp2p/conn/session.h" + +// NOTE: 'data' is not guaranteed to be preserved when the read_fn +// handler exists (read: it will be freed). +struct yamux_stream; + +typedef void (*yamux_stream_read_fn)(struct yamux_stream* stream, uint32_t data_length, void* data); +typedef void (*yamux_stream_fin_fn )(struct yamux_stream* stream); +typedef void (*yamux_stream_rst_fn )(struct yamux_stream* stream); +typedef void (*yamux_stream_free_fn)(struct yamux_stream* stream); + +enum yamux_stream_state +{ + yamux_stream_inited, + yamux_stream_syn_sent, + yamux_stream_syn_recv, + yamux_stream_est, + yamux_stream_closing, + yamux_stream_closed +}; + +struct yamux_stream +{ + struct yamux_session* session; + + yamux_stream_read_fn read_fn; + yamux_stream_fin_fn fin_fn ; + yamux_stream_rst_fn rst_fn ; + yamux_stream_free_fn free_fn; + + void* userdata; + + enum yamux_stream_state state; + + yamux_streamid id; + + uint32_t window_size; +}; + +// does not init the stream +struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_streamid id, void* userdata); + +// not obligatory, SYN is sent by yamux_stream_write when the stream +// isn't initialised anyway +ssize_t yamux_stream_init (struct yamux_stream* stream); + +// doesn't free the stream +// uses FIN +ssize_t yamux_stream_close(struct yamux_stream* stream); +// uses RST +ssize_t yamux_stream_reset(struct yamux_stream* stream); + +void yamux_stream_free(struct yamux_stream* stream); + +ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta); +ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data); + +/*** + * process stream + * @param stream the stream + * @param frame the frame + * @param incoming the stream bytes (after the frame) + * @param incoming_size the size of incoming + * @param session_context the SessionContext + * @returns the number of bytes processed (can be zero) or negative number on error + */ +ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context); + + diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h new file mode 100644 index 0000000..0001a36 --- /dev/null +++ b/include/libp2p/yamux/yamux.h @@ -0,0 +1,8 @@ +#pragma once + +#include "libp2p/net/protocol.h" + +/** + * Build a handler that can handle the yamux protocol + */ +struct Libp2pProtocolHandler* yamux_build_protocol_handler(); diff --git a/net/multistream.c b/net/multistream.c index a48b4e9..55b9204 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -65,18 +65,18 @@ int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incomi if (libp2p_net_multistream_can_handle(results, bytes_read)) continue; numRetries = 0; - retVal = libp2p_protocol_marshal(results, bytes_read, context, multistream_context->handlers); - if (results != NULL) - free(results); - // exit the loop on error (or if they ask us to no longer loop by returning 0) - if (retVal <= 0) - break; + retVal = libp2p_protocol_marshal(results, bytes_read, context, multistream_context->handlers); + if (results != NULL) + free(results); + // exit the loop on error (or if they ask us to no longer loop by returning 0) + if (retVal <= 0) + break; } else { - // we were unable to read from the network. + // we were unable to read from the network. // if it timed out, we should try again (if we're not out of retries) - if (numRetries >= max_retries) - break; - numRetries++; + if (numRetries >= max_retries) + break; + numRetries++; } } diff --git a/os/timespec.c b/os/timespec.c new file mode 100644 index 0000000..6782504 --- /dev/null +++ b/os/timespec.c @@ -0,0 +1,25 @@ +#ifdef __MACH__ + +#include +#include +#include +#include + +#include "libp2p/os/timespec.h" + +int timespec_get(struct timespec *ts, int base) { + switch (base) { + case TIME_UTC: { + clock_serv_t cclock; + mach_timespec_t mts; + host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock); + clock_get_time(cclock, &mts); + mach_port_deallocate(mach_task_self(), cclock); + ts->tv_sec = mts.tv_sec; + ts->tv_nsec = mts.tv_nsec; + return base; + } + } + return 0; +} +#endif diff --git a/yamux/Makefile b/yamux/Makefile new file mode 100644 index 0000000..ff0165b --- /dev/null +++ b/yamux/Makefile @@ -0,0 +1,22 @@ +CC = gcc +CFLAGS = -O0 -Wall -Werror -I../include -I../../c-protobuf -std=c11 + +ifdef DEBUG +CFLAGS += -g3 +endif + +LFLAGS = +DEPS = +OBJS = frame.o session.o stream.o yamux.o ../os/timespec.o + +%.o: %.c + $(CC) -c -o $@ $< $(CFLAGS) + +all: $(OBJS) + +clean: + rm -f *.o + rm -f test + +test: all test.o + $(CC) -o test test.o $(OBJS) $(CFLAGS) \ No newline at end of file diff --git a/yamux/frame.c b/yamux/frame.c new file mode 100644 index 0000000..7c3a05f --- /dev/null +++ b/yamux/frame.c @@ -0,0 +1,42 @@ +#include +#include + +#include "libp2p/yamux/frame.h" + +enum eness +{ + unk, + little, + big +}; + +static enum eness eness = unk; + +static void set_eness() +{ + uint16_t x = 1; + + if (*(char*)&x == 1) + eness = little; + else + eness = big; +} + +void encode_frame(struct yamux_frame* frame) +{ + if (eness == unk) + set_eness(); + + frame->flags = htons(frame->flags ); + frame->streamid = htonl(frame->streamid); + frame->length = htonl(frame->length ); +} +void decode_frame(struct yamux_frame* frame) +{ + if (eness == unk) + set_eness(); + + frame->flags = ntohs(frame->flags ); + frame->streamid = ntohl(frame->streamid); + frame->length = ntohl(frame->length ); +} diff --git a/yamux/session.c b/yamux/session.c new file mode 100644 index 0000000..a6e2c81 --- /dev/null +++ b/yamux/session.c @@ -0,0 +1,265 @@ + +#include +#include +#include +#include +#include +#include +#include + +#include "libp2p/net/stream.h" +#include "libp2p/os/timespec.h" +#include "libp2p/yamux/session.h" +#include "libp2p/yamux/stream.h" + +static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG; + +/*** + * Create a new yamux session + * @param config the configuration + * @param sock the socket + * @param type session type (yamux_session_server or yamux_session_client) + * @param userdata user data + * @returns the yamux_session struct + */ +struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata) +{ + if (!session_context) + return NULL; + + if (!config) + config = &dcfg; + + size_t ab = config->accept_backlog; + + struct yamux_session_stream* streams = + (struct yamux_session_stream*)malloc(sizeof(struct yamux_session_stream) * ab); + + for (size_t i = 0; i < ab; ++i) + streams[i].alive = 0; + + struct yamux_session* sess = (struct yamux_session*)malloc(sizeof(struct yamux_session)); + if (sess != NULL) { + sess->config = config; + sess->type = type; + sess->session_context = session_context; + sess->closed = 0; + sess->nextid = 1 + (type == yamux_session_server); + sess->num_streams = 0; + sess->cap_streams = 0; + sess->streams = streams; + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 0; + sess->since_ping = ts; + sess->get_str_ud_fn = NULL; + sess->ping_fn = NULL; + sess->pong_fn = NULL; + sess->go_away_fn = NULL; + sess->free_fn = NULL; + sess->userdata = userdata; + } + + return sess; +} + +void yamux_session_free(struct yamux_session* session) +{ + if (!session) + return; + + if (!session->closed) + yamux_session_close(session, yamux_error_normal); + + if (session->free_fn) + session->free_fn(session); + + for (size_t i = 0; i < session->cap_streams; ++i) + if (session->streams[i].alive) + yamux_stream_free(session->streams[i].stream); + + free(session->streams); + free(session); +} + +/*** + * Close a yamux session + * @param session the yamux_session to close + * @param err why we're closing + */ +ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err) +{ + if (!session) + return -EINVAL; + if (session->closed) + return 0; + + struct yamux_frame f = (struct yamux_frame){ + .version = YAMUX_VERSION, + .type = yamux_frame_go_away, + .flags = 0, + .streamid = YAMUX_STREAMID_SESSION, + .length = (uint32_t)err + }; + + session->closed = 1; + + int sz = sizeof(struct yamux_frame); + if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz)) + return 0; + return sz; +} + +/*** + * Ping + * @param session the session to ping + * @param value the value to send + * @param pong true(1) if we should send the ack, false(0) if we should send the syn (who's side are we on?) + * @returns number of bytes sent + */ +ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int pong) +{ + if (!session || session->closed) + return -EINVAL; + + struct yamux_frame f = (struct yamux_frame){ + .version = YAMUX_VERSION, + .type = yamux_frame_ping, + .flags = pong ? yamux_frame_ack : yamux_frame_syn, + .streamid = YAMUX_STREAMID_SESSION, + .length = value + }; + + if (!timespec_get(&session->since_ping, TIME_UTC)) + return -EACCES; + + int sz = sizeof(struct yamux_frame); + if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz)) + return 0; + return sz; +} + +/** + * Decode an incoming message + * @param incoming the incoming bytes + * @param incoming_size the size of the incoming bytes + * @returns true(1) on success, false(0) otherwise + */ +int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size) { + // decode frame + struct yamux_frame f; + + if (incoming_size < sizeof(struct yamux_frame)) { + return 0; + } + decode_frame(&f); + + // check yamux version + if (f.version != YAMUX_VERSION) + return 0; + + if (!f.streamid) // we're not dealing with a stream + switch (f.type) + { + case yamux_frame_ping: // ping + if (f.flags & yamux_frame_syn) + { + yamux_session_ping(session, f.length, 1); + + if (session->ping_fn) + session->ping_fn(session, f.length); + } + else if ((f.flags & yamux_frame_ack) && session->pong_fn) + { + struct timespec now, dt, last = session->since_ping; + if (!timespec_get(&now, TIME_UTC)) + return -EACCES; + + dt.tv_sec = now.tv_sec - last.tv_sec; + if (now.tv_nsec < last.tv_nsec) + { + dt.tv_sec--; + dt.tv_nsec = last.tv_nsec - now.tv_nsec; + } + else + dt.tv_nsec = now.tv_nsec - last.tv_nsec; + + session->pong_fn(session, f.length, dt); + } + else + return -EPROTO; + break; + case yamux_frame_go_away: // go away (hanging up) + session->closed = 1; + if (session->go_away_fn) + session->go_away_fn(session, (enum yamux_error)f.length); + break; + default: + return -EPROTO; + } + else { // we're handling a stream + for (size_t i = 0; i < session->cap_streams; ++i) + { + struct yamux_session_stream* ss = &session->streams[i]; + struct yamux_stream* s = ss->stream; + + if (!ss->alive || s->state == yamux_stream_closed) + continue; + + if (s->id == f.streamid) + { + if (f.flags & yamux_frame_rst) + { + s->state = yamux_stream_closed; + + if (s->rst_fn) + s->rst_fn(s); + } + else if (f.flags & yamux_frame_fin) + { + // local stream didn't initiate FIN + if (s->state != yamux_stream_closing) + yamux_stream_close(s); + + s->state = yamux_stream_closed; + + if (s->fin_fn) + s->fin_fn(s); + } + else if (f.flags & yamux_frame_ack) + { + if (s->state != yamux_stream_syn_sent) + return -EPROTO; + + s->state = yamux_stream_est; + } + else if (f.flags) + return -EPROTO; + + int sz = sizeof(struct yamux_frame); + ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, session->session_context); + return (re < 0) ? re : (re + incoming_size); + } + } + + // stream doesn't exist yet + if (f.flags & yamux_frame_syn) + { + void* ud = NULL; + + if (session->get_str_ud_fn) + ud = session->get_str_ud_fn(session, f.streamid); + + struct yamux_stream* st = yamux_stream_new(session, f.streamid, ud); + + if (session->new_stream_fn) + session->new_stream_fn(session, st); + + st->state = yamux_stream_syn_recv; + } + else + return -EPROTO; + } + return 0; +} + diff --git a/yamux/stream.c b/yamux/stream.c new file mode 100644 index 0000000..3b62cec --- /dev/null +++ b/yamux/stream.c @@ -0,0 +1,319 @@ + +#include +#include +#include +#include +#include + +#include "libp2p/conn/session.h" +#include "libp2p/net/stream.h" +#include "libp2p/yamux/frame.h" +#include "libp2p/yamux/stream.h" + +#define MIN(x,y) (y^((x^y)&-(xnextid; + session->nextid += 2; + } + + struct yamux_stream* st = NULL; + struct yamux_session_stream* ss; + + if (session->num_streams != session->cap_streams) + for (size_t i = 0; i < session->cap_streams; ++i) + { + ss = &session->streams[i]; + + if (!ss->alive) + { + st = ss->stream; + ss->alive = 1; + goto FOUND; + } + } + + if (session->cap_streams == session->config->accept_backlog) + return NULL; + + ss = &session->streams[session->cap_streams]; + + if (ss->alive) + return NULL; + + session->cap_streams++; + + ss->alive = 1; + st = ss->stream = malloc(sizeof(struct yamux_stream)); + +FOUND:; + + struct yamux_stream nst = (struct yamux_stream){ + .id = id, + .session = session, + .state = yamux_stream_inited, + .window_size = YAMUX_DEFAULT_WINDOW, + + .read_fn = NULL, + .fin_fn = NULL, + .rst_fn = NULL, + + .userdata = userdata + }; + *st = nst; + + return st; +} + +/*** + * Initialize a stream between 2 peers + * @param stream the stream to initialize + * @returns the number of bytes sent + */ +ssize_t yamux_stream_init(struct yamux_stream* stream) +{ + if (!stream || stream->state != yamux_stream_inited || stream->session->closed) { + return -EINVAL; + } + + struct yamux_frame f = (struct yamux_frame){ + .version = YAMUX_VERSION, + .type = yamux_frame_window_update, + .flags = yamux_frame_syn, + .streamid = stream->id, + .length = 0 + }; + + stream->state = yamux_stream_syn_sent; + + encode_frame(&f); + int sz = sizeof(struct yamux_frame); + if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz)) + return 0; + return sz; +} + +/*** + * Close a stream + * @param stream the stream + * @returns the number of bytes sent + */ +ssize_t yamux_stream_close(struct yamux_stream* stream) +{ + if (!stream || stream->state != yamux_stream_est || stream->session->closed) + return -EINVAL; + + struct yamux_frame f = (struct yamux_frame){ + .version = YAMUX_VERSION, + .type = yamux_frame_window_update, + .flags = yamux_frame_fin, + .streamid = stream->id, + .length = 0 + }; + + stream->state = yamux_stream_closing; + + encode_frame(&f); + int sz = sizeof(struct yamux_frame); + if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz)) + return 0; + return sz; +} + +/** + * Reset the stream + * @param stream the stream + * @returns the number of bytes sent + */ +ssize_t yamux_stream_reset(struct yamux_stream* stream) +{ + if (!stream || stream->session->closed) + return -EINVAL; + + struct yamux_frame f = (struct yamux_frame){ + .version = YAMUX_VERSION, + .type = yamux_frame_window_update, + .flags = yamux_frame_rst, + .streamid = stream->id, + .length = 0 + }; + + stream->state = yamux_stream_closed; + + encode_frame(&f); + int sz = sizeof(struct yamux_frame); + if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz)) + return 0; + return sz; +} + +static enum yamux_frame_flags get_flags(struct yamux_stream* stream) +{ + switch (stream->state) + { + case yamux_stream_inited: + stream->state = yamux_stream_syn_sent; + return yamux_frame_syn; + case yamux_stream_syn_recv: + stream->state = yamux_stream_est; + return yamux_frame_ack; + default: + return 0; + } +} + +/** + * update the window size + * @param stream the stream + * @param delta the new window size + * @returns number of bytes sent + */ +ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta) +{ + if (!stream || stream->state == yamux_stream_closed + || stream->state == yamux_stream_closing || stream->session->closed) + return -EINVAL; + + struct yamux_frame f = (struct yamux_frame){ + .version = YAMUX_VERSION, + .type = yamux_frame_window_update, + .flags = get_flags(stream), + .streamid = stream->id, + .length = (uint32_t)delta + }; + encode_frame(&f); + + int sz = sizeof(struct yamux_frame); + if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz)) + return 0; + return sz; +} + +/*** + * Write data to the stream + * @param stream the stream + * @param data_length the length of the data to be sent + * @param data_ the data to be sent + * @return the number of bytes sent + */ +ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data_) +{ + if (!((size_t)stream | (size_t)data_) || stream->state == yamux_stream_closed + || stream->state == yamux_stream_closing || stream->session->closed) + return -EINVAL; + + char* data = (char*)data_; + + struct yamux_session* s = stream->session; + + char* data_end = data + data_length; + + uint32_t ws = stream->window_size; + yamux_streamid id = stream->id; + char sendd[ws + sizeof(struct yamux_frame)]; + + while (data < data_end) { + uint32_t + dr = (uint32_t)(data_end - data), + adv = MIN(dr, ws); + + struct yamux_frame f = (struct yamux_frame){ + .version = YAMUX_VERSION , + .type = yamux_frame_data, + .flags = get_flags(stream), + .streamid = id, + .length = adv + }; + + encode_frame(&f); + memcpy(sendd, &f, sizeof(struct yamux_frame)); + memcpy(sendd + sizeof(struct yamux_frame), data, (size_t)adv); + + int sz = adv + sizeof(struct yamux_frame); + if (!s->session_context->default_stream->write(s->session_context, (uint8_t*)sendd, sz)) + return adv; + + data += adv; + } + + return data_end - (char*)data_; +} + +/*** + * Release resources of stream + * @param stream the stream + */ +void yamux_stream_free(struct yamux_stream* stream) +{ + if (!stream) + return; + + if (stream->free_fn) + stream->free_fn(stream); + + struct yamux_stream s = *stream; + + for (size_t i = 0; i < s.session->cap_streams; ++i) + { + struct yamux_session_stream* ss = &s.session->streams[i]; + if (ss->alive && ss->stream->id == s.id) + { + ss->alive = 0; + + s.session->num_streams--; + if (i == s.session->cap_streams - 1) + s.session->cap_streams--; + + break; + } + } + + free(stream); +} + +/*** + * process stream + * @param stream the stream + * @param frame the frame + * @param incoming the stream bytes (after the frame) + * @param incoming_size the size of incoming + * @param session_context the SessionContext + * @returns the number of bytes processed (can be zero) or negative number on error + */ +ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context) +{ + struct yamux_frame f = *frame; + + switch (f.type) + { + case yamux_frame_data: + { + if (incoming_size != (ssize_t)f.length) + return -1; + + if (stream->read_fn) + stream->read_fn(stream, f.length, (void*)incoming); + + return incoming_size; + } + case yamux_frame_window_update: + { + uint64_t nws = (uint64_t)((int64_t)stream->window_size + (int64_t)(int32_t)f.length); + nws &= 0xFFFFFFFFLL; + stream->window_size = (uint32_t)nws; + break; + } + default: + return -EPROTO; + } + + return 0; +} + diff --git a/yamux/test.c b/yamux/test.c new file mode 100644 index 0000000..7d6a82f --- /dev/null +++ b/yamux/test.c @@ -0,0 +1,235 @@ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "libp2p/yamux/session.h" +#include "libp2p/yamux/frame.h" +#include "libp2p/yamux/stream.h" + +static void on_read(struct yamux_stream* stream, uint32_t data_len, void* data) +{ + char d[data_len + 1]; + d[data_len] = 0; + memcpy(d, data, data_len); + + printf("%s", d); +} +static void on_new(struct yamux_session* session, struct yamux_stream* stream) +{ + stream->read_fn = on_read; +} + +static struct sockaddr_in addr; + +int init_server(int sock) { + int err; + //printf("bind\n"); + if ((err = bind(sock, (struct sockaddr*)&addr, sizeof(struct sockaddr_in))) < 0) + return err; + + //printf("listen\n"); + if ((err = listen(sock, 0x80)) < 0) + return err; + + //printf("accept\n"); + return accept(sock, NULL, NULL); +} + +int init_client(int sock) { + int err; + //printf("connect\n"); + if ((err = connect(sock, (struct sockaddr*)&addr, sizeof(struct sockaddr_in))) < 0) + return err; + + return sock; +} + +int do_server() { + int sock; + int e = 0; + ssize_t ee; + + // init sock + if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) + { + e = errno; + printf("socket() failed with %i\n", e); + + goto END; + } + + memset(&addr, 0, sizeof(struct sockaddr_in)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + addr.sin_port = htons(1337); + + int s2 = -1; + ssize_t initr = init_server(sock); + if (initr < 0) + { + e = errno; + printf("init failed with %i, errno=%i\n", (int)-initr, e); + + goto FREE_SOCK; + } + s2 = (int)initr; + + // init yamux + struct yamux_session* sess = yamux_session_new(NULL, s2, + yamux_session_server + , NULL); + if (!sess) + { + printf("yamux_session_new() failed\n"); + + goto FREE_SOCK; + } + sess->new_stream_fn = on_new; + + for (;;) { + if ((ee = yamux_session_read(sess)) < 0) + { + e = errno; + printf("yamux_session_read() failed with %i, errno=%i\n", (int)-ee, e); + + goto KILL_STRM; + } + + } + +KILL_STRM: + yamux_session_free(sess); +FREE_SOCK: + close(sock); +END: + return 0; +} + +int do_client() { + int sock; + int e = 0; + ssize_t ee; + + // init sock + if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) + { + e = errno; + printf("socket() failed with %i\n", e); + + goto END; + } + + memset(&addr, 0, sizeof(struct sockaddr_in)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + addr.sin_port = htons(1337); + + int s2 = -1; + ssize_t initr = init_client(sock); + if (initr < 0) + { + e = errno; + printf("init failed with %i, errno=%i\n", (int)-initr, e); + + goto FREE_SOCK; + } + s2 = (int)initr; + + // init yamux + struct yamux_session* sess = yamux_session_new(NULL, s2, + yamux_session_client + , NULL); + if (!sess) + { + printf("yamux_session_new() failed\n"); + + goto FREE_SOCK; + } + sess->new_stream_fn = on_new; + + struct yamux_stream* strm = yamux_stream_new(sess, 0, NULL); + if (!strm) + { + printf("yamux_new_stream() failed\n"); + + goto FREE_YAMUX; + } + + strm->read_fn = on_read; + + if ((ee = yamux_stream_init(strm)) < 0) + { + e = errno; + printf("yamux_stream_init() failed with %i, errno=%i\n", (int)-ee, e); + + goto KILL_STRM; + } + + char str[] = "hello\n"; + if ((ee = yamux_stream_write(strm, 6, str)) < 0) + { + e = errno; + printf("yamux_stream_write() failed with %i, errno=%i\n", (int)-ee, e); + + goto KILL_STRM; + } + + for (;;) { + if ((ee = yamux_session_read(sess)) < 0) + { + e = errno; + printf("yamux_session_read() failed with %i, errno=%i\n", (int)-ee, e); + + goto KILL_STRM; + } + + break; + } + +KILL_STRM: + if (yamux_stream_reset(strm)) + goto FREE_STRM; +FREE_STRM: + yamux_stream_free(strm); + +FREE_YAMUX: + yamux_session_free(sess); +FREE_SOCK: + close(sock); +END: + return 0; + +} + +int main(int argc, char* argv[]) +{ + int e = 0; + int client = -1; + + if (argc < 2) { + e = 1; + } else { + if (strcmp(argv[1], "client") == 0) + client = 1; + if (strcmp(argv[1], "server") == 0) + client = 0; + } + + if (e || client == -1) { + fprintf(stderr, "Syntax: %s server or %s client\n", argv[0], argv[0]); + exit(e); + } + + if (client) + return do_client(); + else + return do_server(); + +} + diff --git a/yamux/yamux.c b/yamux/yamux.c new file mode 100644 index 0000000..0c50508 --- /dev/null +++ b/yamux/yamux.c @@ -0,0 +1,129 @@ +#include +#include "varint.h" +#include "libp2p/yamux/session.h" +#include "libp2p/net/protocol.h" +#include "libp2p/net/stream.h" +#include "libp2p/conn/session.h" +#include "libp2p/utils/logger.h" + +/** + * Determines if this protocol can handle the incoming message + * @param incoming the incoming data + * @param incoming_size the size of the incoming data buffer + * @returns true(1) if it can handle this message, false(0) if not + */ +int yamux_can_handle(const uint8_t* incoming, size_t incoming_size) { + char *protocol = "/yamux/1.0.0\n"; + int protocol_size = strlen(protocol); + // is there a varint in front? + size_t num_bytes = 0; + if (incoming[0] != protocol[0] && incoming[1] != protocol[1]) { + varint_decode(incoming, incoming_size, &num_bytes); + } + if (incoming_size >= protocol_size - num_bytes) { + if (strncmp(protocol, (char*) &incoming[num_bytes], protocol_size) == 0) + return 1; + } + return 0; +} + +/** + * the yamux stream received some bytes. Process them + * @param stream the stream that the data came in on + * @param incoming_size the size of the stream buffer + * @param incoming the stream buffer + */ +void yamux_read_stream(struct yamux_stream* stream, ssize_t incoming_size, uint8_t* incoming) { + struct Libp2pVector* handlers = stream->userdata; + int retVal = libp2p_protocol_marshal(incoming, incoming_size, stream->session->session_context, handlers); + if (retVal == -1) { + // TODO handle error condition + libp2p_logger_error("yamux", "Marshalling returned error.\n"); + } else if (retVal > 0) { + // TODO handle everything went okay + libp2p_logger_debug("yamux", "Marshalling was successful. We should continue processing.\n"); + } else { + // TODO we've been told we shouldn't do anything anymore + libp2p_logger_debug("yamux", "Marshalling was successful. We should stop processing.\n"); + } + return; +} + +/*** + * Handles the message + * @param incoming the incoming data buffer + * @param incoming_size the size of the incoming data buffer + * @param session_context the information about the incoming connection + * @param protocol_context the protocol-dependent context + * @returns 0 if the caller should not continue looping, <0 on error, >0 on success + */ +int yamux_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) { + // they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream + struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context); + uint8_t* buf = (uint8_t*) malloc(incoming_size); + if (buf == NULL) + return -1; + memcpy(buf, incoming, incoming_size); + for(;;) { + int retVal = yamux_decode(yamux, incoming, incoming_size); + free(buf); + buf = NULL; + if (!retVal) + break; + else { // try to read more from this stream + // TODO need more information as to what this loop should do + } + } + + /* + struct Libp2pVector* handlers = (struct Libp2pVector*)protocol_context; + uint8_t* results = NULL; + size_t bytes_read = 0; + int numRetries = 0; + int retVal = 0; + int max_retries = 100; // try for 5 minutes + for(;;) { + // try to read for 5 seconds + if (session_context->default_stream->read(session_context, &results, &bytes_read, 5)) { + // we read something from the network. Process it. + // NOTE: If it is a multistream protocol that we are receiving, ignore it. + if (yamux_can_handle(results, bytes_read)) + continue; + numRetries = 0; + retVal = libp2p_protocol_marshal(results, bytes_read, session_context, handlers); + if (results != NULL) + free(results); + // exit the loop on error (or if they ask us to no longer loop by returning 0) + if (retVal <= 0) + break; + } else { + // we were unable to read from the network. + // if it timed out, we should try again (if we're not out of retries) + if (numRetries >= max_retries) + break; + numRetries++; + } + } + */ + return 0; +} + +/** + * Shutting down. Clean up any memory allocations + * @param protocol_context the context + * @returns true(1) + */ +int yamux_shutdown(void* protocol_context) { + return 0; +} + +struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* handlers) { + struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new(); + if (handler != NULL) { + handler->context = handlers; + handler->CanHandle = yamux_can_handle; + handler->HandleMessage = yamux_handle_message; + handler->Shutdown = yamux_shutdown; + } + return handler; +}