First commit of yamux protocol

yamux
jmjatlanta 2017-10-11 11:23:25 -05:00
parent 68242a6355
commit 6b185e31bd
16 changed files with 1332 additions and 14 deletions

View File

@ -65,7 +65,6 @@
<buildTargets>
<target name="all" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>all</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>false</useDefaultCommand>
@ -73,7 +72,6 @@
</target>
<target name="clean" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>clean</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>false</useDefaultCommand>
@ -81,6 +79,7 @@
</target>
<target name="test" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>test</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>false</useDefaultCommand>
@ -88,12 +87,35 @@
</target>
<target name="rebuild" path="" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>rebuild</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
<runAllBuilders>true</runAllBuilders>
</target>
<target name="test" path="yamux" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>test</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
<runAllBuilders>true</runAllBuilders>
</target>
<target name="clean" path="yamux" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>clean</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
<runAllBuilders>true</runAllBuilders>
</target>
<target name="all" path="yamux" targetID="org.eclipse.cdt.build.MakeTargetBuilder">
<buildCommand>make</buildCommand>
<buildArguments/>
<buildTarget>all</buildTarget>
<stopOnError>true</stopOnError>
<useDefaultCommand>true</useDefaultCommand>
<runAllBuilders>true</runAllBuilders>
</target>
</buildTargets>
</storageModule>
</cproject>

View File

@ -15,7 +15,8 @@ OBJS = \
record/*.o \
routing/*.o \
secio/*.o \
utils/*.o
utils/*.o \
yamux/*.o
link:
ar rcs libp2p.a $(OBJS)
@ -33,6 +34,7 @@ compile:
cd routing; make all;
cd secio; make all;
cd utils; make all;
cd yamux; make all;
test: compile link
cd test; make all;
@ -55,5 +57,6 @@ clean:
cd secio; make clean;
cd utils; make clean;
cd test; make clean;
cd yamux; make clean;
rm -rf libp2p.a

View File

@ -0,0 +1,11 @@
#pragma once
/**
* mac doesn't have timespec_get
*/
#ifdef __MACH__
#include <time.h>
#define TIME_UTC 1
int timespec_get(struct timespec *ts, int base);
#endif

View File

@ -0,0 +1,19 @@
#pragma once
#include <stddef.h>
#include <stdint.h>
#include <time.h>
struct yamux_config
{
size_t accept_backlog ;
uint32_t max_stream_window_size;
};
#define YAMUX_DEFAULT_WINDOW (0x100*0x400)
#define YAMUX_DEFAULT_CONFIG ((struct yamux_config)\
{\
.accept_backlog=0x100,\
.max_stream_window_size=YAMUX_DEFAULT_WINDOW\
})\

View File

@ -0,0 +1,43 @@
#pragma once
#include <stdint.h>
#include <stddef.h>
typedef uint8_t yamux_version ;
typedef uint32_t yamux_streamid;
#define YAMUX_VERSION (0x00)
#define YAMUX_STREAMID_SESSION (0)
enum yamux_frame_type
{
yamux_frame_data = 0x00,
yamux_frame_window_update = 0x01,
yamux_frame_ping = 0x02,
yamux_frame_go_away = 0x03
};
enum yamux_frame_flags
{
yamux_frame_nil = 0x0000,
yamux_frame_syn = 0x0001,
yamux_frame_ack = 0x0002,
yamux_frame_fin = 0x0004,
yamux_frame_rst = 0x0008
};
#pragma pack(push,1)
struct yamux_frame
{
yamux_version version ;
uint8_t type ;
uint16_t flags ;
yamux_streamid streamid;
uint32_t length ;
};
#pragma pack(pop)
void encode_frame(struct yamux_frame* frame);
void decode_frame(struct yamux_frame* frame);

View File

@ -0,0 +1,100 @@
#pragma once
#include <stddef.h>
#include <stdint.h>
#include <time.h>
#include "config.h"
#include "frame.h"
#include "stream.h"
#include "libp2p/conn/session.h"
enum yamux_session_type
{
yamux_session_client,
yamux_session_server
};
enum yamux_error
{
yamux_error_normal = 0x00,
yamux_error_protoc = 0x01,
yamux_error_intern = 0x02
};
struct yamux_session;
struct yamux_stream;
typedef void* (*yamux_session_get_str_ud_fn)(struct yamux_session* session, yamux_streamid newid );
typedef void (*yamux_session_ping_fn )(struct yamux_session* session, uint32_t val );
typedef void (*yamux_session_pong_fn )(struct yamux_session* session, uint32_t val, struct timespec dt);
typedef void (*yamux_session_go_away_fn )(struct yamux_session* session, enum yamux_error err );
typedef void (*yamux_session_new_stream_fn)(struct yamux_session* session, struct yamux_stream* stream);
typedef void (*yamux_session_free_fn )(struct yamux_session* sesssion );
struct yamux_session_stream
{
struct yamux_stream* stream;
int alive;
};
struct yamux_session
{
struct yamux_config* config;
size_t num_streams;
size_t cap_streams;
struct yamux_session_stream* streams;
yamux_session_get_str_ud_fn get_str_ud_fn;
yamux_session_ping_fn ping_fn ;
yamux_session_pong_fn pong_fn ;
yamux_session_go_away_fn go_away_fn ;
yamux_session_new_stream_fn new_stream_fn;
yamux_session_free_fn free_fn ;
void* userdata;
struct timespec since_ping;
enum yamux_session_type type;
struct SessionContext* session_context;
yamux_streamid nextid;
int closed;
};
/***
* Create a new yamux session
* @param config the configuration
* @param sock the socket
* @param type session type (yamux_session_server or yamux_session_client)
* @param userdata user data
* @returns the yamux_session struct
*/
struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata);
// does not close the socket, but does close the session
void yamux_session_free(struct yamux_session* session);
// does not free used memory
ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err);
inline ssize_t yamux_session_go_away(struct yamux_session* session, enum yamux_error err)
{
return yamux_session_close(session, err);
}
ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int pong);
// defers to stream read handlers
ssize_t yamux_session_read(struct yamux_session* session);
/**
* Decode an incoming message
* @param incoming the incoming bytes
* @param incoming_size the size of the incoming bytes
* @returns true(1) on success, false(0) otherwise
*/
int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size);

View File

@ -0,0 +1,75 @@
#pragma once
#include <stddef.h>
#include <stdint.h>
#include "session.h"
#include "libp2p/conn/session.h"
// NOTE: 'data' is not guaranteed to be preserved when the read_fn
// handler exists (read: it will be freed).
struct yamux_stream;
typedef void (*yamux_stream_read_fn)(struct yamux_stream* stream, uint32_t data_length, void* data);
typedef void (*yamux_stream_fin_fn )(struct yamux_stream* stream);
typedef void (*yamux_stream_rst_fn )(struct yamux_stream* stream);
typedef void (*yamux_stream_free_fn)(struct yamux_stream* stream);
enum yamux_stream_state
{
yamux_stream_inited,
yamux_stream_syn_sent,
yamux_stream_syn_recv,
yamux_stream_est,
yamux_stream_closing,
yamux_stream_closed
};
struct yamux_stream
{
struct yamux_session* session;
yamux_stream_read_fn read_fn;
yamux_stream_fin_fn fin_fn ;
yamux_stream_rst_fn rst_fn ;
yamux_stream_free_fn free_fn;
void* userdata;
enum yamux_stream_state state;
yamux_streamid id;
uint32_t window_size;
};
// does not init the stream
struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_streamid id, void* userdata);
// not obligatory, SYN is sent by yamux_stream_write when the stream
// isn't initialised anyway
ssize_t yamux_stream_init (struct yamux_stream* stream);
// doesn't free the stream
// uses FIN
ssize_t yamux_stream_close(struct yamux_stream* stream);
// uses RST
ssize_t yamux_stream_reset(struct yamux_stream* stream);
void yamux_stream_free(struct yamux_stream* stream);
ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta);
ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data);
/***
* process stream
* @param stream the stream
* @param frame the frame
* @param incoming the stream bytes (after the frame)
* @param incoming_size the size of incoming
* @param session_context the SessionContext
* @returns the number of bytes processed (can be zero) or negative number on error
*/
ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context);

View File

@ -0,0 +1,8 @@
#pragma once
#include "libp2p/net/protocol.h"
/**
* Build a handler that can handle the yamux protocol
*/
struct Libp2pProtocolHandler* yamux_build_protocol_handler();

View File

@ -65,18 +65,18 @@ int libp2p_net_multistream_handle_message(const uint8_t *incoming, size_t incomi
if (libp2p_net_multistream_can_handle(results, bytes_read))
continue;
numRetries = 0;
retVal = libp2p_protocol_marshal(results, bytes_read, context, multistream_context->handlers);
if (results != NULL)
free(results);
// exit the loop on error (or if they ask us to no longer loop by returning 0)
if (retVal <= 0)
break;
retVal = libp2p_protocol_marshal(results, bytes_read, context, multistream_context->handlers);
if (results != NULL)
free(results);
// exit the loop on error (or if they ask us to no longer loop by returning 0)
if (retVal <= 0)
break;
} else {
// we were unable to read from the network.
// we were unable to read from the network.
// if it timed out, we should try again (if we're not out of retries)
if (numRetries >= max_retries)
break;
numRetries++;
if (numRetries >= max_retries)
break;
numRetries++;
}
}

25
os/timespec.c Normal file
View File

@ -0,0 +1,25 @@
#ifdef __MACH__
#include <time.h>
#include <sys/time.h>
#include <mach/clock.h>
#include <mach/mach.h>
#include "libp2p/os/timespec.h"
int timespec_get(struct timespec *ts, int base) {
switch (base) {
case TIME_UTC: {
clock_serv_t cclock;
mach_timespec_t mts;
host_get_clock_service(mach_host_self(), CALENDAR_CLOCK, &cclock);
clock_get_time(cclock, &mts);
mach_port_deallocate(mach_task_self(), cclock);
ts->tv_sec = mts.tv_sec;
ts->tv_nsec = mts.tv_nsec;
return base;
}
}
return 0;
}
#endif

22
yamux/Makefile Normal file
View File

@ -0,0 +1,22 @@
CC = gcc
CFLAGS = -O0 -Wall -Werror -I../include -I../../c-protobuf -std=c11
ifdef DEBUG
CFLAGS += -g3
endif
LFLAGS =
DEPS =
OBJS = frame.o session.o stream.o yamux.o ../os/timespec.o
%.o: %.c
$(CC) -c -o $@ $< $(CFLAGS)
all: $(OBJS)
clean:
rm -f *.o
rm -f test
test: all test.o
$(CC) -o test test.o $(OBJS) $(CFLAGS)

42
yamux/frame.c Normal file
View File

@ -0,0 +1,42 @@
#include <arpa/inet.h>
#include <sys/socket.h>
#include "libp2p/yamux/frame.h"
enum eness
{
unk,
little,
big
};
static enum eness eness = unk;
static void set_eness()
{
uint16_t x = 1;
if (*(char*)&x == 1)
eness = little;
else
eness = big;
}
void encode_frame(struct yamux_frame* frame)
{
if (eness == unk)
set_eness();
frame->flags = htons(frame->flags );
frame->streamid = htonl(frame->streamid);
frame->length = htonl(frame->length );
}
void decode_frame(struct yamux_frame* frame)
{
if (eness == unk)
set_eness();
frame->flags = ntohs(frame->flags );
frame->streamid = ntohl(frame->streamid);
frame->length = ntohl(frame->length );
}

265
yamux/session.c Normal file
View File

@ -0,0 +1,265 @@
#include <memory.h>
#include <string.h>
#include <sys/socket.h>
#include <errno.h>
#include <stdlib.h>
#include <stdio.h>
#include <time.h>
#include "libp2p/net/stream.h"
#include "libp2p/os/timespec.h"
#include "libp2p/yamux/session.h"
#include "libp2p/yamux/stream.h"
static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG;
/***
* Create a new yamux session
* @param config the configuration
* @param sock the socket
* @param type session type (yamux_session_server or yamux_session_client)
* @param userdata user data
* @returns the yamux_session struct
*/
struct yamux_session* yamux_session_new(struct yamux_config* config, struct SessionContext* session_context, enum yamux_session_type type, void* userdata)
{
if (!session_context)
return NULL;
if (!config)
config = &dcfg;
size_t ab = config->accept_backlog;
struct yamux_session_stream* streams =
(struct yamux_session_stream*)malloc(sizeof(struct yamux_session_stream) * ab);
for (size_t i = 0; i < ab; ++i)
streams[i].alive = 0;
struct yamux_session* sess = (struct yamux_session*)malloc(sizeof(struct yamux_session));
if (sess != NULL) {
sess->config = config;
sess->type = type;
sess->session_context = session_context;
sess->closed = 0;
sess->nextid = 1 + (type == yamux_session_server);
sess->num_streams = 0;
sess->cap_streams = 0;
sess->streams = streams;
struct timespec ts;
ts.tv_sec = 0;
ts.tv_nsec = 0;
sess->since_ping = ts;
sess->get_str_ud_fn = NULL;
sess->ping_fn = NULL;
sess->pong_fn = NULL;
sess->go_away_fn = NULL;
sess->free_fn = NULL;
sess->userdata = userdata;
}
return sess;
}
void yamux_session_free(struct yamux_session* session)
{
if (!session)
return;
if (!session->closed)
yamux_session_close(session, yamux_error_normal);
if (session->free_fn)
session->free_fn(session);
for (size_t i = 0; i < session->cap_streams; ++i)
if (session->streams[i].alive)
yamux_stream_free(session->streams[i].stream);
free(session->streams);
free(session);
}
/***
* Close a yamux session
* @param session the yamux_session to close
* @param err why we're closing
*/
ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err)
{
if (!session)
return -EINVAL;
if (session->closed)
return 0;
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION,
.type = yamux_frame_go_away,
.flags = 0,
.streamid = YAMUX_STREAMID_SESSION,
.length = (uint32_t)err
};
session->closed = 1;
int sz = sizeof(struct yamux_frame);
if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
}
/***
* Ping
* @param session the session to ping
* @param value the value to send
* @param pong true(1) if we should send the ack, false(0) if we should send the syn (who's side are we on?)
* @returns number of bytes sent
*/
ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int pong)
{
if (!session || session->closed)
return -EINVAL;
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION,
.type = yamux_frame_ping,
.flags = pong ? yamux_frame_ack : yamux_frame_syn,
.streamid = YAMUX_STREAMID_SESSION,
.length = value
};
if (!timespec_get(&session->since_ping, TIME_UTC))
return -EACCES;
int sz = sizeof(struct yamux_frame);
if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
}
/**
* Decode an incoming message
* @param incoming the incoming bytes
* @param incoming_size the size of the incoming bytes
* @returns true(1) on success, false(0) otherwise
*/
int yamux_decode(struct yamux_session* session, const uint8_t* incoming, size_t incoming_size) {
// decode frame
struct yamux_frame f;
if (incoming_size < sizeof(struct yamux_frame)) {
return 0;
}
decode_frame(&f);
// check yamux version
if (f.version != YAMUX_VERSION)
return 0;
if (!f.streamid) // we're not dealing with a stream
switch (f.type)
{
case yamux_frame_ping: // ping
if (f.flags & yamux_frame_syn)
{
yamux_session_ping(session, f.length, 1);
if (session->ping_fn)
session->ping_fn(session, f.length);
}
else if ((f.flags & yamux_frame_ack) && session->pong_fn)
{
struct timespec now, dt, last = session->since_ping;
if (!timespec_get(&now, TIME_UTC))
return -EACCES;
dt.tv_sec = now.tv_sec - last.tv_sec;
if (now.tv_nsec < last.tv_nsec)
{
dt.tv_sec--;
dt.tv_nsec = last.tv_nsec - now.tv_nsec;
}
else
dt.tv_nsec = now.tv_nsec - last.tv_nsec;
session->pong_fn(session, f.length, dt);
}
else
return -EPROTO;
break;
case yamux_frame_go_away: // go away (hanging up)
session->closed = 1;
if (session->go_away_fn)
session->go_away_fn(session, (enum yamux_error)f.length);
break;
default:
return -EPROTO;
}
else { // we're handling a stream
for (size_t i = 0; i < session->cap_streams; ++i)
{
struct yamux_session_stream* ss = &session->streams[i];
struct yamux_stream* s = ss->stream;
if (!ss->alive || s->state == yamux_stream_closed)
continue;
if (s->id == f.streamid)
{
if (f.flags & yamux_frame_rst)
{
s->state = yamux_stream_closed;
if (s->rst_fn)
s->rst_fn(s);
}
else if (f.flags & yamux_frame_fin)
{
// local stream didn't initiate FIN
if (s->state != yamux_stream_closing)
yamux_stream_close(s);
s->state = yamux_stream_closed;
if (s->fin_fn)
s->fin_fn(s);
}
else if (f.flags & yamux_frame_ack)
{
if (s->state != yamux_stream_syn_sent)
return -EPROTO;
s->state = yamux_stream_est;
}
else if (f.flags)
return -EPROTO;
int sz = sizeof(struct yamux_frame);
ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz, session->session_context);
return (re < 0) ? re : (re + incoming_size);
}
}
// stream doesn't exist yet
if (f.flags & yamux_frame_syn)
{
void* ud = NULL;
if (session->get_str_ud_fn)
ud = session->get_str_ud_fn(session, f.streamid);
struct yamux_stream* st = yamux_stream_new(session, f.streamid, ud);
if (session->new_stream_fn)
session->new_stream_fn(session, st);
st->state = yamux_stream_syn_recv;
}
else
return -EPROTO;
}
return 0;
}

319
yamux/stream.c Normal file
View File

@ -0,0 +1,319 @@
#include <errno.h>
#include <memory.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include "libp2p/conn/session.h"
#include "libp2p/net/stream.h"
#include "libp2p/yamux/frame.h"
#include "libp2p/yamux/stream.h"
#define MIN(x,y) (y^((x^y)&-(x<y)))
#define MAX(x,y) (x^((x^y)&-(x<y)))
struct yamux_stream* yamux_stream_new(struct yamux_session* session, yamux_streamid id, void* userdata)
{
if (!session)
return NULL;
if (!id)
{
id = session->nextid;
session->nextid += 2;
}
struct yamux_stream* st = NULL;
struct yamux_session_stream* ss;
if (session->num_streams != session->cap_streams)
for (size_t i = 0; i < session->cap_streams; ++i)
{
ss = &session->streams[i];
if (!ss->alive)
{
st = ss->stream;
ss->alive = 1;
goto FOUND;
}
}
if (session->cap_streams == session->config->accept_backlog)
return NULL;
ss = &session->streams[session->cap_streams];
if (ss->alive)
return NULL;
session->cap_streams++;
ss->alive = 1;
st = ss->stream = malloc(sizeof(struct yamux_stream));
FOUND:;
struct yamux_stream nst = (struct yamux_stream){
.id = id,
.session = session,
.state = yamux_stream_inited,
.window_size = YAMUX_DEFAULT_WINDOW,
.read_fn = NULL,
.fin_fn = NULL,
.rst_fn = NULL,
.userdata = userdata
};
*st = nst;
return st;
}
/***
* Initialize a stream between 2 peers
* @param stream the stream to initialize
* @returns the number of bytes sent
*/
ssize_t yamux_stream_init(struct yamux_stream* stream)
{
if (!stream || stream->state != yamux_stream_inited || stream->session->closed) {
return -EINVAL;
}
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION,
.type = yamux_frame_window_update,
.flags = yamux_frame_syn,
.streamid = stream->id,
.length = 0
};
stream->state = yamux_stream_syn_sent;
encode_frame(&f);
int sz = sizeof(struct yamux_frame);
if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
}
/***
* Close a stream
* @param stream the stream
* @returns the number of bytes sent
*/
ssize_t yamux_stream_close(struct yamux_stream* stream)
{
if (!stream || stream->state != yamux_stream_est || stream->session->closed)
return -EINVAL;
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION,
.type = yamux_frame_window_update,
.flags = yamux_frame_fin,
.streamid = stream->id,
.length = 0
};
stream->state = yamux_stream_closing;
encode_frame(&f);
int sz = sizeof(struct yamux_frame);
if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
}
/**
* Reset the stream
* @param stream the stream
* @returns the number of bytes sent
*/
ssize_t yamux_stream_reset(struct yamux_stream* stream)
{
if (!stream || stream->session->closed)
return -EINVAL;
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION,
.type = yamux_frame_window_update,
.flags = yamux_frame_rst,
.streamid = stream->id,
.length = 0
};
stream->state = yamux_stream_closed;
encode_frame(&f);
int sz = sizeof(struct yamux_frame);
if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
}
static enum yamux_frame_flags get_flags(struct yamux_stream* stream)
{
switch (stream->state)
{
case yamux_stream_inited:
stream->state = yamux_stream_syn_sent;
return yamux_frame_syn;
case yamux_stream_syn_recv:
stream->state = yamux_stream_est;
return yamux_frame_ack;
default:
return 0;
}
}
/**
* update the window size
* @param stream the stream
* @param delta the new window size
* @returns number of bytes sent
*/
ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta)
{
if (!stream || stream->state == yamux_stream_closed
|| stream->state == yamux_stream_closing || stream->session->closed)
return -EINVAL;
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION,
.type = yamux_frame_window_update,
.flags = get_flags(stream),
.streamid = stream->id,
.length = (uint32_t)delta
};
encode_frame(&f);
int sz = sizeof(struct yamux_frame);
if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
}
/***
* Write data to the stream
* @param stream the stream
* @param data_length the length of the data to be sent
* @param data_ the data to be sent
* @return the number of bytes sent
*/
ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, void* data_)
{
if (!((size_t)stream | (size_t)data_) || stream->state == yamux_stream_closed
|| stream->state == yamux_stream_closing || stream->session->closed)
return -EINVAL;
char* data = (char*)data_;
struct yamux_session* s = stream->session;
char* data_end = data + data_length;
uint32_t ws = stream->window_size;
yamux_streamid id = stream->id;
char sendd[ws + sizeof(struct yamux_frame)];
while (data < data_end) {
uint32_t
dr = (uint32_t)(data_end - data),
adv = MIN(dr, ws);
struct yamux_frame f = (struct yamux_frame){
.version = YAMUX_VERSION ,
.type = yamux_frame_data,
.flags = get_flags(stream),
.streamid = id,
.length = adv
};
encode_frame(&f);
memcpy(sendd, &f, sizeof(struct yamux_frame));
memcpy(sendd + sizeof(struct yamux_frame), data, (size_t)adv);
int sz = adv + sizeof(struct yamux_frame);
if (!s->session_context->default_stream->write(s->session_context, (uint8_t*)sendd, sz))
return adv;
data += adv;
}
return data_end - (char*)data_;
}
/***
* Release resources of stream
* @param stream the stream
*/
void yamux_stream_free(struct yamux_stream* stream)
{
if (!stream)
return;
if (stream->free_fn)
stream->free_fn(stream);
struct yamux_stream s = *stream;
for (size_t i = 0; i < s.session->cap_streams; ++i)
{
struct yamux_session_stream* ss = &s.session->streams[i];
if (ss->alive && ss->stream->id == s.id)
{
ss->alive = 0;
s.session->num_streams--;
if (i == s.session->cap_streams - 1)
s.session->cap_streams--;
break;
}
}
free(stream);
}
/***
* process stream
* @param stream the stream
* @param frame the frame
* @param incoming the stream bytes (after the frame)
* @param incoming_size the size of incoming
* @param session_context the SessionContext
* @returns the number of bytes processed (can be zero) or negative number on error
*/
ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* frame, const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context)
{
struct yamux_frame f = *frame;
switch (f.type)
{
case yamux_frame_data:
{
if (incoming_size != (ssize_t)f.length)
return -1;
if (stream->read_fn)
stream->read_fn(stream, f.length, (void*)incoming);
return incoming_size;
}
case yamux_frame_window_update:
{
uint64_t nws = (uint64_t)((int64_t)stream->window_size + (int64_t)(int32_t)f.length);
nws &= 0xFFFFFFFFLL;
stream->window_size = (uint32_t)nws;
break;
}
default:
return -EPROTO;
}
return 0;
}

235
yamux/test.c Normal file
View File

@ -0,0 +1,235 @@
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <errno.h>
#include "libp2p/yamux/session.h"
#include "libp2p/yamux/frame.h"
#include "libp2p/yamux/stream.h"
static void on_read(struct yamux_stream* stream, uint32_t data_len, void* data)
{
char d[data_len + 1];
d[data_len] = 0;
memcpy(d, data, data_len);
printf("%s", d);
}
static void on_new(struct yamux_session* session, struct yamux_stream* stream)
{
stream->read_fn = on_read;
}
static struct sockaddr_in addr;
int init_server(int sock) {
int err;
//printf("bind\n");
if ((err = bind(sock, (struct sockaddr*)&addr, sizeof(struct sockaddr_in))) < 0)
return err;
//printf("listen\n");
if ((err = listen(sock, 0x80)) < 0)
return err;
//printf("accept\n");
return accept(sock, NULL, NULL);
}
int init_client(int sock) {
int err;
//printf("connect\n");
if ((err = connect(sock, (struct sockaddr*)&addr, sizeof(struct sockaddr_in))) < 0)
return err;
return sock;
}
int do_server() {
int sock;
int e = 0;
ssize_t ee;
// init sock
if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
{
e = errno;
printf("socket() failed with %i\n", e);
goto END;
}
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
addr.sin_port = htons(1337);
int s2 = -1;
ssize_t initr = init_server(sock);
if (initr < 0)
{
e = errno;
printf("init failed with %i, errno=%i\n", (int)-initr, e);
goto FREE_SOCK;
}
s2 = (int)initr;
// init yamux
struct yamux_session* sess = yamux_session_new(NULL, s2,
yamux_session_server
, NULL);
if (!sess)
{
printf("yamux_session_new() failed\n");
goto FREE_SOCK;
}
sess->new_stream_fn = on_new;
for (;;) {
if ((ee = yamux_session_read(sess)) < 0)
{
e = errno;
printf("yamux_session_read() failed with %i, errno=%i\n", (int)-ee, e);
goto KILL_STRM;
}
}
KILL_STRM:
yamux_session_free(sess);
FREE_SOCK:
close(sock);
END:
return 0;
}
int do_client() {
int sock;
int e = 0;
ssize_t ee;
// init sock
if ((sock = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0)
{
e = errno;
printf("socket() failed with %i\n", e);
goto END;
}
memset(&addr, 0, sizeof(struct sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
addr.sin_port = htons(1337);
int s2 = -1;
ssize_t initr = init_client(sock);
if (initr < 0)
{
e = errno;
printf("init failed with %i, errno=%i\n", (int)-initr, e);
goto FREE_SOCK;
}
s2 = (int)initr;
// init yamux
struct yamux_session* sess = yamux_session_new(NULL, s2,
yamux_session_client
, NULL);
if (!sess)
{
printf("yamux_session_new() failed\n");
goto FREE_SOCK;
}
sess->new_stream_fn = on_new;
struct yamux_stream* strm = yamux_stream_new(sess, 0, NULL);
if (!strm)
{
printf("yamux_new_stream() failed\n");
goto FREE_YAMUX;
}
strm->read_fn = on_read;
if ((ee = yamux_stream_init(strm)) < 0)
{
e = errno;
printf("yamux_stream_init() failed with %i, errno=%i\n", (int)-ee, e);
goto KILL_STRM;
}
char str[] = "hello\n";
if ((ee = yamux_stream_write(strm, 6, str)) < 0)
{
e = errno;
printf("yamux_stream_write() failed with %i, errno=%i\n", (int)-ee, e);
goto KILL_STRM;
}
for (;;) {
if ((ee = yamux_session_read(sess)) < 0)
{
e = errno;
printf("yamux_session_read() failed with %i, errno=%i\n", (int)-ee, e);
goto KILL_STRM;
}
break;
}
KILL_STRM:
if (yamux_stream_reset(strm))
goto FREE_STRM;
FREE_STRM:
yamux_stream_free(strm);
FREE_YAMUX:
yamux_session_free(sess);
FREE_SOCK:
close(sock);
END:
return 0;
}
int main(int argc, char* argv[])
{
int e = 0;
int client = -1;
if (argc < 2) {
e = 1;
} else {
if (strcmp(argv[1], "client") == 0)
client = 1;
if (strcmp(argv[1], "server") == 0)
client = 0;
}
if (e || client == -1) {
fprintf(stderr, "Syntax: %s server or %s client\n", argv[0], argv[0]);
exit(e);
}
if (client)
return do_client();
else
return do_server();
}

129
yamux/yamux.c Normal file
View File

@ -0,0 +1,129 @@
#include <string.h>
#include "varint.h"
#include "libp2p/yamux/session.h"
#include "libp2p/net/protocol.h"
#include "libp2p/net/stream.h"
#include "libp2p/conn/session.h"
#include "libp2p/utils/logger.h"
/**
* Determines if this protocol can handle the incoming message
* @param incoming the incoming data
* @param incoming_size the size of the incoming data buffer
* @returns true(1) if it can handle this message, false(0) if not
*/
int yamux_can_handle(const uint8_t* incoming, size_t incoming_size) {
char *protocol = "/yamux/1.0.0\n";
int protocol_size = strlen(protocol);
// is there a varint in front?
size_t num_bytes = 0;
if (incoming[0] != protocol[0] && incoming[1] != protocol[1]) {
varint_decode(incoming, incoming_size, &num_bytes);
}
if (incoming_size >= protocol_size - num_bytes) {
if (strncmp(protocol, (char*) &incoming[num_bytes], protocol_size) == 0)
return 1;
}
return 0;
}
/**
* the yamux stream received some bytes. Process them
* @param stream the stream that the data came in on
* @param incoming_size the size of the stream buffer
* @param incoming the stream buffer
*/
void yamux_read_stream(struct yamux_stream* stream, ssize_t incoming_size, uint8_t* incoming) {
struct Libp2pVector* handlers = stream->userdata;
int retVal = libp2p_protocol_marshal(incoming, incoming_size, stream->session->session_context, handlers);
if (retVal == -1) {
// TODO handle error condition
libp2p_logger_error("yamux", "Marshalling returned error.\n");
} else if (retVal > 0) {
// TODO handle everything went okay
libp2p_logger_debug("yamux", "Marshalling was successful. We should continue processing.\n");
} else {
// TODO we've been told we shouldn't do anything anymore
libp2p_logger_debug("yamux", "Marshalling was successful. We should stop processing.\n");
}
return;
}
/***
* Handles the message
* @param incoming the incoming data buffer
* @param incoming_size the size of the incoming data buffer
* @param session_context the information about the incoming connection
* @param protocol_context the protocol-dependent context
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/
int yamux_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) {
// they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream
struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context);
uint8_t* buf = (uint8_t*) malloc(incoming_size);
if (buf == NULL)
return -1;
memcpy(buf, incoming, incoming_size);
for(;;) {
int retVal = yamux_decode(yamux, incoming, incoming_size);
free(buf);
buf = NULL;
if (!retVal)
break;
else { // try to read more from this stream
// TODO need more information as to what this loop should do
}
}
/*
struct Libp2pVector* handlers = (struct Libp2pVector*)protocol_context;
uint8_t* results = NULL;
size_t bytes_read = 0;
int numRetries = 0;
int retVal = 0;
int max_retries = 100; // try for 5 minutes
for(;;) {
// try to read for 5 seconds
if (session_context->default_stream->read(session_context, &results, &bytes_read, 5)) {
// we read something from the network. Process it.
// NOTE: If it is a multistream protocol that we are receiving, ignore it.
if (yamux_can_handle(results, bytes_read))
continue;
numRetries = 0;
retVal = libp2p_protocol_marshal(results, bytes_read, session_context, handlers);
if (results != NULL)
free(results);
// exit the loop on error (or if they ask us to no longer loop by returning 0)
if (retVal <= 0)
break;
} else {
// we were unable to read from the network.
// if it timed out, we should try again (if we're not out of retries)
if (numRetries >= max_retries)
break;
numRetries++;
}
}
*/
return 0;
}
/**
* Shutting down. Clean up any memory allocations
* @param protocol_context the context
* @returns true(1)
*/
int yamux_shutdown(void* protocol_context) {
return 0;
}
struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* handlers) {
struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new();
if (handler != NULL) {
handler->context = handlers;
handler->CanHandle = yamux_can_handle;
handler->HandleMessage = yamux_handle_message;
handler->Shutdown = yamux_shutdown;
}
return handler;
}