2017-10-11 16:23:25 +00:00
# include <memory.h>
# include <string.h>
# include <sys/socket.h>
# include <errno.h>
# include <stdlib.h>
# include <stdio.h>
# include <time.h>
# include "libp2p/net/stream.h"
# include "libp2p/os/timespec.h"
# include "libp2p/yamux/session.h"
# include "libp2p/yamux/stream.h"
2017-11-06 21:38:55 +00:00
# include "libp2p/yamux/yamux.h"
2017-11-23 11:23:50 +00:00
# include "libp2p/utils/logger.h"
2017-10-11 16:23:25 +00:00
static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG ;
/***
* Create a new yamux session
* @ param config the configuration
* @ param sock the socket
* @ param type session type ( yamux_session_server or yamux_session_client )
* @ param userdata user data
* @ returns the yamux_session struct
*/
2017-11-06 18:36:11 +00:00
struct yamux_session * yamux_session_new ( struct yamux_config * config , struct Stream * parent_stream , enum yamux_session_type type , void * userdata )
2017-10-11 16:23:25 +00:00
{
2017-11-06 18:36:11 +00:00
if ( ! parent_stream )
2017-10-11 16:23:25 +00:00
return NULL ;
if ( ! config )
config = & dcfg ;
size_t ab = config - > accept_backlog ;
struct yamux_session_stream * streams =
( struct yamux_session_stream * ) malloc ( sizeof ( struct yamux_session_stream ) * ab ) ;
for ( size_t i = 0 ; i < ab ; + + i )
streams [ i ] . alive = 0 ;
struct yamux_session * sess = ( struct yamux_session * ) malloc ( sizeof ( struct yamux_session ) ) ;
if ( sess ! = NULL ) {
sess - > config = config ;
sess - > type = type ;
2017-11-06 18:36:11 +00:00
sess - > parent_stream = parent_stream ;
2017-10-11 16:23:25 +00:00
sess - > closed = 0 ;
sess - > nextid = 1 + ( type = = yamux_session_server ) ;
sess - > num_streams = 0 ;
sess - > cap_streams = 0 ;
sess - > streams = streams ;
struct timespec ts ;
ts . tv_sec = 0 ;
ts . tv_nsec = 0 ;
sess - > since_ping = ts ;
sess - > get_str_ud_fn = NULL ;
sess - > ping_fn = NULL ;
sess - > pong_fn = NULL ;
sess - > go_away_fn = NULL ;
sess - > free_fn = NULL ;
sess - > userdata = userdata ;
}
return sess ;
}
void yamux_session_free ( struct yamux_session * session )
{
if ( ! session )
return ;
if ( ! session - > closed )
yamux_session_close ( session , yamux_error_normal ) ;
if ( session - > free_fn )
session - > free_fn ( session ) ;
for ( size_t i = 0 ; i < session - > cap_streams ; + + i )
if ( session - > streams [ i ] . alive )
yamux_stream_free ( session - > streams [ i ] . stream ) ;
free ( session - > streams ) ;
free ( session ) ;
}
/***
* Close a yamux session
* @ param session the yamux_session to close
* @ param err why we ' re closing
*/
ssize_t yamux_session_close ( struct yamux_session * session , enum yamux_error err )
{
if ( ! session )
return - EINVAL ;
if ( session - > closed )
return 0 ;
struct yamux_frame f = ( struct yamux_frame ) {
. version = YAMUX_VERSION ,
. type = yamux_frame_go_away ,
. flags = 0 ,
. streamid = YAMUX_STREAMID_SESSION ,
. length = ( uint32_t ) err
} ;
session - > closed = 1 ;
2017-10-23 14:47:54 +00:00
struct StreamMessage outgoing ;
outgoing . data = ( uint8_t * ) & f ;
outgoing . data_size = sizeof ( struct yamux_frame ) ;
2017-11-06 18:36:11 +00:00
if ( ! session - > parent_stream - > write ( session - > parent_stream - > stream_context , & outgoing ) )
2017-10-11 16:23:25 +00:00
return 0 ;
2017-10-23 14:47:54 +00:00
return outgoing . data_size ;
2017-10-11 16:23:25 +00:00
}
/***
2017-11-27 14:06:33 +00:00
* Respond to a Ping
2017-10-11 16:23:25 +00:00
* @ param session the session to ping
* @ param value the value to send
* @ param pong true ( 1 ) if we should send the ack , false ( 0 ) if we should send the syn ( who ' s side are we on ? )
* @ returns number of bytes sent
*/
ssize_t yamux_session_ping ( struct yamux_session * session , uint32_t value , int pong )
{
if ( ! session | | session - > closed )
return - EINVAL ;
struct yamux_frame f = ( struct yamux_frame ) {
. version = YAMUX_VERSION ,
. type = yamux_frame_ping ,
. flags = pong ? yamux_frame_ack : yamux_frame_syn ,
. streamid = YAMUX_STREAMID_SESSION ,
. length = value
} ;
if ( ! timespec_get ( & session - > since_ping , TIME_UTC ) )
return - EACCES ;
2017-10-23 14:47:54 +00:00
struct StreamMessage outgoing ;
outgoing . data = ( uint8_t * ) & f ;
outgoing . data_size = sizeof ( struct yamux_frame ) ;
2017-11-27 14:06:33 +00:00
if ( ! session - > parent_stream - > parent_stream - > write ( session - > parent_stream - > parent_stream - > stream_context , & outgoing ) )
2017-10-11 16:23:25 +00:00
return 0 ;
2017-10-23 14:47:54 +00:00
return outgoing . data_size ;
2017-10-11 16:23:25 +00:00
}
/**
* Decode an incoming message
2017-11-19 18:37:03 +00:00
* @ param context the YamuxContext or YamuxChannelContext
2017-10-11 16:23:25 +00:00
* @ param incoming the incoming bytes
* @ param incoming_size the size of the incoming bytes
2017-11-19 18:37:03 +00:00
* @ param return_message the results ( usually the stuff after the frame )
* @ returns 0 on success , negative number on error
2017-10-11 16:23:25 +00:00
*/
2017-11-19 18:37:03 +00:00
int yamux_decode ( void * context , const uint8_t * incoming , size_t incoming_size , struct StreamMessage * * return_message ) {
// retrieve the yamux context
struct yamux_session * yamux_session = NULL ;
struct YamuxContext * yamuxContext = NULL ;
if ( context = = NULL )
return 0 ;
if ( ( ( char * ) context ) [ 0 ] = = YAMUX_CONTEXT ) {
yamuxContext = ( struct YamuxContext * ) context ;
yamux_session = yamuxContext - > session ;
} else if ( ( ( char * ) context ) [ 0 ] = = YAMUX_CHANNEL_CONTEXT ) {
struct YamuxChannelContext * channelContext = ( struct YamuxChannelContext * ) context ;
yamuxContext = channelContext - > yamux_context ;
yamux_session = channelContext - > yamux_context - > session ;
}
2017-10-11 16:23:25 +00:00
// decode frame
struct yamux_frame f ;
if ( incoming_size < sizeof ( struct yamux_frame ) ) {
return 0 ;
}
2017-11-06 21:38:55 +00:00
memcpy ( ( void * ) & f , incoming , sizeof ( struct yamux_frame ) ) ;
2017-11-06 18:36:11 +00:00
2017-10-11 16:23:25 +00:00
decode_frame ( & f ) ;
// check yamux version
2017-11-27 14:06:33 +00:00
if ( f . version ! = YAMUX_VERSION ) {
libp2p_logger_error ( " yamux " , " Incorrect Yamux version. Expected %d but received %d. \n " , YAMUX_VERSION , f . version ) ;
2017-10-11 16:23:25 +00:00
return 0 ;
2017-11-27 14:06:33 +00:00
}
2017-10-11 16:23:25 +00:00
2017-11-27 14:06:33 +00:00
if ( ! f . streamid ) { // we're not dealing with a stream, we're dealing with something at the yamux protocol level
libp2p_logger_debug ( " yamux " , " Received frame with no stream id. We must need to do something at the protocol level. \n " ) ;
2017-10-11 16:23:25 +00:00
switch ( f . type )
{
2017-11-27 14:06:33 +00:00
case yamux_frame_ping : {
// ping
libp2p_logger_debug ( " yamux " , " Received a ping. \n " ) ;
2017-10-11 16:23:25 +00:00
if ( f . flags & yamux_frame_syn )
{
2017-11-19 18:37:03 +00:00
yamux_session_ping ( yamux_session , f . length , 1 ) ;
2017-10-11 16:23:25 +00:00
2017-11-19 18:37:03 +00:00
if ( yamux_session - > ping_fn )
yamux_session - > ping_fn ( yamux_session , f . length ) ;
2017-10-11 16:23:25 +00:00
}
2017-11-19 18:37:03 +00:00
else if ( ( f . flags & yamux_frame_ack ) & & yamux_session - > pong_fn )
2017-10-11 16:23:25 +00:00
{
2017-11-19 18:37:03 +00:00
struct timespec now , dt , last = yamux_session - > since_ping ;
2017-10-11 16:23:25 +00:00
if ( ! timespec_get ( & now , TIME_UTC ) )
return - EACCES ;
dt . tv_sec = now . tv_sec - last . tv_sec ;
if ( now . tv_nsec < last . tv_nsec )
{
dt . tv_sec - - ;
dt . tv_nsec = last . tv_nsec - now . tv_nsec ;
}
else
dt . tv_nsec = now . tv_nsec - last . tv_nsec ;
2017-11-19 18:37:03 +00:00
yamux_session - > pong_fn ( yamux_session , f . length , dt ) ;
2017-10-11 16:23:25 +00:00
}
else
return - EPROTO ;
break ;
2017-11-27 14:06:33 +00:00
}
case yamux_frame_go_away : {
// go away (hanging up)
libp2p_logger_debug ( " yamux " , " Received a \" go away \" . \n " ) ;
2017-11-19 18:37:03 +00:00
yamux_session - > closed = 1 ;
if ( yamux_session - > go_away_fn )
yamux_session - > go_away_fn ( yamux_session , ( enum yamux_error ) f . length ) ;
2017-10-11 16:23:25 +00:00
break ;
2017-11-27 14:06:33 +00:00
}
default : {
libp2p_logger_debug ( " yamux " , " We thought we needed to do something at the yamux protocol level, but the flags didn't match up. \n " ) ;
2017-10-11 16:23:25 +00:00
return - EPROTO ;
2017-11-27 14:06:33 +00:00
}
2017-10-11 16:23:25 +00:00
}
2017-11-27 14:06:33 +00:00
} else {
// we're handling a stream, not something at the yamux protocol level
2017-11-19 18:37:03 +00:00
for ( size_t i = 0 ; i < yamux_session - > cap_streams ; + + i )
2017-10-11 16:23:25 +00:00
{
2017-11-19 18:37:03 +00:00
struct yamux_session_stream * ss = & yamux_session - > streams [ i ] ;
2017-10-11 16:23:25 +00:00
struct yamux_stream * s = ss - > stream ;
2017-11-19 18:37:03 +00:00
if ( ! ss - > alive | | s - > state = = yamux_stream_closed ) // skip dead or closed streams
2017-10-11 16:23:25 +00:00
continue ;
2017-11-19 18:37:03 +00:00
if ( s - > id = = f . streamid ) // we have a match between the stored stream and the current stream
2017-10-11 16:23:25 +00:00
{
2017-11-27 14:06:33 +00:00
libp2p_logger_debug ( " yamux " , " We found our stream id. \n " ) ;
2017-10-11 16:23:25 +00:00
if ( f . flags & yamux_frame_rst )
{
2017-11-27 14:06:33 +00:00
libp2p_logger_debug ( " yamux " , " They are asking that this stream be reset. \n " ) ;
2017-11-19 18:37:03 +00:00
// close the stream
2017-10-11 16:23:25 +00:00
s - > state = yamux_stream_closed ;
if ( s - > rst_fn )
s - > rst_fn ( s ) ;
}
else if ( f . flags & yamux_frame_fin )
{
2017-11-27 14:06:33 +00:00
libp2p_logger_debug ( " yamux " , " They are asking that this stream be closed. \n " ) ;
2017-10-11 16:23:25 +00:00
// local stream didn't initiate FIN
if ( s - > state ! = yamux_stream_closing )
2017-11-19 18:37:03 +00:00
yamux_stream_close ( context ) ;
2017-10-11 16:23:25 +00:00
s - > state = yamux_stream_closed ;
if ( s - > fin_fn )
s - > fin_fn ( s ) ;
}
else if ( f . flags & yamux_frame_ack )
{
2017-11-27 14:06:33 +00:00
libp2p_logger_debug ( " yamux " , " They sent an ack. \n " ) ;
2017-11-19 18:37:03 +00:00
// acknowldegement
2017-11-27 14:06:33 +00:00
if ( s - > state ! = yamux_stream_syn_sent ) {
libp2p_logger_debug ( " yamux " , " We received an ack, but it seems we never sent anything! \n " ) ;
2017-10-11 16:23:25 +00:00
return - EPROTO ;
2017-11-27 14:06:33 +00:00
}
2017-10-11 16:23:25 +00:00
s - > state = yamux_stream_est ;
}
2017-11-27 14:06:33 +00:00
else if ( f . flags ) {
libp2p_logger_debug ( " yamux " , " They sent no flags. I don't know what to do. Erroring out. \n " ) ;
2017-10-11 16:23:25 +00:00
return - EPROTO ;
2017-11-27 14:06:33 +00:00
}
2017-10-11 16:23:25 +00:00
int sz = sizeof ( struct yamux_frame ) ;
2017-11-27 14:06:33 +00:00
libp2p_logger_debug ( " yamux " , " Processing frame of %d bytes. \n " ) ;
2017-11-19 18:37:03 +00:00
ssize_t re = yamux_stream_process ( s , & f , & incoming [ sz ] , incoming_size - sz ) ;
2017-10-11 16:23:25 +00:00
return ( re < 0 ) ? re : ( re + incoming_size ) ;
}
}
2017-11-06 18:36:11 +00:00
// This stream is not in my list of streams.
// It must not exist yet, so let's try to make it
2017-10-11 16:23:25 +00:00
if ( f . flags & yamux_frame_syn )
{
2017-11-27 14:06:33 +00:00
libp2p_logger_debug ( " yamux " , " Looks like we have a new stream coming in. Stream %d. \n " , f . streamid ) ;
2017-11-19 18:37:03 +00:00
struct StreamMessage * msg = libp2p_stream_message_new ( ) ;
2017-10-11 16:23:25 +00:00
2017-11-19 18:37:03 +00:00
if ( incoming_size > sizeof ( struct yamux_frame ) ) {
msg - > data_size = incoming_size - sizeof ( struct yamux_frame ) ;
2017-11-27 14:06:33 +00:00
libp2p_logger_debug ( " yamux " , " Stream %d has data after the frame, with a length of %d. \n " , f . streamid , msg - > data_size ) ;
2017-11-19 18:37:03 +00:00
msg - > data = malloc ( msg - > data_size ) ;
memcpy ( msg - > data , & incoming [ sizeof ( struct yamux_frame ) ] , msg - > data_size ) ;
2017-11-27 14:06:33 +00:00
} else {
libp2p_logger_debug ( " yamux " , " Stream %d has no extra data after the frame. \n " , f . streamid ) ;
2017-11-19 18:37:03 +00:00
}
2017-10-11 16:23:25 +00:00
2017-11-20 00:29:40 +00:00
// if we didn't initiate it, add this new channel (odd stream id is from client, even is from server)
2017-11-27 14:06:33 +00:00
if ( ( f . streamid % 2 = = 0 & & ! yamuxContext - > am_server ) | | ( f . streamid % 2 = = 1 & & yamuxContext - > am_server ) ) {
libp2p_logger_debug ( " yamux " , " This is a new channel. Creating it... \n " ) ;
2017-11-20 00:29:40 +00:00
struct Stream * yamuxChannelStream = yamux_channel_new ( yamuxContext , f . streamid , msg ) ;
2017-11-23 11:23:50 +00:00
if ( yamuxChannelStream = = NULL ) {
libp2p_logger_error ( " yamux " , " session->yamux_decode: Unable to create new yamux channel for stream id %d. \n " , f . streamid ) ;
2017-11-20 00:29:40 +00:00
return - EPROTO ;
2017-11-23 11:23:50 +00:00
}
2017-11-20 00:29:40 +00:00
struct YamuxChannelContext * channelContext = ( struct YamuxChannelContext * ) yamuxChannelStream - > stream_context ;
2017-10-11 16:23:25 +00:00
2017-11-23 11:23:50 +00:00
if ( yamux_session - > new_stream_fn ) {
libp2p_logger_debug ( " yamux " , " session->yamux_decode: Calling new_stream_fn. \n " ) ;
2017-11-20 00:29:40 +00:00
yamux_session - > new_stream_fn ( yamuxContext , yamuxContext - > stream , msg ) ;
2017-11-23 11:23:50 +00:00
}
2017-10-11 16:23:25 +00:00
2017-11-20 00:29:40 +00:00
channelContext - > state = yamux_stream_syn_recv ;
2017-11-27 14:06:33 +00:00
} else {
libp2p_logger_debug ( " yamux " , " I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s " , f . streamid , ( yamuxContext - > am_server ? " server " : " client) " ) ) ;
2017-11-20 00:29:40 +00:00
}
2017-11-19 18:37:03 +00:00
* return_message = msg ;
2017-10-11 16:23:25 +00:00
}
2017-11-27 14:06:33 +00:00
else {
libp2p_logger_error ( " yamux " , " We had a (probably) new frame, but the flags didn't seem right. " ) ;
2017-10-11 16:23:25 +00:00
return - EPROTO ;
2017-11-27 14:06:33 +00:00
}
2017-10-11 16:23:25 +00:00
}
return 0 ;
}