diff --git a/.cproject b/.cproject
index 542ec82..e3a1378 100644
--- a/.cproject
+++ b/.cproject
@@ -65,7 +65,6 @@
make
-
all
true
false
@@ -73,7 +72,6 @@
make
-
clean
true
false
@@ -81,6 +79,7 @@
make
+
test
true
false
@@ -88,12 +87,35 @@
make
-
rebuild
true
true
true
+
+ make
+
+ test
+ true
+ true
+ true
+
+
+ make
+
+ clean
+ true
+ true
+ true
+
+
+ make
+
+ all
+ true
+ true
+ true
+
diff --git a/Makefile b/Makefile
index c9615ec..55105bc 100644
--- a/Makefile
+++ b/Makefile
@@ -15,7 +15,8 @@ OBJS = \
record/*.o \
routing/*.o \
secio/*.o \
- utils/*.o
+ utils/*.o \
+ yamux/*.o
link:
ar rcs libp2p.a $(OBJS)
@@ -33,6 +34,7 @@ compile:
cd routing; make all;
cd secio; make all;
cd utils; make all;
+ cd yamux; make all;
test: compile link
cd test; make all;
@@ -55,5 +57,6 @@ clean:
cd secio; make clean;
cd utils; make clean;
cd test; make clean;
+ cd yamux; make clean;
rm -rf libp2p.a
diff --git a/include/libp2p/os/timespec.h b/include/libp2p/os/timespec.h
new file mode 100644
index 0000000..735b715
--- /dev/null
+++ b/include/libp2p/os/timespec.h
@@ -0,0 +1,11 @@
+#pragma once
+
+/**
+ * mac doesn't have timespec_get
+ */
+
+#ifdef __MACH__
+#include
+#define TIME_UTC 1
+int timespec_get(struct timespec *ts, int base);
+#endif
diff --git a/include/libp2p/yamux/config.h b/include/libp2p/yamux/config.h
new file mode 100644
index 0000000..bb6eb80
--- /dev/null
+++ b/include/libp2p/yamux/config.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include
+#include
+#include
+
+struct yamux_config
+{
+ size_t accept_backlog ;
+ uint32_t max_stream_window_size;
+};
+
+#define YAMUX_DEFAULT_WINDOW (0x100*0x400)
+
+#define YAMUX_DEFAULT_CONFIG ((struct yamux_config)\
+{\
+ .accept_backlog=0x100,\
+ .max_stream_window_size=YAMUX_DEFAULT_WINDOW\
+})\
diff --git a/include/libp2p/yamux/frame.h b/include/libp2p/yamux/frame.h
new file mode 100644
index 0000000..6990c0b
--- /dev/null
+++ b/include/libp2p/yamux/frame.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#include
+#include
+
+typedef uint8_t yamux_version ;
+typedef uint32_t yamux_streamid;
+
+#define YAMUX_VERSION (0x00)
+#define YAMUX_STREAMID_SESSION (0)
+
+enum yamux_frame_type
+{
+ yamux_frame_data = 0x00,
+ yamux_frame_window_update = 0x01,
+ yamux_frame_ping = 0x02,
+ yamux_frame_go_away = 0x03
+};
+enum yamux_frame_flags
+{
+ yamux_frame_nil = 0x0000,
+
+ yamux_frame_syn = 0x0001,
+ yamux_frame_ack = 0x0002,
+ yamux_frame_fin = 0x0004,
+ yamux_frame_rst = 0x0008
+};
+
+#pragma pack(push,1)
+struct yamux_frame
+{
+ yamux_version version ;
+ uint8_t type ;
+ uint16_t flags ;
+ yamux_streamid streamid;
+ uint32_t length ;
+};
+#pragma pack(pop)
+
+void encode_frame(struct yamux_frame* frame);
+void decode_frame(struct yamux_frame* frame);
+
+
diff --git a/include/libp2p/yamux/session.h b/include/libp2p/yamux/session.h
new file mode 100644
index 0000000..d81b6e8
--- /dev/null
+++ b/include/libp2p/yamux/session.h
@@ -0,0 +1,100 @@
+#pragma once
+
+#include
+#include
+#include
+
+#include "config.h"
+#include "frame.h"
+#include "stream.h"
+#include "libp2p/conn/session.h"
+
+enum yamux_session_type
+{
+ yamux_session_client,
+ yamux_session_server
+};
+enum yamux_error
+{
+ yamux_error_normal = 0x00,
+ yamux_error_protoc = 0x01,
+ yamux_error_intern = 0x02
+};
+
+struct yamux_session;
+struct yamux_stream;
+
+typedef void* (*yamux_session_get_str_ud_fn)(struct yamux_session* session, yamux_streamid newid );
+typedef void (*yamux_session_ping_fn )(struct yamux_session* session, uint32_t val );
+typedef void (*yamux_session_pong_fn )(struct yamux_session* session, uint32_t val, struct timespec dt);
+typedef void (*yamux_session_go_away_fn )(struct yamux_session* session, enum yamux_error err );
+typedef void (*yamux_session_new_stream_fn)(struct yamux_session* session, struct yamux_stream* stream);
+typedef void (*yamux_session_free_fn )(struct yamux_session* sesssion );
+
+struct yamux_session_stream
+{
+ struct yamux_stream* stream;
+ int alive;
+};
+struct yamux_session
+{
+ struct yamux_config* config;
+
+ size_t num_streams;
+ size_t cap_streams;
+ struct yamux_session_stream* streams;
+
+ yamux_session_get_str_ud_fn get_str_ud_fn;
+ yamux_session_ping_fn ping_fn ;
+ yamux_session_pong_fn pong_fn ;
+ yamux_session_go_away_fn go_away_fn ;
+ yamux_session_new_stream_fn new_stream_fn;
+ yamux_session_free_fn free_fn ;
+
+ void* userdata;
+
+ struct timespec since_ping;
+
+ enum yamux_session_type type;
+
+ struct SessionContext* session_context;
+
+ yamux_streamid nextid;
+
+ int closed;
+};
+
+/***
+ * Create a new yamux session
+ * @param config the configuration
+ * @param sock the socket
+ * @param type session type (yamux_session_server or yamux_session_client)
+ * @param userdata user data
+ * @returns the yamux_session struct
+ */
+struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata);
+
+// does not close the socket, but does close the session
+void yamux_session_free(struct yamux_session* session);
+
+// does not free used memory
+ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err);
+inline ssize_t yamux_session_go_away(struct yamux_session* session, enum yamux_error err)
+{
+ return yamux_session_close(session, err);
+}
+
+ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int pong);
+
+// defers to stream read handlers
+ssize_t yamux_session_read(struct yamux_session* session);
+
+/**
+ * Decode an incoming message
+ * @param incoming the incoming bytes
+ * @param incoming_size the size of the incoming bytes
+ * @returns true(1) on success, false(0) otherwise
+ */
+int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size);
+
+
diff --git a/include/libp2p/yamux/stream.h b/include/libp2p/yamux/stream.h
new file mode 100644
index 0000000..d7cbe3d
--- /dev/null
+++ b/include/libp2p/yamux/stream.h
@@ -0,0 +1,75 @@
+#pragma once
+
+#include
+#include
+
+#include "session.h"
+#include "libp2p/conn/session.h"
+
+// NOTE: 'data' is not guaranteed to be preserved when the read_fn
+// handler exists (read: it will be freed).
+struct yamux_stream;
+
+typedef void (*yamux_stream_read_fn)(struct yamux_stream* stream, uint32_t data_length, void* data);
+typedef void (*yamux_stream_fin_fn )(struct yamux_stream* stream);
+typedef void (*yamux_stream_rst_fn )(struct yamux_stream* stream);
+typedef void (*yamux_stream_free_fn)(struct yamux_stream* stream);
+
+enum yamux_stream_state
+{
+ yamux_stream_inited,
+ yamux_stream_syn_sent,
+ yamux_stream_syn_recv,
+ yamux_stream_est,
+ yamux_stream_closing,
+ yamux_stream_closed
+};
+
+struct yamux_stream
+{
+ struct yamux_session* session;
+
+ yamux_stream_read_fn read_fn;
+ yamux_stream_fin_fn fin_fn ;
+ yamux_stream_rst_fn rst_fn ;
+ yamux_stream_free_fn free_fn;
+
+ void* userdata;
+
+ enum yamux_stream_state state;
+
+ yamux_streamid id;
+
+ uint32_t window_size;
+};
+
+// does not init the stream
+struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_streamid id, void* userdata);
+
+// not obligatory, SYN is sent by yamux_stream_write when the stream
+// isn't initialised anyway
+ssize_t yamux_stream_init (struct yamux_stream* stream);
+
+// doesn't free the stream
+// uses FIN
+ssize_t yamux_stream_close(struct yamux_stream* stream);
+// uses RST
+ssize_t yamux_stream_reset(struct yamux_stream* stream);
+
+void yamux_stream_free(struct yamux_stream* stream);
+
+ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta);
+ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data);
+
+/***
+ * process stream
+ * @param stream the stream
+ * @param frame the frame
+ * @param incoming the stream bytes (after the frame)
+ * @param incoming_size the size of incoming
+ * @param session_context the SessionContext
+ * @returns the number of bytes processed (can be zero) or negative number on error
+ */
+ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context);
+
+
diff --git a/include/libp2p/yamux/yamux.h b/include/libp2p/yamux/yamux.h
new file mode 100644
index 0000000..0001a36
--- /dev/null
+++ b/include/libp2p/yamux/yamux.h
@@ -0,0 +1,8 @@
+#pragma once
+
+#include "libp2p/net/protocol.h"
+
+/**
+ * Build a handler that can handle the yamux protocol
+ */
+struct Libp2pProtocolHandler* yamux_build_protocol_handler();
diff --git a/net/multistream.c b/net/multistream.c
index a48b4e9..55b9204 100644
--- a/net/multistream.c
+++ b/net/multistream.c
@@ -65,18 +65,18 @@ int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incomi
if (libp2p_net_multistream_can_handle(results, bytes_read))
continue;
numRetries = 0;
- retVal = libp2p_protocol_marshal(results, bytes_read, context, multistream_context->handlers);
- if (results != NULL)
- free(results);
- // exit the loop on error (or if they ask us to no longer loop by returning 0)
- if (retVal <= 0)
- break;
+ retVal = libp2p_protocol_marshal(results, bytes_read, context, multistream_context->handlers);
+ if (results != NULL)
+ free(results);
+ // exit the loop on error (or if they ask us to no longer loop by returning 0)
+ if (retVal <= 0)
+ break;
} else {
- // we were unable to read from the network.
+ // we were unable to read from the network.
// if it timed out, we should try again (if we're not out of retries)
- if (numRetries >= max_retries)
- break;
- numRetries++;
+ if (numRetries >= max_retries)
+ break;
+ numRetries++;
}
}
diff --git a/os/timespec.c b/os/timespec.c
new file mode 100644
index 0000000..6782504
--- /dev/null
+++ b/os/timespec.c
@@ -0,0 +1,25 @@
+#ifdef __MACH__
+
+#include
+#include
+#include
+#include
+
+#include "libp2p/os/timespec.h"
+
+int timespec_get(struct timespec *ts, int base) {
+ switch (base) {
+ case TIME_UTC: {
+ clock_serv_t cclock;
+ mach_timespec_t mts;
+ host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
+ clock_get_time(cclock, &mts);
+ mach_port_deallocate(mach_task_self(), cclock);
+ ts->tv_sec = mts.tv_sec;
+ ts->tv_nsec = mts.tv_nsec;
+ return base;
+ }
+ }
+ return 0;
+}
+#endif
diff --git a/yamux/Makefile b/yamux/Makefile
new file mode 100644
index 0000000..ff0165b
--- /dev/null
+++ b/yamux/Makefile
@@ -0,0 +1,22 @@
+CC = gcc
+CFLAGS = -O0 -Wall -Werror -I../include -I../../c-protobuf -std=c11
+
+ifdef DEBUG
+CFLAGS += -g3
+endif
+
+LFLAGS =
+DEPS =
+OBJS = frame.o session.o stream.o yamux.o ../os/timespec.o
+
+%.o: %.c
+ $(CC) -c -o $@ $< $(CFLAGS)
+
+all: $(OBJS)
+
+clean:
+ rm -f *.o
+ rm -f test
+
+test: all test.o
+ $(CC) -o test test.o $(OBJS) $(CFLAGS)
\ No newline at end of file
diff --git a/yamux/frame.c b/yamux/frame.c
new file mode 100644
index 0000000..7c3a05f
--- /dev/null
+++ b/yamux/frame.c
@@ -0,0 +1,42 @@
+#include
+#include
+
+#include "libp2p/yamux/frame.h"
+
+enum eness
+{
+ unk,
+ little,
+ big
+};
+
+static enum eness eness = unk;
+
+static void set_eness()
+{
+ uint16_t x = 1;
+
+ if (*(char*)&x == 1)
+ eness = little;
+ else
+ eness = big;
+}
+
+void encode_frame(struct yamux_frame* frame)
+{
+ if (eness == unk)
+ set_eness();
+
+ frame->flags = htons(frame->flags );
+ frame->streamid = htonl(frame->streamid);
+ frame->length = htonl(frame->length );
+}
+void decode_frame(struct yamux_frame* frame)
+{
+ if (eness == unk)
+ set_eness();
+
+ frame->flags = ntohs(frame->flags );
+ frame->streamid = ntohl(frame->streamid);
+ frame->length = ntohl(frame->length );
+}
diff --git a/yamux/session.c b/yamux/session.c
new file mode 100644
index 0000000..a6e2c81
--- /dev/null
+++ b/yamux/session.c
@@ -0,0 +1,265 @@
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "libp2p/net/stream.h"
+#include "libp2p/os/timespec.h"
+#include "libp2p/yamux/session.h"
+#include "libp2p/yamux/stream.h"
+
+static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG;
+
+/***
+ * Create a new yamux session
+ * @param config the configuration
+ * @param sock the socket
+ * @param type session type (yamux_session_server or yamux_session_client)
+ * @param userdata user data
+ * @returns the yamux_session struct
+ */
+struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata)
+{
+ if (!session_context)
+ return NULL;
+
+ if (!config)
+ config = &dcfg;
+
+ size_t ab = config->accept_backlog;
+
+ struct yamux_session_stream* streams =
+ (struct yamux_session_stream*)malloc(sizeof(struct yamux_session_stream) * ab);
+
+ for (size_t i = 0; i < ab; ++i)
+ streams[i].alive = 0;
+
+ struct yamux_session* sess = (struct yamux_session*)malloc(sizeof(struct yamux_session));
+ if (sess != NULL) {
+ sess->config = config;
+ sess->type = type;
+ sess->session_context = session_context;
+ sess->closed = 0;
+ sess->nextid = 1 + (type == yamux_session_server);
+ sess->num_streams = 0;
+ sess->cap_streams = 0;
+ sess->streams = streams;
+ struct timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
+ sess->since_ping = ts;
+ sess->get_str_ud_fn = NULL;
+ sess->ping_fn = NULL;
+ sess->pong_fn = NULL;
+ sess->go_away_fn = NULL;
+ sess->free_fn = NULL;
+ sess->userdata = userdata;
+ }
+
+ return sess;
+}
+
+void yamux_session_free(struct yamux_session* session)
+{
+ if (!session)
+ return;
+
+ if (!session->closed)
+ yamux_session_close(session, yamux_error_normal);
+
+ if (session->free_fn)
+ session->free_fn(session);
+
+ for (size_t i = 0; i < session->cap_streams; ++i)
+ if (session->streams[i].alive)
+ yamux_stream_free(session->streams[i].stream);
+
+ free(session->streams);
+ free(session);
+}
+
+/***
+ * Close a yamux session
+ * @param session the yamux_session to close
+ * @param err why we're closing
+ */
+ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err)
+{
+ if (!session)
+ return -EINVAL;
+ if (session->closed)
+ return 0;
+
+ struct yamux_frame f = (struct yamux_frame){
+ .version = YAMUX_VERSION,
+ .type = yamux_frame_go_away,
+ .flags = 0,
+ .streamid = YAMUX_STREAMID_SESSION,
+ .length = (uint32_t)err
+ };
+
+ session->closed = 1;
+
+ int sz = sizeof(struct yamux_frame);
+ if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz))
+ return 0;
+ return sz;
+}
+
+/***
+ * Ping
+ * @param session the session to ping
+ * @param value the value to send
+ * @param pong true(1) if we should send the ack, false(0) if we should send the syn (who's side are we on?)
+ * @returns number of bytes sent
+ */
+ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int pong)
+{
+ if (!session || session->closed)
+ return -EINVAL;
+
+ struct yamux_frame f = (struct yamux_frame){
+ .version = YAMUX_VERSION,
+ .type = yamux_frame_ping,
+ .flags = pong ? yamux_frame_ack : yamux_frame_syn,
+ .streamid = YAMUX_STREAMID_SESSION,
+ .length = value
+ };
+
+ if (!timespec_get(&session->since_ping, TIME_UTC))
+ return -EACCES;
+
+ int sz = sizeof(struct yamux_frame);
+ if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz))
+ return 0;
+ return sz;
+}
+
+/**
+ * Decode an incoming message
+ * @param incoming the incoming bytes
+ * @param incoming_size the size of the incoming bytes
+ * @returns true(1) on success, false(0) otherwise
+ */
+int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size) {
+ // decode frame
+ struct yamux_frame f;
+
+ if (incoming_size < sizeof(struct yamux_frame)) {
+ return 0;
+ }
+ decode_frame(&f);
+
+ // check yamux version
+ if (f.version != YAMUX_VERSION)
+ return 0;
+
+ if (!f.streamid) // we're not dealing with a stream
+ switch (f.type)
+ {
+ case yamux_frame_ping: // ping
+ if (f.flags & yamux_frame_syn)
+ {
+ yamux_session_ping(session, f.length, 1);
+
+ if (session->ping_fn)
+ session->ping_fn(session, f.length);
+ }
+ else if ((f.flags & yamux_frame_ack) && session->pong_fn)
+ {
+ struct timespec now, dt, last = session->since_ping;
+ if (!timespec_get(&now, TIME_UTC))
+ return -EACCES;
+
+ dt.tv_sec = now.tv_sec - last.tv_sec;
+ if (now.tv_nsec < last.tv_nsec)
+ {
+ dt.tv_sec--;
+ dt.tv_nsec = last.tv_nsec - now.tv_nsec;
+ }
+ else
+ dt.tv_nsec = now.tv_nsec - last.tv_nsec;
+
+ session->pong_fn(session, f.length, dt);
+ }
+ else
+ return -EPROTO;
+ break;
+ case yamux_frame_go_away: // go away (hanging up)
+ session->closed = 1;
+ if (session->go_away_fn)
+ session->go_away_fn(session, (enum yamux_error)f.length);
+ break;
+ default:
+ return -EPROTO;
+ }
+ else { // we're handling a stream
+ for (size_t i = 0; i < session->cap_streams; ++i)
+ {
+ struct yamux_session_stream* ss = &session->streams[i];
+ struct yamux_stream* s = ss->stream;
+
+ if (!ss->alive || s->state == yamux_stream_closed)
+ continue;
+
+ if (s->id == f.streamid)
+ {
+ if (f.flags & yamux_frame_rst)
+ {
+ s->state = yamux_stream_closed;
+
+ if (s->rst_fn)
+ s->rst_fn(s);
+ }
+ else if (f.flags & yamux_frame_fin)
+ {
+ // local stream didn't initiate FIN
+ if (s->state != yamux_stream_closing)
+ yamux_stream_close(s);
+
+ s->state = yamux_stream_closed;
+
+ if (s->fin_fn)
+ s->fin_fn(s);
+ }
+ else if (f.flags & yamux_frame_ack)
+ {
+ if (s->state != yamux_stream_syn_sent)
+ return -EPROTO;
+
+ s->state = yamux_stream_est;
+ }
+ else if (f.flags)
+ return -EPROTO;
+
+ int sz = sizeof(struct yamux_frame);
+ ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, session->session_context);
+ return (re < 0) ? re : (re + incoming_size);
+ }
+ }
+
+ // stream doesn't exist yet
+ if (f.flags & yamux_frame_syn)
+ {
+ void* ud = NULL;
+
+ if (session->get_str_ud_fn)
+ ud = session->get_str_ud_fn(session, f.streamid);
+
+ struct yamux_stream* st = yamux_stream_new(session, f.streamid, ud);
+
+ if (session->new_stream_fn)
+ session->new_stream_fn(session, st);
+
+ st->state = yamux_stream_syn_recv;
+ }
+ else
+ return -EPROTO;
+ }
+ return 0;
+}
+
diff --git a/yamux/stream.c b/yamux/stream.c
new file mode 100644
index 0000000..3b62cec
--- /dev/null
+++ b/yamux/stream.c
@@ -0,0 +1,319 @@
+
+#include
+#include
+#include
+#include
+#include
+
+#include "libp2p/conn/session.h"
+#include "libp2p/net/stream.h"
+#include "libp2p/yamux/frame.h"
+#include "libp2p/yamux/stream.h"
+
+#define MIN(x,y) (y^((x^y)&-(xnextid;
+ session->nextid += 2;
+ }
+
+ struct yamux_stream* st = NULL;
+ struct yamux_session_stream* ss;
+
+ if (session->num_streams != session->cap_streams)
+ for (size_t i = 0; i < session->cap_streams; ++i)
+ {
+ ss = &session->streams[i];
+
+ if (!ss->alive)
+ {
+ st = ss->stream;
+ ss->alive = 1;
+ goto FOUND;
+ }
+ }
+
+ if (session->cap_streams == session->config->accept_backlog)
+ return NULL;
+
+ ss = &session->streams[session->cap_streams];
+
+ if (ss->alive)
+ return NULL;
+
+ session->cap_streams++;
+
+ ss->alive = 1;
+ st = ss->stream = malloc(sizeof(struct yamux_stream));
+
+FOUND:;
+
+ struct yamux_stream nst = (struct yamux_stream){
+ .id = id,
+ .session = session,
+ .state = yamux_stream_inited,
+ .window_size = YAMUX_DEFAULT_WINDOW,
+
+ .read_fn = NULL,
+ .fin_fn = NULL,
+ .rst_fn = NULL,
+
+ .userdata = userdata
+ };
+ *st = nst;
+
+ return st;
+}
+
+/***
+ * Initialize a stream between 2 peers
+ * @param stream the stream to initialize
+ * @returns the number of bytes sent
+ */
+ssize_t yamux_stream_init(struct yamux_stream* stream)
+{
+ if (!stream || stream->state != yamux_stream_inited || stream->session->closed) {
+ return -EINVAL;
+ }
+
+ struct yamux_frame f = (struct yamux_frame){
+ .version = YAMUX_VERSION,
+ .type = yamux_frame_window_update,
+ .flags = yamux_frame_syn,
+ .streamid = stream->id,
+ .length = 0
+ };
+
+ stream->state = yamux_stream_syn_sent;
+
+ encode_frame(&f);
+ int sz = sizeof(struct yamux_frame);
+ if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
+ return 0;
+ return sz;
+}
+
+/***
+ * Close a stream
+ * @param stream the stream
+ * @returns the number of bytes sent
+ */
+ssize_t yamux_stream_close(struct yamux_stream* stream)
+{
+ if (!stream || stream->state != yamux_stream_est || stream->session->closed)
+ return -EINVAL;
+
+ struct yamux_frame f = (struct yamux_frame){
+ .version = YAMUX_VERSION,
+ .type = yamux_frame_window_update,
+ .flags = yamux_frame_fin,
+ .streamid = stream->id,
+ .length = 0
+ };
+
+ stream->state = yamux_stream_closing;
+
+ encode_frame(&f);
+ int sz = sizeof(struct yamux_frame);
+ if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
+ return 0;
+ return sz;
+}
+
+/**
+ * Reset the stream
+ * @param stream the stream
+ * @returns the number of bytes sent
+ */
+ssize_t yamux_stream_reset(struct yamux_stream* stream)
+{
+ if (!stream || stream->session->closed)
+ return -EINVAL;
+
+ struct yamux_frame f = (struct yamux_frame){
+ .version = YAMUX_VERSION,
+ .type = yamux_frame_window_update,
+ .flags = yamux_frame_rst,
+ .streamid = stream->id,
+ .length = 0
+ };
+
+ stream->state = yamux_stream_closed;
+
+ encode_frame(&f);
+ int sz = sizeof(struct yamux_frame);
+ if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
+ return 0;
+ return sz;
+}
+
+static enum yamux_frame_flags get_flags(struct yamux_stream* stream)
+{
+ switch (stream->state)
+ {
+ case yamux_stream_inited:
+ stream->state = yamux_stream_syn_sent;
+ return yamux_frame_syn;
+ case yamux_stream_syn_recv:
+ stream->state = yamux_stream_est;
+ return yamux_frame_ack;
+ default:
+ return 0;
+ }
+}
+
+/**
+ * update the window size
+ * @param stream the stream
+ * @param delta the new window size
+ * @returns number of bytes sent
+ */
+ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta)
+{
+ if (!stream || stream->state == yamux_stream_closed
+ || stream->state == yamux_stream_closing || stream->session->closed)
+ return -EINVAL;
+
+ struct yamux_frame f = (struct yamux_frame){
+ .version = YAMUX_VERSION,
+ .type = yamux_frame_window_update,
+ .flags = get_flags(stream),
+ .streamid = stream->id,
+ .length = (uint32_t)delta
+ };
+ encode_frame(&f);
+
+ int sz = sizeof(struct yamux_frame);
+ if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
+ return 0;
+ return sz;
+}
+
+/***
+ * Write data to the stream
+ * @param stream the stream
+ * @param data_length the length of the data to be sent
+ * @param data_ the data to be sent
+ * @return the number of bytes sent
+ */
+ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data_)
+{
+ if (!((size_t)stream | (size_t)data_) || stream->state == yamux_stream_closed
+ || stream->state == yamux_stream_closing || stream->session->closed)
+ return -EINVAL;
+
+ char* data = (char*)data_;
+
+ struct yamux_session* s = stream->session;
+
+ char* data_end = data + data_length;
+
+ uint32_t ws = stream->window_size;
+ yamux_streamid id = stream->id;
+ char sendd[ws + sizeof(struct yamux_frame)];
+
+ while (data < data_end) {
+ uint32_t
+ dr = (uint32_t)(data_end - data),
+ adv = MIN(dr, ws);
+
+ struct yamux_frame f = (struct yamux_frame){
+ .version = YAMUX_VERSION ,
+ .type = yamux_frame_data,
+ .flags = get_flags(stream),
+ .streamid = id,
+ .length = adv
+ };
+
+ encode_frame(&f);
+ memcpy(sendd, &f, sizeof(struct yamux_frame));
+ memcpy(sendd + sizeof(struct yamux_frame), data, (size_t)adv);
+
+ int sz = adv + sizeof(struct yamux_frame);
+ if (!s->session_context->default_stream->write(s->session_context, (uint8_t*)sendd, sz))
+ return adv;
+
+ data += adv;
+ }
+
+ return data_end - (char*)data_;
+}
+
+/***
+ * Release resources of stream
+ * @param stream the stream
+ */
+void yamux_stream_free(struct yamux_stream* stream)
+{
+ if (!stream)
+ return;
+
+ if (stream->free_fn)
+ stream->free_fn(stream);
+
+ struct yamux_stream s = *stream;
+
+ for (size_t i = 0; i < s.session->cap_streams; ++i)
+ {
+ struct yamux_session_stream* ss = &s.session->streams[i];
+ if (ss->alive && ss->stream->id == s.id)
+ {
+ ss->alive = 0;
+
+ s.session->num_streams--;
+ if (i == s.session->cap_streams - 1)
+ s.session->cap_streams--;
+
+ break;
+ }
+ }
+
+ free(stream);
+}
+
+/***
+ * process stream
+ * @param stream the stream
+ * @param frame the frame
+ * @param incoming the stream bytes (after the frame)
+ * @param incoming_size the size of incoming
+ * @param session_context the SessionContext
+ * @returns the number of bytes processed (can be zero) or negative number on error
+ */
+ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context)
+{
+ struct yamux_frame f = *frame;
+
+ switch (f.type)
+ {
+ case yamux_frame_data:
+ {
+ if (incoming_size != (ssize_t)f.length)
+ return -1;
+
+ if (stream->read_fn)
+ stream->read_fn(stream, f.length, (void*)incoming);
+
+ return incoming_size;
+ }
+ case yamux_frame_window_update:
+ {
+ uint64_t nws = (uint64_t)((int64_t)stream->window_size + (int64_t)(int32_t)f.length);
+ nws &= 0xFFFFFFFFLL;
+ stream->window_size = (uint32_t)nws;
+ break;
+ }
+ default:
+ return -EPROTO;
+ }
+
+ return 0;
+}
+
diff --git a/yamux/test.c b/yamux/test.c
new file mode 100644
index 0000000..7d6a82f
--- /dev/null
+++ b/yamux/test.c
@@ -0,0 +1,235 @@
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "libp2p/yamux/session.h"
+#include "libp2p/yamux/frame.h"
+#include "libp2p/yamux/stream.h"
+
+static void on_read(struct yamux_stream* stream, uint32_t data_len, void* data)
+{
+ char d[data_len + 1];
+ d[data_len] = 0;
+ memcpy(d, data, data_len);
+
+ printf("%s", d);
+}
+static void on_new(struct yamux_session* session, struct yamux_stream* stream)
+{
+ stream->read_fn = on_read;
+}
+
+static struct sockaddr_in addr;
+
+int init_server(int sock) {
+ int err;
+ //printf("bind\n");
+ if ((err = bind(sock, (struct sockaddr*)&addr, sizeof(struct sockaddr_in))) < 0)
+ return err;
+
+ //printf("listen\n");
+ if ((err = listen(sock, 0x80)) < 0)
+ return err;
+
+ //printf("accept\n");
+ return accept(sock, NULL, NULL);
+}
+
+int init_client(int sock) {
+ int err;
+ //printf("connect\n");
+ if ((err = connect(sock, (struct sockaddr*)&addr, sizeof(struct sockaddr_in))) < 0)
+ return err;
+
+ return sock;
+}
+
+int do_server() {
+ int sock;
+ int e = 0;
+ ssize_t ee;
+
+ // init sock
+ if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
+ {
+ e = errno;
+ printf("socket() failed with %i\n", e);
+
+ goto END;
+ }
+
+ memset(&addr, 0, sizeof(struct sockaddr_in));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = inet_addr("127.0.0.1");
+ addr.sin_port = htons(1337);
+
+ int s2 = -1;
+ ssize_t initr = init_server(sock);
+ if (initr < 0)
+ {
+ e = errno;
+ printf("init failed with %i, errno=%i\n", (int)-initr, e);
+
+ goto FREE_SOCK;
+ }
+ s2 = (int)initr;
+
+ // init yamux
+ struct yamux_session* sess = yamux_session_new(NULL, s2,
+ yamux_session_server
+ , NULL);
+ if (!sess)
+ {
+ printf("yamux_session_new() failed\n");
+
+ goto FREE_SOCK;
+ }
+ sess->new_stream_fn = on_new;
+
+ for (;;) {
+ if ((ee = yamux_session_read(sess)) < 0)
+ {
+ e = errno;
+ printf("yamux_session_read() failed with %i, errno=%i\n", (int)-ee, e);
+
+ goto KILL_STRM;
+ }
+
+ }
+
+KILL_STRM:
+ yamux_session_free(sess);
+FREE_SOCK:
+ close(sock);
+END:
+ return 0;
+}
+
+int do_client() {
+ int sock;
+ int e = 0;
+ ssize_t ee;
+
+ // init sock
+ if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
+ {
+ e = errno;
+ printf("socket() failed with %i\n", e);
+
+ goto END;
+ }
+
+ memset(&addr, 0, sizeof(struct sockaddr_in));
+ addr.sin_family = AF_INET;
+ addr.sin_addr.s_addr = inet_addr("127.0.0.1");
+ addr.sin_port = htons(1337);
+
+ int s2 = -1;
+ ssize_t initr = init_client(sock);
+ if (initr < 0)
+ {
+ e = errno;
+ printf("init failed with %i, errno=%i\n", (int)-initr, e);
+
+ goto FREE_SOCK;
+ }
+ s2 = (int)initr;
+
+ // init yamux
+ struct yamux_session* sess = yamux_session_new(NULL, s2,
+ yamux_session_client
+ , NULL);
+ if (!sess)
+ {
+ printf("yamux_session_new() failed\n");
+
+ goto FREE_SOCK;
+ }
+ sess->new_stream_fn = on_new;
+
+ struct yamux_stream* strm = yamux_stream_new(sess, 0, NULL);
+ if (!strm)
+ {
+ printf("yamux_new_stream() failed\n");
+
+ goto FREE_YAMUX;
+ }
+
+ strm->read_fn = on_read;
+
+ if ((ee = yamux_stream_init(strm)) < 0)
+ {
+ e = errno;
+ printf("yamux_stream_init() failed with %i, errno=%i\n", (int)-ee, e);
+
+ goto KILL_STRM;
+ }
+
+ char str[] = "hello\n";
+ if ((ee = yamux_stream_write(strm, 6, str)) < 0)
+ {
+ e = errno;
+ printf("yamux_stream_write() failed with %i, errno=%i\n", (int)-ee, e);
+
+ goto KILL_STRM;
+ }
+
+ for (;;) {
+ if ((ee = yamux_session_read(sess)) < 0)
+ {
+ e = errno;
+ printf("yamux_session_read() failed with %i, errno=%i\n", (int)-ee, e);
+
+ goto KILL_STRM;
+ }
+
+ break;
+ }
+
+KILL_STRM:
+ if (yamux_stream_reset(strm))
+ goto FREE_STRM;
+FREE_STRM:
+ yamux_stream_free(strm);
+
+FREE_YAMUX:
+ yamux_session_free(sess);
+FREE_SOCK:
+ close(sock);
+END:
+ return 0;
+
+}
+
+int main(int argc, char* argv[])
+{
+ int e = 0;
+ int client = -1;
+
+ if (argc < 2) {
+ e = 1;
+ } else {
+ if (strcmp(argv[1], "client") == 0)
+ client = 1;
+ if (strcmp(argv[1], "server") == 0)
+ client = 0;
+ }
+
+ if (e || client == -1) {
+ fprintf(stderr, "Syntax: %s server or %s client\n", argv[0], argv[0]);
+ exit(e);
+ }
+
+ if (client)
+ return do_client();
+ else
+ return do_server();
+
+}
+
diff --git a/yamux/yamux.c b/yamux/yamux.c
new file mode 100644
index 0000000..0c50508
--- /dev/null
+++ b/yamux/yamux.c
@@ -0,0 +1,129 @@
+#include
+#include "varint.h"
+#include "libp2p/yamux/session.h"
+#include "libp2p/net/protocol.h"
+#include "libp2p/net/stream.h"
+#include "libp2p/conn/session.h"
+#include "libp2p/utils/logger.h"
+
+/**
+ * Determines if this protocol can handle the incoming message
+ * @param incoming the incoming data
+ * @param incoming_size the size of the incoming data buffer
+ * @returns true(1) if it can handle this message, false(0) if not
+ */
+int yamux_can_handle(const uint8_t* incoming, size_t incoming_size) {
+ char *protocol = "/yamux/1.0.0\n";
+ int protocol_size = strlen(protocol);
+ // is there a varint in front?
+ size_t num_bytes = 0;
+ if (incoming[0] != protocol[0] && incoming[1] != protocol[1]) {
+ varint_decode(incoming, incoming_size, &num_bytes);
+ }
+ if (incoming_size >= protocol_size - num_bytes) {
+ if (strncmp(protocol, (char*) &incoming[num_bytes], protocol_size) == 0)
+ return 1;
+ }
+ return 0;
+}
+
+/**
+ * the yamux stream received some bytes. Process them
+ * @param stream the stream that the data came in on
+ * @param incoming_size the size of the stream buffer
+ * @param incoming the stream buffer
+ */
+void yamux_read_stream(struct yamux_stream* stream, ssize_t incoming_size, uint8_t* incoming) {
+ struct Libp2pVector* handlers = stream->userdata;
+ int retVal = libp2p_protocol_marshal(incoming, incoming_size, stream->session->session_context, handlers);
+ if (retVal == -1) {
+ // TODO handle error condition
+ libp2p_logger_error("yamux", "Marshalling returned error.\n");
+ } else if (retVal > 0) {
+ // TODO handle everything went okay
+ libp2p_logger_debug("yamux", "Marshalling was successful. We should continue processing.\n");
+ } else {
+ // TODO we've been told we shouldn't do anything anymore
+ libp2p_logger_debug("yamux", "Marshalling was successful. We should stop processing.\n");
+ }
+ return;
+}
+
+/***
+ * Handles the message
+ * @param incoming the incoming data buffer
+ * @param incoming_size the size of the incoming data buffer
+ * @param session_context the information about the incoming connection
+ * @param protocol_context the protocol-dependent context
+ * @returns 0 if the caller should not continue looping, <0 on error, >0 on success
+ */
+int yamux_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) {
+ // they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream
+ struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context);
+ uint8_t* buf = (uint8_t*) malloc(incoming_size);
+ if (buf == NULL)
+ return -1;
+ memcpy(buf, incoming, incoming_size);
+ for(;;) {
+ int retVal = yamux_decode(yamux, incoming, incoming_size);
+ free(buf);
+ buf = NULL;
+ if (!retVal)
+ break;
+ else { // try to read more from this stream
+ // TODO need more information as to what this loop should do
+ }
+ }
+
+ /*
+ struct Libp2pVector* handlers = (struct Libp2pVector*)protocol_context;
+ uint8_t* results = NULL;
+ size_t bytes_read = 0;
+ int numRetries = 0;
+ int retVal = 0;
+ int max_retries = 100; // try for 5 minutes
+ for(;;) {
+ // try to read for 5 seconds
+ if (session_context->default_stream->read(session_context, &results, &bytes_read, 5)) {
+ // we read something from the network. Process it.
+ // NOTE: If it is a multistream protocol that we are receiving, ignore it.
+ if (yamux_can_handle(results, bytes_read))
+ continue;
+ numRetries = 0;
+ retVal = libp2p_protocol_marshal(results, bytes_read, session_context, handlers);
+ if (results != NULL)
+ free(results);
+ // exit the loop on error (or if they ask us to no longer loop by returning 0)
+ if (retVal <= 0)
+ break;
+ } else {
+ // we were unable to read from the network.
+ // if it timed out, we should try again (if we're not out of retries)
+ if (numRetries >= max_retries)
+ break;
+ numRetries++;
+ }
+ }
+ */
+ return 0;
+}
+
+/**
+ * Shutting down. Clean up any memory allocations
+ * @param protocol_context the context
+ * @returns true(1)
+ */
+int yamux_shutdown(void* protocol_context) {
+ return 0;
+}
+
+struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* handlers) {
+ struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new();
+ if (handler != NULL) {
+ handler->context = handlers;
+ handler->CanHandle = yamux_can_handle;
+ handler->HandleMessage = yamux_handle_message;
+ handler->Shutdown = yamux_shutdown;
+ }
+ return handler;
+}