2017-10-11 16:23:25 +00:00
# include <string.h>
2017-10-12 02:15:12 +00:00
# include <unistd.h>
2017-10-11 16:23:25 +00:00
# include "varint.h"
# include "libp2p/yamux/session.h"
2017-11-02 19:45:17 +00:00
# include "libp2p/yamux/yamux.h"
2017-10-11 16:23:25 +00:00
# include "libp2p/net/protocol.h"
# include "libp2p/net/stream.h"
2017-11-08 15:51:43 +00:00
# include "libp2p/net/connectionstream.h"
2017-10-11 16:23:25 +00:00
# include "libp2p/conn/session.h"
# include "libp2p/utils/logger.h"
2017-11-23 11:23:50 +00:00
// function declarations that we don't want in the header file
int libp2p_yamux_channels_free ( struct YamuxContext * ctx ) ;
struct Stream * libp2p_yamux_get_parent_stream ( void * context ) ;
2017-12-01 01:58:47 +00:00
/***
* Walks down the tree , looking for the nearest YamuxChannelContext
* @ param in the stream
* @ returns the YamuxChannelContext or NULL
*/
struct YamuxChannelContext * libp2p_yamux_get_parent_channel_context ( struct Stream * in ) {
if ( in = = NULL )
return NULL ;
struct YamuxChannelContext * retVal = libp2p_yamux_get_channel_context ( in - > stream_context ) ;
if ( retVal = = NULL ) {
return libp2p_yamux_get_parent_channel_context ( in - > parent_stream ) ;
}
return NULL ;
}
2017-11-23 11:23:50 +00:00
/**
* Given a context , get the YamuxChannelContext
* @ param stream_context the context
* @ returns the YamuxChannelContext or NULL if there was none
*/
struct YamuxChannelContext * libp2p_yamux_get_channel_context ( void * stream_context ) {
char proto = ( ( uint8_t * ) stream_context ) [ 0 ] ;
if ( proto = = YAMUX_CHANNEL_CONTEXT ) {
return ( struct YamuxChannelContext * ) stream_context ;
}
return NULL ;
}
/***
* Given a context , get the YamuxContext
* @ param stream_context a YamuxChannelContext or a YamuxContext
* @ returns the YamuxContext , or NULL on error
*/
struct YamuxContext * libp2p_yamux_get_context ( void * stream_context ) {
2017-12-01 01:58:47 +00:00
if ( stream_context = = NULL )
return NULL ;
2017-11-23 11:23:50 +00:00
char proto = ( ( uint8_t * ) stream_context ) [ 0 ] ;
struct YamuxChannelContext * channel = NULL ;
struct YamuxContext * ctx = NULL ;
if ( proto = = YAMUX_CHANNEL_CONTEXT ) {
channel = ( struct YamuxChannelContext * ) stream_context ;
ctx = channel - > yamux_context ;
} else if ( proto = = YAMUX_CONTEXT ) {
ctx = ( struct YamuxContext * ) stream_context ;
}
return ctx ;
}
2017-10-11 16:23:25 +00:00
/**
* Determines if this protocol can handle the incoming message
* @ param incoming the incoming data
* @ param incoming_size the size of the incoming data buffer
* @ returns true ( 1 ) if it can handle this message , false ( 0 ) if not
*/
2017-10-23 20:21:50 +00:00
int yamux_can_handle ( const struct StreamMessage * msg ) {
2017-11-27 14:06:33 +00:00
if ( msg = = NULL | | msg - > data_size = = 0 | | msg - > data = = NULL )
return 0 ;
2017-10-11 16:23:25 +00:00
char * protocol = " /yamux/1.0.0 \n " ;
int protocol_size = strlen ( protocol ) ;
// is there a varint in front?
size_t num_bytes = 0 ;
2017-10-23 20:21:50 +00:00
if ( msg - > data [ 0 ] ! = protocol [ 0 ] & & msg - > data [ 1 ] ! = protocol [ 1 ] ) {
varint_decode ( msg - > data , msg - > data_size , & num_bytes ) ;
2017-10-11 16:23:25 +00:00
}
2017-10-23 20:21:50 +00:00
if ( msg - > data_size > = protocol_size - num_bytes ) {
if ( strncmp ( protocol , ( char * ) & msg - > data [ num_bytes ] , protocol_size ) = = 0 )
2017-10-11 16:23:25 +00:00
return 1 ;
}
return 0 ;
}
/***
2017-11-06 18:36:11 +00:00
* The remote is attempting to negotiate yamux
2017-10-23 20:21:50 +00:00
* @ param msg the incoming message
2017-10-11 16:23:25 +00:00
* @ param incoming_size the size of the incoming data buffer
2017-11-08 15:51:43 +00:00
* @ param stream the incoming stream
2017-10-11 16:23:25 +00:00
* @ param protocol_context the protocol - dependent context
* @ returns 0 if the caller should not continue looping , < 0 on error , > 0 on success
*/
2017-11-08 15:51:43 +00:00
int yamux_handle_message ( const struct StreamMessage * msg , struct Stream * stream , void * protocol_context ) {
2017-11-29 12:40:52 +00:00
// get latest stream (multithreaded, so could be stale)
stream = libp2p_net_connection_get_session_context ( stream ) - > default_stream ;
2017-11-29 04:41:46 +00:00
if ( stream - > stream_type = = STREAM_TYPE_YAMUX ) {
struct YamuxContext * ctx = ( struct YamuxContext * ) stream - > stream_context ;
if ( ctx - > state = = yamux_stream_est ) {
// TODO: This is probably a frame. we need to handle this.
return - 1 ;
} else {
//TODO: check to make sure they sent the yamux protocol id
// we sent a protocol ID, and this is them responding
ctx - > state = yamux_stream_est ;
}
return 1 ;
}
// the incoming stream is not yamux. They are attempting to upgrade to yamux
struct YamuxProtocolContext * yamuxProtocolContext = ( struct YamuxProtocolContext * ) protocol_context ;
struct Stream * new_stream = libp2p_yamux_stream_new ( stream , 0 , yamuxProtocolContext - > protocol_handlers ) ;
if ( new_stream ! = NULL ) {
struct YamuxContext * ctx = ( struct YamuxContext * ) new_stream - > stream_context ;
ctx - > state = yamux_stream_est ;
// upgrade
return stream - > handle_upgrade ( stream , new_stream ) ;
}
return - 1 ;
2017-10-11 16:23:25 +00:00
}
/**
* Shutting down . Clean up any memory allocations
* @ param protocol_context the context
* @ returns true ( 1 )
*/
int yamux_shutdown ( void * protocol_context ) {
2017-11-08 15:51:43 +00:00
if ( protocol_context ! = NULL )
free ( protocol_context ) ;
2017-10-11 16:23:25 +00:00
return 0 ;
}
2017-11-08 15:51:43 +00:00
struct Libp2pProtocolHandler * libp2p_yamux_build_protocol_handler ( struct Libp2pVector * handlers ) {
2017-10-11 16:23:25 +00:00
struct Libp2pProtocolHandler * handler = libp2p_protocol_handler_new ( ) ;
if ( handler ! = NULL ) {
2017-11-23 12:44:42 +00:00
struct YamuxProtocolContext * ctx = ( struct YamuxProtocolContext * ) malloc ( sizeof ( struct YamuxProtocolContext ) ) ;
if ( ctx = = NULL ) {
libp2p_protocol_handler_free ( handler ) ;
return NULL ;
}
ctx - > protocol_handlers = handlers ;
handler - > context = ctx ;
2017-10-11 16:23:25 +00:00
handler - > CanHandle = yamux_can_handle ;
handler - > HandleMessage = yamux_handle_message ;
handler - > Shutdown = yamux_shutdown ;
}
return handler ;
}
2017-10-23 23:03:38 +00:00
2017-11-23 11:23:50 +00:00
/**
* Close the main yamux connection
* @ param stream the stream to close
* @ returns true ( 1 ) on success , false ( 0 ) on error
*/
int libp2p_yamux_send_go_away ( struct Stream * stream ) {
struct YamuxChannelContext * channel = libp2p_yamux_get_channel_context ( stream - > stream_context ) ;
struct YamuxContext * ctx = libp2p_yamux_get_context ( stream - > stream_context ) ;
if ( ctx ! = NULL ) {
struct StreamMessage * msg = libp2p_stream_message_new ( ) ;
msg - > data_size = sizeof ( struct yamux_frame ) ;
msg - > data = malloc ( msg - > data_size ) ;
struct yamux_frame * f = ( struct yamux_frame * ) msg - > data ;
f - > type = yamux_frame_go_away ;
f - > flags = yamux_frame_fin ;
f - > streamid = 0 ;
f - > version = 0 ;
f - > length = 0 ;
if ( channel ! = NULL ) {
f - > streamid = channel - > channel ;
}
encode_frame ( f ) ;
stream - > parent_stream - > write ( stream - > parent_stream - > stream_context , msg ) ;
libp2p_stream_message_free ( msg ) ;
return 1 ;
}
return 0 ;
}
2017-11-06 21:38:55 +00:00
/***
* Close the stream and clean up all resources
* NOTE : This also goes through the channels
* @ param stream_context the YamuxContext
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
2017-11-08 15:51:43 +00:00
int libp2p_yamux_close ( struct Stream * stream ) {
if ( stream = = NULL )
2017-11-06 18:36:11 +00:00
return 0 ;
2017-11-08 15:51:43 +00:00
if ( stream - > stream_context = = NULL )
return 0 ;
2017-11-19 18:37:03 +00:00
struct Stream * parent_stream = stream - > parent_stream ;
// this should close everything above yamux (i.e. the protocols that are riding on top of yamux)
2017-11-23 11:23:50 +00:00
libp2p_yamux_channels_free ( stream - > stream_context ) ;
// send a FIN
libp2p_yamux_send_go_away ( stream ) ;
2017-11-19 18:37:03 +00:00
// and this should close everything below
parent_stream - > close ( parent_stream ) ;
2017-11-06 21:38:55 +00:00
return 1 ;
2017-11-02 19:45:17 +00:00
}
2017-11-29 15:57:48 +00:00
/***
* Determine if the incoming is a data frame , but we need more data
* @ param incoming the incoming message
* @ returns > 0 if we need more data , 0 if not
*/
int yamux_more_to_read ( struct StreamMessage * incoming ) {
if ( incoming = = NULL )
return 0 ;
if ( incoming - > data_size < 12 ) {
return 0 ;
}
// get frame
struct yamux_frame * original_frame = ( struct yamux_frame * ) incoming - > data ;
struct yamux_frame * copy = ( struct yamux_frame * ) malloc ( sizeof ( struct yamux_frame ) ) ;
memcpy ( copy , original_frame , sizeof ( struct yamux_frame ) ) ;
decode_frame ( copy ) ;
if ( copy - > type = = yamux_frame_data ) {
libp2p_logger_debug ( " yamux " , " Checking frame sizes. It says we should have %d, and I see %d. \n " , copy - > length , incoming - > data_size - sizeof ( struct yamux_frame ) ) ;
int retVal = copy - > length - ( incoming - > data_size - sizeof ( struct yamux_frame ) ) ;
free ( copy ) ;
return retVal ;
}
free ( copy ) ;
return 0 ;
}
2017-11-30 19:32:36 +00:00
int libp2p_yamux_channel_read ( void * stream_context , struct StreamMessage * * message , int timeout_secs ) {
if ( stream_context = = NULL ) {
libp2p_logger_error ( " yamux " , " channel_read: stream context null. \n " ) ;
return 0 ;
}
struct YamuxChannelContext * context = libp2p_yamux_get_channel_context ( stream_context ) ;
if ( context = = NULL ) {
libp2p_logger_error ( " yamux " , " channel_read: stream_context not a channel context \n " ) ;
return 0 ;
}
// reserve the necessary memory
* message = libp2p_stream_message_new ( ) ;
struct StreamMessage * msg = * message ;
if ( msg = = NULL ) {
libp2p_logger_error ( " yamux " , " channel_read: Unable to allocate memory for message struct. \n " ) ;
return 0 ;
}
msg - > data_size = context - > buffer - > buffer_size ;
if ( msg - > data_size = = 0 ) {
libp2p_logger_debug ( " yamux " , " channel_read: Nothing to read. \n " ) ;
libp2p_stream_message_free ( msg ) ;
* message = NULL ;
return 0 ;
}
msg - > data = ( uint8_t * ) malloc ( msg - > data_size ) ;
if ( msg - > data = = NULL ) {
libp2p_logger_error ( " yamux " , " chanel_read: Unable to allocate memory for message data. \n " ) ;
libp2p_stream_message_free ( msg ) ;
* message = NULL ;
return 0 ;
}
// ok, we have our struct. Now fill it
msg - > data_size = threadsafe_buffer_read ( context - > buffer , msg - > data , msg - > data_size ) ;
libp2p_logger_debug ( " yamux " , " channel_read: Read %d bytes from buffer. \n " , msg - > data_size ) ;
return msg - > data_size ;
}
2017-11-02 19:45:17 +00:00
/**
* Read from the network , expecting a yamux frame .
* NOTE : This will also dispatch the frame to the correct protocol
* @ param stream_context the YamuxContext
* @ param message the resultant message
* @ param timeout_secs when to give up
* @ returns true ( 1 ) on success , false ( 0 ) on failure
*/
int libp2p_yamux_read ( void * stream_context , struct StreamMessage * * message , int timeout_secs ) {
2017-11-23 11:23:50 +00:00
if ( stream_context = = NULL ) {
libp2p_logger_error ( " yamux " , " read was passed a null context. \n " ) ;
2017-11-02 19:45:17 +00:00
return 0 ;
2017-11-23 11:23:50 +00:00
}
2017-11-30 19:32:36 +00:00
struct YamuxChannelContext * channel = libp2p_yamux_get_channel_context ( stream_context ) ;
if ( channel ! = NULL ) {
// do a channel read instead
return libp2p_yamux_channel_read ( stream_context , message , timeout_secs ) ;
2017-11-06 18:36:11 +00:00
}
2017-11-02 19:45:17 +00:00
2017-11-30 19:32:36 +00:00
struct YamuxContext * ctx = libp2p_yamux_get_context ( stream_context ) ;
2017-11-29 12:40:52 +00:00
if ( ctx = = NULL ) {
libp2p_logger_error ( " yamux " , " read: The incoming stream is not a yamux stream. \n " ) ;
return 0 ;
}
2017-11-29 04:58:28 +00:00
if ( ctx - > state ! = yamux_stream_est ) {
libp2p_logger_debug ( " yamux " , " read: Yamux still not inited, so passing to lower protocol. \n " ) ;
2017-11-29 12:40:52 +00:00
// perhaps this is the yamux protocol id we've been expecting
int retVal = ctx - > stream - > parent_stream - > read ( ctx - > stream - > parent_stream - > stream_context , message , timeout_secs ) ;
libp2p_logger_debug ( " yamux " , " read: we did the lower read, and received a %d. \n " , retVal ) ;
if ( retVal > 0 ) {
struct StreamMessage * incoming_message = * message ;
libp2p_logger_debug ( " yamux " , " read: The lower read has a message of %d bytes that says: %s. \n " , incoming_message - > data_size , incoming_message - > data ) ;
if ( strstr ( ( char * ) incoming_message - > data , " /yamux/1.0.0 " ) ! = NULL ) {
libp2p_logger_debug ( " yamux " , " read: We got the protocol we've been waiting for. \n " ) ;
ctx - > state = yamux_stream_est ;
libp2p_stream_message_free ( incoming_message ) ;
* message = NULL ;
return 0 ;
}
}
return retVal ;
2017-11-29 04:58:28 +00:00
}
2017-11-23 11:23:50 +00:00
struct Stream * parent_stream = libp2p_yamux_get_parent_stream ( stream_context ) ;
2017-11-30 19:32:36 +00:00
// this is the normal situation (not dead code).
struct StreamMessage * incoming = NULL ;
// we need a lock
if ( parent_stream - > read ( parent_stream - > s tream_context , & incoming , yamux_default_timeout ) ) {
libp2p_logger_debug ( " yamux " , " read: successfully read %d bytes from network. \n " , incoming - > data_size ) ;
// This could be a data frame with the actual data coming later. Yuck.
// JMJ in the case of an incomplete buffer, the next read should be the data. This must
// be true, as the next data does not have a frame. We should only read the bytes we need.
int moreToRead = yamux_more_to_read ( incoming ) ;
if ( moreToRead > 0 ) {
uint8_t buffer [ moreToRead ] ;
if ( parent_stream - > read_raw ( parent_stream - > stream_context , buffer , moreToRead , timeout_secs ) = = moreToRead ) {
// we have the bytes we need
uint8_t * new_buffer = ( uint8_t * ) malloc ( incoming - > data_size + moreToRead ) ;
memcpy ( new_buffer , incoming - > data , incoming - > data_size ) ;
memcpy ( & new_buffer [ incoming - > data_size ] , buffer , moreToRead ) ;
incoming - > data_size + = moreToRead ;
free ( incoming - > data ) ;
incoming - > data = new_buffer ;
} else {
// we didn't get the bytes we needed
return 0 ;
}
2017-11-23 11:23:50 +00:00
}
2017-11-30 19:32:36 +00:00
// parse the frame. This is where the work happens.
if ( yamux_decode ( ctx , incoming - > data , incoming - > data_size , message ) > = 0 ) {
libp2p_stream_message_free ( incoming ) ;
// The message may not have anything in it. If so, return 0, as if nothing was done. Everything has been handled
if ( * message ! = NULL & & ( * message ) - > data_size = = 0 ) {
libp2p_stream_message_free ( * message ) ;
* message = NULL ;
return 0 ;
}
2017-11-19 18:37:03 +00:00
return 1 ;
2017-11-23 11:23:50 +00:00
}
libp2p_logger_error ( " yamux " , " yamux_decode returned error. \n " ) ;
2017-11-30 19:32:36 +00:00
libp2p_stream_message_free ( incoming ) ;
} else {
// read failed
2017-11-06 18:36:11 +00:00
}
2017-11-30 19:32:36 +00:00
2017-11-23 11:23:50 +00:00
libp2p_logger_error ( " yamux " , " Unable to do network read. \n " ) ;
2017-11-06 18:36:11 +00:00
return 0 ;
2017-11-02 19:45:17 +00:00
}
2017-11-19 18:37:03 +00:00
/***
* Prepare a new Yamux StreamMessage based on another StreamMessage
* NOTE : The frame is not encoded yet
* @ param incoming the incoming message
* @ returns a new StreamMessage that has a yamux_frame
*/
struct StreamMessage * libp2p_yamux_prepare_to_send ( struct StreamMessage * incoming ) {
struct StreamMessage * out = libp2p_stream_message_new ( ) ;
if ( out ! = NULL ) {
out - > data_size = sizeof ( struct yamux_frame ) + incoming - > data_size ;
out - > data = ( uint8_t * ) malloc ( out - > data_size ) ;
if ( out - > data = = NULL ) {
libp2p_stream_message_free ( out ) ;
return NULL ;
}
memset ( out - > data , 0 , out - > data_size ) ;
// the first part of the data is the yamux frame
// Set values in the frame, which is the first part of the outgoing message data
struct yamux_frame * frame = ( struct yamux_frame * ) out - > data ;
frame - > length = incoming - > data_size ;
frame - > type = yamux_frame_data ;
frame - > version = YAMUX_VERSION ;
// the last part of the data is the original data
memcpy ( & out - > data [ sizeof ( struct yamux_frame ) ] , incoming - > data , incoming - > data_size ) ;
}
return out ;
}
2017-11-23 11:23:50 +00:00
/***
* Get the next usable ID for a channel
* NOTE : Also increments the yamux_session_ . nextid counter
* NOTE : Odd = client , Even = server
* @ param ctx the context
* @ returns the next id
*/
uint32_t libp2p_yamux_get_next_id ( struct YamuxContext * ctx ) {
uint32_t next_id = ctx - > session - > nextid ;
if ( ( ctx - > am_server & & next_id % 2 = = 1 )
| | ( ! ctx - > am_server & & next_id % 2 = = 0 ) )
next_id + = 1 ;
ctx - > session - > nextid = next_id + 1 ;
return next_id ;
}
2017-11-06 18:36:11 +00:00
/***
* Write to the remote
* @ param stream_context the context . Could be a YamuxContext or YamuxChannelContext
* @ param message the message to write
* @ returns the number of bytes written
*/
2017-11-02 19:45:17 +00:00
int libp2p_yamux_write ( void * stream_context , struct StreamMessage * message ) {
2017-11-06 18:36:11 +00:00
if ( stream_context = = NULL )
return 0 ;
// look at the first byte of the context to determine if this is a YamuxContext (we're negotiating)
// or a YamuxChannelContext (we're talking to an established channel)
struct YamuxContext * ctx = NULL ;
struct YamuxChannelContext * channel = NULL ;
char proto = ( ( uint8_t * ) stream_context ) [ 0 ] ;
if ( proto = = YAMUX_CHANNEL_CONTEXT ) {
channel = ( struct YamuxChannelContext * ) stream_context ;
ctx = channel - > yamux_context ;
} else if ( proto = = YAMUX_CONTEXT ) {
ctx = ( struct YamuxContext * ) stream_context ;
}
2017-11-19 18:37:03 +00:00
if ( ctx = = NULL & & channel = = NULL )
return 0 ;
2017-11-29 04:41:46 +00:00
if ( ctx - > state ! = yamux_stream_est ) {
struct Stream * parent_stream = ctx - > stream - > parent_stream ;
return parent_stream - > write ( parent_stream - > stream_context , message ) ;
}
2017-11-19 18:37:03 +00:00
struct StreamMessage * outgoing_message = libp2p_yamux_prepare_to_send ( message ) ;
// now convert fame for network use
struct yamux_frame * frame = ( struct yamux_frame * ) outgoing_message - > data ;
// set a few more flags
frame - > flags = get_flags ( stream_context ) ;
2017-11-23 11:23:50 +00:00
if ( channel = = NULL ) {
// if we don't yet have a channel, set the id to the next available
frame - > streamid = libp2p_yamux_get_next_id ( ctx ) ;
} else {
2017-11-19 18:37:03 +00:00
frame - > streamid = channel - > channel ;
2017-11-23 11:23:50 +00:00
}
2017-11-19 18:37:03 +00:00
encode_frame ( frame ) ;
int retVal = 0 ;
2017-11-06 21:38:55 +00:00
if ( channel ! = NULL & & channel - > channel ! = 0 ) {
2017-11-06 18:36:11 +00:00
// we have an established channel. Use it.
2017-11-23 11:23:50 +00:00
libp2p_logger_debug ( " yamux " , " About to write %d bytes to yamux channel %d. \n " , outgoing_message - > data_size , channel - > channel ) ;
struct Stream * parent_stream = libp2p_yamux_get_parent_stream ( stream_context ) ;
retVal = parent_stream - > write ( parent_stream - > stream_context , outgoing_message ) ;
2017-11-06 18:36:11 +00:00
} else if ( ctx ! = NULL ) {
2017-11-23 11:23:50 +00:00
libp2p_logger_debug ( " yamux " , " About to write %d bytes to stream. \n " , outgoing_message - > data_size ) ;
retVal = ctx - > stream - > parent_stream - > write ( ctx - > stream - > parent_stream - > stream_context , outgoing_message ) ;
2017-11-06 18:36:11 +00:00
}
2017-11-19 18:37:03 +00:00
libp2p_stream_message_free ( outgoing_message ) ;
return retVal ;
2017-11-02 19:45:17 +00:00
}
/***
* Check to see if there is anything waiting on the network .
* @ param stream_context the YamuxContext
* @ returns the number of bytes waiting , or - 1 on error
*/
int libp2p_yamux_peek ( void * stream_context ) {
if ( stream_context = = NULL )
return - 1 ;
2017-11-20 00:29:40 +00:00
struct YamuxContext * ctx = libp2p_yamux_get_context ( stream_context ) ;
2017-11-02 19:45:17 +00:00
struct Stream * parent_stream = ctx - > stream - > parent_stream ;
if ( parent_stream = = NULL )
return - 1 ;
return parent_stream - > peek ( parent_stream - > stream_context ) ;
}
2017-11-30 19:32:36 +00:00
/***
* Read from the yamux channel buffer , mimics a network read
* @ param stream_context a YamuxChannelContext
* @ param buffer where to put the results
* @ param buffer_size the size of the buffer
* @ param timeout_secs how long to wait ( currently unused )
* @ returns the number of bytes placed into the buffer
*/
int libp2p_yamux_channel_read_raw ( void * stream_context , uint8_t * buffer , int buffer_size , int timeout_secs ) {
if ( stream_context = = NULL )
return 0 ;
struct YamuxChannelContext * channelContext = libp2p_yamux_get_channel_context ( stream_context ) ;
if ( channelContext = = NULL )
return 0 ;
// wait to see if we get the bytes we need
int counter = 0 ;
while ( channelContext - > buffer - > buffer_size < buffer_size & & counter < timeout_secs ) {
sleep ( 1 ) ;
counter + + ;
}
return threadsafe_buffer_read ( channelContext - > buffer , buffer , buffer_size ) ;
}
2017-11-20 00:29:40 +00:00
/***
* Read from the network , and place it in the buffer
* NOTE : This may put something in the internal read buffer ( i . e . buffer_size is too small )
* @ param stream_context the yamux context
* @ param buffer the buffer
* @ param buffer_size the size of the incoming buffer ( max number of bytes to read )
* @ param timeout_secs timeout
* @ returns number of bytes read .
*/
2017-11-02 19:45:17 +00:00
int libp2p_yamux_read_raw ( void * stream_context , uint8_t * buffer , int buffer_size , int timeout_secs ) {
2017-11-20 00:29:40 +00:00
if ( stream_context = = NULL ) {
return - 1 ;
}
2017-11-30 19:32:36 +00:00
struct YamuxChannelContext * channel_context = libp2p_yamux_get_channel_context ( stream_context ) ;
if ( channel_context ! = NULL ) {
// do a read_raw on a channel
return libp2p_yamux_channel_read_raw ( stream_context , buffer , buffer_size , timeout_secs ) ;
}
2017-11-20 00:29:40 +00:00
struct YamuxContext * ctx = libp2p_yamux_get_context ( stream_context ) ;
2017-11-23 11:23:50 +00:00
if ( ctx - > buffered_message_pos = = - 1 | | ctx - > buffered_message = = NULL ) {
2017-11-20 00:29:40 +00:00
// we need to get info from the network
2017-11-23 11:23:50 +00:00
if ( ! libp2p_yamux_read ( stream_context , & ctx - > buffered_message , timeout_secs ) ) {
libp2p_logger_error ( " yamux " , " read_raw: Unable to read from network. \n " ) ;
2017-11-20 00:29:40 +00:00
return - 1 ;
}
ctx - > buffered_message_pos = 0 ;
2017-11-23 11:23:50 +00:00
} else {
// we have some data from a previous read_raw call the code
// below should handle this.
2017-11-20 00:29:40 +00:00
}
// max_to_read is the lesser of bytes read or buffer_size
2017-11-23 11:23:50 +00:00
int max_to_read = ( buffer_size > ( ctx - > buffered_message - > data_size - ctx - > buffered_message_pos ) ? ctx - > buffered_message - > data_size - ctx - > buffered_message_pos : buffer_size ) ;
2017-11-20 00:29:40 +00:00
memcpy ( buffer , & ctx - > buffered_message - > data [ ctx - > buffered_message_pos ] , max_to_read ) ;
ctx - > buffered_message_pos + = max_to_read ;
if ( ctx - > buffered_message_pos = = ctx - > buffered_message - > data_size ) {
// we read everything
libp2p_stream_message_free ( ctx - > buffered_message ) ;
ctx - > buffered_message = NULL ;
ctx - > buffered_message_pos = - 1 ;
} else {
// we didn't read everything.
ctx - > buffered_message_pos = max_to_read ;
}
return max_to_read ;
2017-11-02 19:45:17 +00:00
}
2017-11-19 18:37:03 +00:00
/**
* Create a new YamuxContext struct
* @ param stream the parent stream
* @ returns a YamuxContext
*/
2017-12-01 01:58:47 +00:00
struct YamuxContext * libp2p_yamux_context_new ( struct Stream * stream , int amServer ) {
2017-11-02 19:45:17 +00:00
struct YamuxContext * ctx = ( struct YamuxContext * ) malloc ( sizeof ( struct YamuxContext ) ) ;
if ( ctx ! = NULL ) {
2017-11-06 18:36:11 +00:00
ctx - > type = YAMUX_CONTEXT ;
2017-11-02 19:45:17 +00:00
ctx - > stream = NULL ;
2017-11-06 12:27:03 +00:00
ctx - > channels = libp2p_utils_vector_new ( 1 ) ;
2017-12-01 01:58:47 +00:00
ctx - > session = yamux_session_new ( NULL , stream , amServer ? yamux_session_server : yamux_session_client , NULL ) ;
2017-11-19 18:37:03 +00:00
ctx - > am_server = 0 ;
ctx - > state = 0 ;
2017-11-20 00:29:40 +00:00
ctx - > buffered_message = NULL ;
ctx - > buffered_message_pos = - 1 ;
ctx - > protocol_handlers = NULL ;
2017-11-02 19:45:17 +00:00
}
return ctx ;
}
2017-11-29 04:41:46 +00:00
/***
* Write the protocol id for yamux to the stream
* @ param stream the stream to write to
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
int libp2p_yamux_send_protocol ( struct Stream * stream ) {
2017-11-29 12:40:52 +00:00
// JMJ debug
libp2p_logger_debug ( " yamux " , " Sending protocol through stream of type %d and channel %d. \n " , stream - > stream_type , stream - > channel ) ;
2017-11-29 04:41:46 +00:00
const char * protocolID = " /yamux/1.0.0 \n " ;
struct StreamMessage outgoing ;
outgoing . data_size = strlen ( protocolID ) ;
outgoing . data = ( uint8_t * ) protocolID ;
outgoing . error_number = 0 ;
return stream - > write ( stream - > stream_context , & outgoing ) ;
}
2017-11-08 15:51:43 +00:00
/***
* A new protocol was asked for . Give it a " channel "
* @ param yamux_stream the yamux stream
* @ param new_stream the newly negotiated protocol
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
int libp2p_yamux_handle_upgrade ( struct Stream * yamux_stream , struct Stream * new_stream ) {
// put this stream in the collection, and tie it to an id
2017-11-23 11:23:50 +00:00
if ( libp2p_logger_watching_class ( " yamux " ) ) {
2017-12-01 01:58:47 +00:00
libp2p_logger_debug ( " yamux " , " handle_upgrade called for stream type %d. \n " , new_stream - > stream_type ) ;
2017-11-23 11:23:50 +00:00
}
2017-11-30 19:32:36 +00:00
struct YamuxContext * yamux_context = libp2p_yamux_get_context ( yamux_stream - > stream_context ) ;
struct YamuxChannelContext * yamux_channel_context = libp2p_yamux_get_channel_context ( yamux_stream - > stream_context ) ;
if ( yamux_channel_context ! = NULL ) {
// they've asked to upgrade on a channel. Make them the new default stream for this channel
yamux_channel_context - > child_stream = new_stream ;
struct yamux_session_stream * yamux_session_stream = yamux_get_session_stream ( yamux_channel_context - > yamux_context - > session , yamux_channel_context - > channel ) ;
if ( yamux_session_stream = = NULL ) {
libp2p_logger_error ( " yamux " , " Unable to get correct session stream. \n " ) ;
return 0 ;
}
yamux_session_stream - > stream - > stream = new_stream ;
return 1 ;
} else {
// they've asked to upgrade on the main channel. I don't think this should never happen.
libp2p_logger_debug ( " yamux " , " handle_upgrade: Attempt to upgrade on the main yamux channel " ) ;
return libp2p_yamux_stream_add ( yamux_context , new_stream ) ;
}
2017-11-19 18:37:03 +00:00
}
/***
* Internal yamux code calls this when a new stream is created
* @ param context the context
* @ param stream the new stream
*/
void libp2p_yamux_new_stream ( struct YamuxContext * context , struct Stream * stream , struct StreamMessage * msg ) {
// ok, we have the new stream structure. We now need to read what was sent.
2017-11-23 11:23:50 +00:00
//libp2p_protocol_marshal(msg, stream, context->protocol_handlers);
2017-11-19 18:37:03 +00:00
}
2017-10-23 23:03:38 +00:00
/***
* Negotiate the Yamux protocol
* @ param parent_stream the parent stream
2017-11-19 18:37:03 +00:00
* @ param am_server true ( 1 ) if we are considered the server , false ( 0 ) if we are the client .
* @ param protocol_handlers the protocol handlers
2017-10-23 23:03:38 +00:00
* @ returns a Stream initialized and ready for yamux
*/
2017-11-19 18:37:03 +00:00
struct Stream * libp2p_yamux_stream_new ( struct Stream * parent_stream , int am_server , struct Libp2pVector * protocol_handlers ) {
2017-11-06 18:36:11 +00:00
struct Stream * out = libp2p_stream_new ( ) ;
2017-11-02 18:43:52 +00:00
if ( out ! = NULL ) {
2017-11-19 18:37:03 +00:00
out - > stream_type = STREAM_TYPE_YAMUX ;
2017-11-02 18:43:52 +00:00
out - > parent_stream = parent_stream ;
out - > close = libp2p_yamux_close ;
out - > read = libp2p_yamux_read ;
out - > write = libp2p_yamux_write ;
out - > peek = libp2p_yamux_peek ;
out - > read_raw = libp2p_yamux_read_raw ;
2017-11-08 15:51:43 +00:00
out - > handle_upgrade = libp2p_yamux_handle_upgrade ;
2017-11-02 18:43:52 +00:00
out - > address = parent_stream - > address ;
2017-11-29 04:58:28 +00:00
out - > socket_mutex = parent_stream - > socket_mutex ;
2017-11-02 18:43:52 +00:00
// build YamuxContext
2017-12-01 01:58:47 +00:00
struct YamuxContext * ctx = libp2p_yamux_context_new ( out , am_server ) ;
2017-11-02 18:43:52 +00:00
if ( ctx = = NULL ) {
2017-11-02 19:45:17 +00:00
libp2p_yamux_stream_free ( out ) ;
2017-11-02 18:43:52 +00:00
return NULL ;
}
2017-11-19 18:37:03 +00:00
ctx - > session - > new_stream_fn = libp2p_yamux_new_stream ;
2017-11-02 18:43:52 +00:00
out - > stream_context = ctx ;
ctx - > stream = out ;
2017-11-19 18:37:03 +00:00
ctx - > am_server = am_server ;
ctx - > protocol_handlers = protocol_handlers ;
2017-11-29 04:41:46 +00:00
ctx - > state = yamux_stream_inited ;
// tell protocol below that we want to upgrade
parent_stream - > handle_upgrade ( parent_stream , out ) ;
2017-11-02 18:43:52 +00:00
// attempt to negotiate yamux protocol
2017-11-29 04:41:46 +00:00
if ( ! libp2p_yamux_send_protocol ( parent_stream ) ) {
2017-11-02 18:43:52 +00:00
libp2p_yamux_stream_free ( out ) ;
return NULL ;
}
}
return out ;
2017-10-23 23:03:38 +00:00
}
2017-11-06 12:27:03 +00:00
2017-11-23 11:23:50 +00:00
/***
* This will retrieve the stream that yamux is riding on top of
* @ param context a YamuxContext or YamuxChannelContext
* @ returns the Stream that yamux is riding on top of
*/
struct Stream * libp2p_yamux_get_parent_stream ( void * context ) {
if ( context = = NULL )
return NULL ;
struct YamuxContext * ctx = libp2p_yamux_get_context ( context ) ;
if ( ctx = = NULL )
return NULL ;
return ctx - > stream - > parent_stream ;
}
/***
* Sends a FIN to close a channel
* @ param channel the channel to close
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
int libp2p_yamux_channel_send_FIN ( struct YamuxChannelContext * channel ) {
if ( channel = = NULL )
return 0 ;
struct YamuxContext * ctx = channel - > yamux_context ;
if ( ctx ! = NULL ) {
struct StreamMessage * msg = libp2p_stream_message_new ( ) ;
msg - > data_size = sizeof ( struct yamux_frame ) ;
msg - > data = malloc ( msg - > data_size ) ;
struct yamux_frame * f = ( struct yamux_frame * ) msg - > data ;
f - > type = yamux_frame_window_update ;
f - > flags = yamux_frame_fin ;
f - > streamid = channel - > channel ;
f - > version = 0 ;
f - > length = 0 ;
encode_frame ( f ) ;
struct Stream * parent_to_yamux = libp2p_yamux_get_parent_stream ( channel ) ;
if ( parent_to_yamux ! = NULL )
parent_to_yamux - > write ( parent_to_yamux - > stream_context , msg ) ;
libp2p_stream_message_free ( msg ) ;
return 1 ;
}
return 0 ;
}
2017-11-08 15:51:43 +00:00
/**
* Clean up resources from libp2p_yamux_channel_new
* @ param ctx the YamuxChannelContext
*/
int libp2p_yamux_channel_close ( void * context ) {
if ( context = = NULL )
return 0 ;
struct YamuxChannelContext * ctx = ( struct YamuxChannelContext * ) context ;
if ( ctx ! = NULL ) {
2017-11-23 11:23:50 +00:00
//Send FIN
libp2p_yamux_channel_send_FIN ( ctx ) ;
2017-11-08 15:51:43 +00:00
// close the child's stream
ctx - > child_stream - > close ( ctx - > child_stream ) ;
libp2p_stream_free ( ctx - > stream ) ;
free ( ctx ) ;
}
return 1 ;
}
2017-11-23 11:23:50 +00:00
/***
* Close all channels
* @ param ctx the YamuxContext that contains a vector of channels
* @ returns true ( 1 )
*/
int libp2p_yamux_channels_free ( struct YamuxContext * ctx ) {
if ( ctx - > channels ) {
for ( int i = 0 ; i < ctx - > channels - > total ; i + + ) {
struct Stream * curr = ( struct Stream * ) libp2p_utils_vector_get ( ctx - > channels , i ) ;
libp2p_yamux_channel_close ( curr - > stream_context ) ;
}
libp2p_utils_vector_free ( ctx - > channels ) ;
ctx - > channels = NULL ;
}
return 1 ;
}
2017-11-08 15:51:43 +00:00
/***
* Free the resources from libp2p_yamux_context_new
* @ param ctx the context
*/
void libp2p_yamux_context_free ( struct YamuxContext * ctx ) {
if ( ctx = = NULL )
return ;
2017-11-20 00:29:40 +00:00
if ( ctx - > buffered_message ! = NULL ) {
libp2p_stream_message_free ( ctx - > buffered_message ) ;
ctx - > buffered_message = NULL ;
}
2017-11-08 15:51:43 +00:00
// free all the channels
2017-11-23 11:23:50 +00:00
libp2p_yamux_channels_free ( ctx ) ;
2017-11-19 18:37:03 +00:00
if ( ctx - > session ! = NULL )
yamux_session_free ( ctx - > session ) ;
2017-11-08 15:51:43 +00:00
free ( ctx ) ;
return ;
}
2017-11-06 18:36:11 +00:00
/**
* Frees resources held by the stream
* @ param yamux_stream the stream
*/
void libp2p_yamux_stream_free ( struct Stream * yamux_stream ) {
if ( yamux_stream = = NULL )
return ;
struct YamuxContext * ctx = ( struct YamuxContext * ) yamux_stream - > stream_context ;
libp2p_yamux_context_free ( ctx ) ;
libp2p_stream_free ( yamux_stream ) ;
}
2017-11-08 15:51:43 +00:00
/***
* Channels calling close on the stream should not be able
* to clean up layers below
* @ param context the context
* @ returns true ( 1 ) ;
2017-11-06 21:38:55 +00:00
*/
2017-11-08 15:51:43 +00:00
int libp2p_yamux_channel_null_close ( struct Stream * stream ) {
2017-11-06 21:38:55 +00:00
return 1 ;
}
2017-11-06 18:36:11 +00:00
/**
* Create a stream that has a " YamuxChannelContext " related to this yamux protocol
2017-11-19 18:37:03 +00:00
* NOTE : If incoming_stream is not of the Yamux protocol , this " wraps " the incoming
* stream , so that the returned stream is the parent of the incoming_stream . If the
* incoming stream is of the yamux protocol , the YamuxChannelContext . child_stream
* will be NULL , awaiting an upgrade to fill it in .
2017-11-08 15:51:43 +00:00
* @ param incoming_stream the stream of the new protocol
2017-11-19 18:37:03 +00:00
* @ param channelNumber the channel number ( 0 if unknown )
* @ returns a new Stream that has a YamuxChannelContext
2017-11-06 18:36:11 +00:00
*/
2017-11-19 18:37:03 +00:00
struct Stream * libp2p_yamux_channel_stream_new ( struct Stream * incoming_stream , int channelNumber ) {
2017-12-01 01:58:47 +00:00
struct YamuxContext * yamuxContext = libp2p_yamux_get_context ( incoming_stream - > stream_context ) ;
2017-11-06 18:36:11 +00:00
struct Stream * out = libp2p_stream_new ( ) ;
if ( out ! = NULL ) {
2017-11-19 18:37:03 +00:00
out - > stream_type = STREAM_TYPE_YAMUX ;
2017-11-08 15:51:43 +00:00
out - > address = incoming_stream - > address ;
// don't allow the incoming_stream to close the channel
out - > close = libp2p_yamux_channel_null_close ;
2017-11-06 18:36:11 +00:00
struct YamuxChannelContext * ctx = ( struct YamuxChannelContext * ) malloc ( sizeof ( struct YamuxChannelContext ) ) ;
2017-12-01 01:58:47 +00:00
if ( yamuxContext = = NULL ) {
2017-11-19 18:37:03 +00:00
out - > parent_stream = incoming_stream - > parent_stream ;
out - > peek = incoming_stream - > parent_stream - > peek ;
out - > read = incoming_stream - > parent_stream - > read ;
out - > read_raw = incoming_stream - > parent_stream - > read_raw ;
out - > write = incoming_stream - > parent_stream - > write ;
out - > socket_mutex = incoming_stream - > parent_stream - > socket_mutex ;
ctx - > yamux_context = incoming_stream - > parent_stream - > stream_context ;
ctx - > child_stream = incoming_stream ;
// this does the wrap
incoming_stream - > parent_stream = out ;
} else {
2017-12-01 01:58:47 +00:00
out - > parent_stream = incoming_stream ;
out - > peek = incoming_stream - > peek ;
out - > read = incoming_stream - > read ;
out - > read_raw = incoming_stream - > read_raw ;
out - > write = incoming_stream - > write ;
out - > socket_mutex = incoming_stream - > socket_mutex ;
ctx - > yamux_context = incoming_stream - > stream_context ;
ctx - > child_stream = NULL ;
2017-11-19 18:37:03 +00:00
}
2017-11-23 11:23:50 +00:00
ctx - > channel = ( uint32_t ) channelNumber ;
2017-11-06 21:38:55 +00:00
ctx - > closed = 0 ;
ctx - > state = 0 ;
ctx - > window_size = 0 ;
2017-11-06 18:36:11 +00:00
ctx - > type = YAMUX_CHANNEL_CONTEXT ;
2017-11-06 21:38:55 +00:00
ctx - > stream = out ;
2017-11-30 19:32:36 +00:00
ctx - > buffer = threadsafe_buffer_context_new ( ) ;
ctx - > read_running = 0 ;
2017-11-06 18:36:11 +00:00
out - > stream_context = ctx ;
2017-11-30 19:32:36 +00:00
out - > handle_upgrade = libp2p_yamux_handle_upgrade ;
out - > channel = channelNumber ;
2017-11-06 18:36:11 +00:00
}
return out ;
2017-11-06 12:27:03 +00:00
}
2017-11-08 15:51:43 +00:00
/****
* Add a stream " channel " to the yamux handler
* @ param ctx the context
* @ param stream the stream to add
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
int libp2p_yamux_stream_add ( struct YamuxContext * ctx , struct Stream * stream ) {
if ( stream = = NULL )
return 0 ;
// wrap the new stream in a YamuxChannelContext
2017-11-19 18:37:03 +00:00
struct Stream * channel_stream = libp2p_yamux_channel_stream_new ( stream , 0 ) ;
2017-11-08 15:51:43 +00:00
if ( channel_stream = = NULL )
return 0 ;
struct YamuxChannelContext * channel_context = ( struct YamuxChannelContext * ) channel_stream - > stream_context ;
// the negotiation was successful. Add it to the list of channels that we have
2017-11-23 11:23:50 +00:00
uint32_t itemNo = ( uint32_t ) libp2p_utils_vector_add ( ctx - > channels , channel_stream ) ;
2017-11-19 18:37:03 +00:00
// There are 2 streams for each protocol. A server has the even numbered streams, the
// client the odd number streams. If we are the server, we need to kick off the
// process to add a stream of the same type.
2017-11-08 15:51:43 +00:00
channel_context - > channel = itemNo ;
2017-11-19 18:37:03 +00:00
if ( ctx - > am_server & & itemNo % 2 ! = 0 ) {
// we're the server, and they have a negotiated a new protocol.
// negotiate a stream for us to talk to them.
2017-11-23 11:23:50 +00:00
struct Stream * yamux_stream = ctx - > stream ;
2017-11-19 18:37:03 +00:00
struct Stream * server_to_client_stream = stream - > negotiate ( yamux_stream ) ;
libp2p_yamux_stream_add ( ctx , server_to_client_stream ) ;
}
2017-11-08 15:51:43 +00:00
return 1 ;
}
2017-11-29 04:41:46 +00:00
/***
* Wait for yamux stream to become ready
* @ param session_context the session context to check
* @ param timeout_secs the number of seconds to wait for things to become ready
* @ returns true ( 1 ) if it becomes ready , false ( 0 ) otherwise
*/
int libp2p_yamux_stream_ready ( struct SessionContext * session_context , int timeout_secs ) {
int counter = 0 ;
while ( session_context ! = NULL
& & session_context - > default_stream ! = NULL
& & session_context - > default_stream - > stream_type ! = STREAM_TYPE_YAMUX
& & counter < = timeout_secs ) {
counter + + ;
sleep ( 1 ) ;
}
if ( session_context ! = NULL
& & session_context - > default_stream ! = NULL
& & session_context - > default_stream - > stream_type = = STREAM_TYPE_YAMUX
& & counter < 5 ) {
struct YamuxContext * ctx = ( struct YamuxContext * ) session_context - > default_stream - > stream_context ;
while ( ctx - > state ! = yamux_stream_est & & counter < = timeout_secs ) {
counter + + ;
sleep ( 1 ) ;
}
if ( ctx - > state = = yamux_stream_est )
return 1 ;
}
return 0 ;
}