Make yamux compatible with GO. Still testing

This commit is contained in:
John Jones 2017-11-30 20:58:47 -05:00
parent 4218876198
commit 2e0391f68c
13 changed files with 271 additions and 128 deletions

View file

@ -143,13 +143,35 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer
// yamux over multistream
new_stream = libp2p_yamux_stream_new(peer->sessionContext->default_stream, 0, dialer->swarm->protocol_handlers);
if (new_stream != NULL) {
if (!libp2p_yamux_stream_ready(peer->sessionContext, 5))
if (!libp2p_yamux_stream_ready(peer->sessionContext, 5)) {
libp2p_logger_error("dialer", "Unable to get yamux into the ready status.\n");
return 0;
}
libp2p_logger_debug("dialer", "We successfully negotiated yamux.\n");
//peer->sessionContext->default_stream = new_stream;
// the rest should be done on another thread
// we have our swarm connection. Now we ask for some "channels"
// id over yamux
//libp2p_yamux_stream_add(new_stream->stream_context, libp2p_identify_stream_new(new_stream));
// id over multistream over yamux
const struct Libp2pProtocolHandler* handler = libp2p_protocol_get_handler(dialer->swarm->protocol_handlers, "/ipfs/id/1.0.0\n");
if (handler != NULL) {
Identify* identify = handler->context;
if (peer->sessionContext->default_stream->stream_type == STREAM_TYPE_YAMUX) {
struct YamuxContext* ctx = libp2p_yamux_get_context(peer->sessionContext->default_stream->stream_context);
// first get a new frame. It should be ready to go
struct Stream* new_channel = yamux_channel_new(ctx, 0, NULL);
if (new_channel != NULL && new_channel->channel > 0) {
// then get a multistream
struct Stream* yamux_multistream = libp2p_net_multistream_stream_new(new_channel, 0);
if (!libp2p_net_multistream_ready(new_channel->stream_context, 10)) {
libp2p_logger_error("dialer", "Unable to get multistream over yamux into the ready status.\n");
return 0;
}
// then get an identify
struct Stream* identify_stream = libp2p_identify_stream_new(yamux_multistream, identify, 1);
}
} else {
libp2p_logger_error("dialer", "Expected a yamux context, but got a context of type %d.\n", peer->sessionContext->default_stream->stream_type);
}
}
// kademlia over yamux
//libp2p_yamux_stream_add(new_stream->stream_context, libp2p_kademlia_stream_new(new_stream));
// circuit relay over yamux

View file

@ -10,6 +10,7 @@
#include "libp2p/conn/session.h"
#include "libp2p/identify/identify.h"
#include "libp2p/utils/logger.h"
#include "libp2p/yamux/yamux.h"
@ -41,16 +42,20 @@ int libp2p_identify_can_handle(const struct StreamMessage* msg) {
* @param context the context
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_identify_send_protocol(struct Stream *stream, Identify* identify) {
int libp2p_identify_send_protocol(struct Stream *stream, Identify* identify, int initiatedByUs) {
size_t max_buffer_size = 6000;
uint8_t buffer[max_buffer_size];
char *protocol = "/ipfs/id/1.0.0\n";
int protocol_len = strlen(protocol);
// throw the protocol into the buffer
memcpy(&buffer[0], protocol, protocol_len);
if (!libp2p_identify_protobuf_encode(identify, &buffer[protocol_len], max_buffer_size-protocol_len, &max_buffer_size)) {
libp2p_logger_error("identify", "Unable to protobuf the identity.\n");
return 0;
if (!initiatedByUs) {
if (!libp2p_identify_protobuf_encode(identify, &buffer[protocol_len], max_buffer_size-protocol_len, &max_buffer_size)) {
libp2p_logger_error("identify", "Unable to protobuf the identity.\n");
return 0;
}
} else {
max_buffer_size = 0;
}
struct StreamMessage msg;
msg.data_size = protocol_len + max_buffer_size;
@ -331,13 +336,26 @@ exit:
* @returns <0 on error, 0 if loop should not continue, >0 on success
*/
int libp2p_identify_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
// attempt to create a new Identify connection with them.
// send the protocol id back, and set up the channel
Identify* identify = (Identify*)protocol_context;
struct Stream* new_stream = libp2p_identify_stream_new(stream, identify);
if (new_stream == NULL)
return -1;
return stream->handle_upgrade(stream, new_stream);
struct YamuxChannelContext* yamuxChannelContext = libp2p_yamux_get_parent_channel_context(stream);
struct YamuxContext* yamuxContext = libp2p_yamux_get_context(yamuxChannelContext);
struct Stream* new_stream = NULL;
if (yamuxChannelContext != NULL && yamuxChannelContext->child_stream->stream_type == STREAM_TYPE_YAMUX) {
if ( (yamuxContext->am_server && yamuxChannelContext->channel % 2 == 0)
|| (!yamuxContext->am_server && yamuxChannelContext->channel % 2 == 1)) {
// we initiated it, and they are replying
return 1;
}
} else {
// they initiated it.
// attempt to create a new Identify connection with them.
// send the protocol id back, and set up the channel
Identify* identify = (Identify*)protocol_context;
new_stream = libp2p_identify_stream_new(stream, identify, 0);
if (new_stream == NULL)
return -1;
return stream->handle_upgrade(stream, new_stream);
}
return 1;
}
/**
@ -377,6 +395,17 @@ int libp2p_identify_close(struct Stream* stream) {
return 1;
}
int libp2p_identify_read(void* stream_context, struct StreamMessage** msg, int timeout_secs) {
struct IdentifyContext* ctx = (struct IdentifyContext*) stream_context;
struct StreamMessage* internal = NULL;
if (ctx->parent_stream->read(ctx->parent_stream->stream_context, &internal, timeout_secs)) {
// we got an identity, but we should not do anything with it right now
// TODO: send a fin
libp2p_stream_message_free(internal);
}
return 0;
}
/***
* Create a new stream that negotiates the identify protocol
*
@ -387,7 +416,7 @@ int libp2p_identify_close(struct Stream* stream) {
* @param parent_stream the parent stream
* @returns a new Stream that can talk "identify"
*/
struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify* identify) {
struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify* identify, int initiatedByUs) {
if (parent_stream == NULL)
return NULL;
struct Stream* out = libp2p_stream_new();
@ -404,8 +433,9 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify
out->close = libp2p_identify_close;
out->negotiate = NULL;
out->bytes_waiting = NULL;
out->read = libp2p_identify_read;
// do we expect a reply?
if (!libp2p_identify_send_protocol(parent_stream, identify) /* || !libp2p_identify_receive_protocol(parent_stream) */) {
if (!libp2p_identify_send_protocol(parent_stream, identify, initiatedByUs) /* || !libp2p_identify_receive_protocol(parent_stream) */) {
libp2p_stream_free(out);
free(ctx);
return NULL;
@ -425,6 +455,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify
* @param parent_stream the parent stream
* @returns a new Stream that is a multistream, but with "identify" already negotiated
*/
/*
struct Stream* libp2p_identify_stream_new_with_multistream(struct Stream* parent_stream) {
if (parent_stream == NULL)
return NULL;
@ -451,3 +482,4 @@ struct Stream* libp2p_identify_stream_new_with_multistream(struct Stream* parent
}
return out;
}
*/

View file

@ -33,7 +33,7 @@ struct IdentifyContext {
};
int libp2p_identify_can_handle(const struct StreamMessage* msg);
int libp2p_identify_send_protocol(struct Stream* stream, Identify* identify);
int libp2p_identify_send_protocol(struct Stream* stream, Identify* identify, int initiatedByUs);
int libp2p_identify_receive_protocol(struct Stream* stream);
Identify* libp2p_identify_new();
void libp2p_identify_free(Identify* in);
@ -55,5 +55,5 @@ struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(char* publi
* @param parent_stream the parent stream
* @returns a new Stream that can talk "identify"
*/
struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify* identify);
struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream, Identify* identify, int initiatedByUs);

View file

@ -123,9 +123,9 @@ void libp2p_net_multistream_stream_free(struct Stream* stream);
/***
* Wait for multistream stream to become ready
* @param session_context the session context to check
* @param context the session context to check, can also be a YamuxChannelContext
* @param timeout_secs the number of seconds to wait for things to become ready
* @returns true(1) if it becomes ready, false(0) otherwise
*/
int libp2p_net_multistream_ready(struct SessionContext* session_context, int timeout_secs);
int libp2p_net_multistream_ready(void* context, int timeout_secs);

View file

@ -70,3 +70,11 @@ int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers);
* @param handlers the vector of handlers
*/
int libp2p_protocol_is_valid_protocol(struct StreamMessage* msg, struct Libp2pVector* handlers);
/***
* Retrieve the correct protocol handlder for a particular protocol id
* @param protocol_handlers the collection of protocol handlers
* @param id the protocol id
* @returns a protocol handler that can handle id (or NULL if none found)
*/
const struct Libp2pProtocolHandler* libp2p_protocol_get_handler(struct Libp2pVector* protocol_handlers, const char* id);

View file

@ -129,3 +129,24 @@ struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incomin
* @returns true(1) if it becomes ready, false(0) otherwise
*/
int libp2p_yamux_stream_ready(struct SessionContext* session_context, int timeout_secs);
/**
* Given a context, get the YamuxChannelContext
* @param stream_context the context
* @returns the YamuxChannelContext or NULL if there was none
*/
struct YamuxChannelContext* libp2p_yamux_get_channel_context(void* stream_context);
/***
* Given a context, get the YamuxContext
* @param stream_context a YamuxChannelContext or a YamuxContext
* @returns the YamuxContext, or NULL on error
*/
struct YamuxContext* libp2p_yamux_get_context(void* stream_context);
/***
* Walks down the tree, looking for the nearest YamuxChannelContext
* @param in the stream
* @returns the YamuxChannelContext or NULL
*/
struct YamuxChannelContext* libp2p_yamux_get_parent_channel_context(struct Stream* in);

View file

@ -16,6 +16,7 @@
#include "libp2p/net/multistream.h"
#include "libp2p/utils/logger.h"
#include "multiaddr/multiaddr.h"
#include "libp2p/yamux/yamux.h"
// NOTE: this is normally set to 5 seconds, but you may want to increase this during debugging
int multistream_default_timeout = 5;
@ -189,26 +190,43 @@ int libp2p_net_multistream_write_without_check(void* stream_context, struct Stre
return num_bytes;
}
int multistream_wait(struct Stream* stream, int timeout_secs) {
int counter = 0;
struct MultistreamContext* ctx = (struct MultistreamContext*)stream->stream_context;
while (ctx->status != multistream_status_ack && counter <= timeout_secs) {
counter++;
sleep(1);
}
if (ctx->status == multistream_status_ack)
return 1;
return 0;
}
/***
* Wait for multistream stream to become ready
* @param session_context the session context to check
* @param timeout_secs the number of seconds to wait for things to become ready
* @returns true(1) if it becomes ready, false(0) otherwise
*/
int libp2p_net_multistream_ready(struct SessionContext* session_context, int timeout_secs) {
int libp2p_net_multistream_ready(void* context, int timeout_secs) {
int counter = 0;
while (session_context->default_stream->stream_type != STREAM_TYPE_MULTISTREAM && counter <= timeout_secs) {
counter++;
sleep(1);
}
if (session_context->default_stream->stream_type == STREAM_TYPE_MULTISTREAM && counter < 5) {
struct MultistreamContext* ctx = (struct MultistreamContext*)session_context->default_stream->stream_context;
while (ctx->status != multistream_status_ack && counter <= timeout_secs) {
struct YamuxChannelContext* yamuxChannelContext = libp2p_yamux_get_channel_context(context);
if (yamuxChannelContext != NULL) {
while (yamuxChannelContext->child_stream->stream_type != STREAM_TYPE_MULTISTREAM && counter <= timeout_secs) {
counter++;
sleep(1);
}
if (ctx->status == multistream_status_ack)
return 1;
if (counter <= 5)
return multistream_wait(yamuxChannelContext->child_stream, timeout_secs - counter);
} else {
struct SessionContext* session_context = (struct SessionContext*)context;
while (session_context->default_stream->stream_type != STREAM_TYPE_MULTISTREAM && counter <= timeout_secs) {
counter++;
sleep(1);
}
if (counter <= 5) {
return multistream_wait(session_context->default_stream, timeout_secs - counter);
}
}
return 0;
}

View file

@ -7,6 +7,7 @@
* Handle the different protocols
*/
/***
* Compare incoming to see if they are requesting a protocol upgrade
* @param incoming the incoming string
@ -26,6 +27,19 @@ const struct Libp2pProtocolHandler* protocol_compare(struct StreamMessage* msg,
return NULL;
}
/***
* Retrieve the correct protocol handlder for a particular protocol id
* @param protocol_handlers the collection of protocol handlers
* @param id the protocol id
* @returns a protocol handler that can handle id (or NULL if none found)
*/
const struct Libp2pProtocolHandler* libp2p_protocol_get_handler(struct Libp2pVector* protocol_handlers, const char* id) {
struct StreamMessage message;
message.data_size = strlen(id);
message.data = (uint8_t*)id;
return protocol_compare(&message, protocol_handlers);
}
/**
* Allocate resources for a new Libp2pProtocolHandler
* @returns an allocated struct
@ -50,6 +64,16 @@ void libp2p_protocol_handler_free(struct Libp2pProtocolHandler* handler) {
free(handler);
}
int appears_to_be_a_protocol(struct StreamMessage* msg) {
if (msg == NULL)
return 0;
if (msg->data_size < 2)
return 0;
if (memchr(&msg->data[1], '\n', msg->data_size-1) != NULL)
return 1;
return 0;
}
/***
* Handle an incoming message
* @param message the incoming message
@ -61,6 +85,12 @@ int libp2p_protocol_marshal(struct StreamMessage* msg, struct Stream* stream, st
const struct Libp2pProtocolHandler* handler = protocol_compare(msg, handlers);
if (handler == NULL) {
if (appears_to_be_a_protocol(msg)) {
struct StreamMessage na_message;
na_message.data = (uint8_t*)"na\n";
na_message.data_size = 3;
stream->write(stream->stream_context, &na_message);
}
// set the msg->error code
msg->error_number = 100;
return -1;

View file

@ -20,6 +20,7 @@ struct SwarmSession {
int DEFAULT_NETWORK_TIMEOUT = 5;
/***
* Listens on a particular stream, and marshals the request
* @param stream the stream to listen to
@ -29,6 +30,8 @@ int DEFAULT_NETWORK_TIMEOUT = 5;
int libp2p_swarm_listen_and_handle(struct Stream* stream, struct Libp2pVector* protocol_handlers) {
struct StreamMessage* results = NULL;
int retVal = 0;
if (stream == NULL)
return -1;
// Read from the network
libp2p_logger_debug("swarm", "Attempting to get read lock.\n");
pthread_mutex_lock(stream->socket_mutex);

View file

@ -140,7 +140,7 @@ int test_yamux_identify() {
goto exit;
// Now add in another protocol
mock_stream->read = mock_identify_read_protocol;
if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(yamux_stream, handler->context))) {
if (!libp2p_yamux_stream_add(yamux_stream->stream_context, libp2p_identify_stream_new(yamux_stream, handler->context, 1))) {
goto exit;
}
// tear down

View file

@ -282,7 +282,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
libp2p_logger_debug("yamux", "We found our stream id of %d.\n", f.streamid);
if (f.flags & yamux_frame_rst)
{
libp2p_logger_debug("yamux", "They are asking that this stream be reset.\n");
libp2p_logger_debug("yamux", "They are asking that stream %d be reset.\n", f.streamid);
// close the stream
s->state = yamux_stream_closed;
@ -291,10 +291,10 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
}
else if (f.flags & yamux_frame_fin)
{
libp2p_logger_debug("yamux", "They are asking that this stream be closed.\n");
libp2p_logger_debug("yamux", "They are asking that stream %d be closed.\n", f.streamid);
// local stream didn't initiate FIN
if (s->state != yamux_stream_closing)
yamux_stream_close(context);
yamux_stream_close(libp2p_yamux_get_parent_channel_context(s->stream));
s->state = yamux_stream_closed;
@ -303,7 +303,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
}
else if (f.flags & yamux_frame_ack)
{
libp2p_logger_debug("yamux", "They sent an ack.\n");
libp2p_logger_debug("yamux", "They sent an ack for stream %d.\n", f.streamid);
// acknowldegement
if (s->state != yamux_stream_syn_sent) {
libp2p_logger_debug("yamux", "We received an ack, but it seems we never sent anything!\n");
@ -317,15 +317,10 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
return -EPROTO;
}
libp2p_logger_debug("yamux", "Processing the data after the frame, which is %d bytes.\n", incoming_size - frame_size);
if (f.streamid == 2) {
ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size);
libp2p_logger_debug("yamux", "decode: yamux_stream_process returned %d.\n", (int)re);
return (re < 0) ? re : (re + incoming_size);
} else {
libp2p_logger_debug("yamux", "Only handling stream 2 for now.\n");
return 0;
}
libp2p_logger_debug("yamux", "Processing the data after the frame for stream %d, which is %d bytes.\n", f.streamid, incoming_size - frame_size);
ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size);
libp2p_logger_debug("yamux", "decode: yamux_stream_process for stream %d returned %d.\n", f.streamid, (int)re);
return (re < 0) ? re : (re + incoming_size);
//yamux_pull_message_from_frame(incoming, incoming_size, return_message);
} // stream id matches
}
@ -339,52 +334,44 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
// if we didn't initiate it, add this new channel (odd stream id is from client, even is from server)
if ( (f.streamid % 2 == 0 && !yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) {
// JMJ just debugging stream 2 for now
if (f.streamid == 2) {
libp2p_logger_debug("yamux", "Stream id %d is a new stream. Creating it...\n", f.streamid);
struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message);
if (yamuxChannelStream == NULL) {
libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux stream for stream id %d.\n", f.streamid);
return -EPROTO;
}
struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context;
if (yamux_session->new_stream_fn) {
libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn for stream %d.\n", f.streamid);
yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message);
}
// handle window update (if there is one)
struct yamux_session_stream ss = yamux_session->streams[f.streamid];
ss.alive = 1;
ss.stream = yamux_stream_new();
ss.stream->id = f.streamid;
ss.stream->session = yamux_session;
ss.stream->state = yamux_stream_syn_recv;
ss.stream->window_size = 0;
yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size);
channelContext->state = yamux_stream_syn_recv;
if (f.type == yamux_frame_window_update) {
libp2p_logger_debug("yamux", "Received window update for stream %d. Sending one back.\n", f.streamid);
// send it back
yamux_stream_window_update(channelContext, ss.stream->window_size);
}
// TODO: Start negotiations of multistream
struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0);
if (multistream != NULL) {
libp2p_logger_debug("yamux", "Successfully sent the multistream id on stream %d.\n", f.streamid);
// this should already be done
// channelContext->child_stream = multistream;
} else {
libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid);
}
} else {
libp2p_logger_error("yamux", "Temporarily not doing anthing for streams other than stream 2.\n");
return 0;
}
libp2p_logger_debug("yamux", "Stream id %d is a new stream. Creating it...\n", f.streamid);
struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message);
if (yamuxChannelStream == NULL) {
libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux stream for stream id %d.\n", f.streamid);
return -EPROTO;
}
struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context;
if (yamux_session->new_stream_fn) {
libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn for stream %d.\n", f.streamid);
yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message);
}
// handle window update (if there is one)
struct yamux_session_stream ss = yamux_session->streams[f.streamid];
ss.alive = 1;
ss.stream = yamux_stream_new();
ss.stream->id = f.streamid;
ss.stream->session = yamux_session;
ss.stream->state = yamux_stream_syn_recv;
ss.stream->window_size = 0;
yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size);
channelContext->state = yamux_stream_syn_recv;
if (f.type == yamux_frame_window_update) {
libp2p_logger_debug("yamux", "Received window update for stream %d. Sending one back.\n", f.streamid);
// send it back
yamux_stream_window_update(channelContext, ss.stream->window_size);
}
// TODO: Start negotiations of multistream
struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0);
if (multistream != NULL) {
libp2p_logger_debug("yamux", "Successfully sent the multistream id on stream %d.\n", f.streamid);
// this should already be done
// channelContext->child_stream = multistream;
} else {
libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid);
}
} else {
libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)"));
}
libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)"));
}
}
else {
libp2p_logger_error("yamux", "We had a (probably) new frame, but the flags didn't seem right.");
@ -404,7 +391,7 @@ struct yamux_session_stream* yamux_get_session_stream(struct yamux_session* sess
for (size_t i = 0; i < session->cap_streams; ++i)
{
struct yamux_session_stream* ss = &session->streams[i];
if (ss->stream->stream->channel == channel)
if (ss != NULL && ss->stream != NULL && ss->stream->stream != NULL && ss->stream->stream->channel == channel)
return ss;
}
return NULL;

View file

@ -92,7 +92,7 @@ FOUND:;
// success
}
*/
struct Stream* channelStream = libp2p_yamux_channel_stream_new(context->stream, id);
struct Stream* channelStream = nst.stream;
struct YamuxChannelContext* channel = (struct YamuxChannelContext*)channelStream->stream_context;
channel->channel = id;
channel->child_stream = NULL;
@ -152,6 +152,9 @@ ssize_t yamux_stream_init(struct YamuxChannelContext* channel_ctx)
*/
ssize_t yamux_stream_close(void* context)
{
if (context == NULL)
return 0;
if ( ((char*)context)[0] == YAMUX_CHANNEL_CONTEXT) {
struct YamuxChannelContext* channel_ctx = (struct YamuxChannelContext*) context;
if (!channel_ctx || channel_ctx->state != yamux_stream_est || channel_ctx->closed)
@ -373,23 +376,31 @@ struct yamux_stream* yamux_stream_new() {
*/
void* yamux_read_method(void* args) {
struct YamuxChannelContext* context = (struct YamuxChannelContext*) args;
context->read_running = 1;
struct StreamMessage* message = NULL;
// continue to read until the buffer is empty
while (context->buffer->buffer_size > 0) {
if (!context->read_running) {
context->read_running = 1;
if (context->child_stream->read(context->child_stream->stream_context, &message, 5) && message != NULL) {
context->read_running = 0;
libp2p_logger_debug("yamux", "read_method: read returned a message of %d bytes. [%s]\n", message->data_size, message->data);
int retVal = libp2p_protocol_marshal(message, context->child_stream, context->yamux_context->protocol_handlers);
libp2p_logger_debug("yamux", "read_method: protocol_marshal returned %d.\n", retVal);
libp2p_stream_message_free(message);
} else {
context->read_running = 0;
libp2p_logger_debug("yamux", "read_method: read returned false.\n");
}
if (context->child_stream == NULL || context->child_stream->stream_context == NULL || context->child_stream->read == NULL) {
libp2p_logger_error("yamux", "read_method: Child stream not set up properly for channel %d.\n", context->channel);
context->read_running = 0;
return NULL;
}
struct Stream* child_stream = context->child_stream;
if (child_stream == NULL || child_stream->read == NULL) {
libp2p_logger_error("yamux", "read_method: Child stream not set up properly for channel %d.\n", context->channel);
context->read_running = 0;
return NULL;
}
if (context->child_stream->read(context->child_stream->stream_context, &message, 5) && message != NULL) {
libp2p_logger_debug("yamux", "read_method: read returned a message of %d bytes. [%s]\n", message->data_size, message->data);
int retVal = libp2p_protocol_marshal(message, context->child_stream, context->yamux_context->protocol_handlers);
libp2p_logger_debug("yamux", "read_method: protocol_marshal returned %d.\n", retVal);
libp2p_stream_message_free(message);
} else {
libp2p_logger_debug("yamux", "read_method: read returned false.\n");
}
}
context->read_running = 0;
return NULL;
}
@ -398,10 +409,12 @@ void* yamux_read_method(void* args) {
* @param context the YamuxChannelContext
*/
int libp2p_yamux_notify_child_stream_has_data(struct YamuxChannelContext* context) {
pthread_t new_thread;
if (!context->read_running) {
pthread_t new_thread;
if (pthread_create(&new_thread, NULL, yamux_read_method, context) == 0)
return 1;
if (pthread_create(&new_thread, NULL, yamux_read_method, context) == 0)
return 1;
}
return 0;
}

View file

@ -13,6 +13,20 @@
int libp2p_yamux_channels_free(struct YamuxContext* ctx);
struct Stream* libp2p_yamux_get_parent_stream(void* context);
/***
* Walks down the tree, looking for the nearest YamuxChannelContext
* @param in the stream
* @returns the YamuxChannelContext or NULL
*/
struct YamuxChannelContext* libp2p_yamux_get_parent_channel_context(struct Stream* in) {
if (in == NULL)
return NULL;
struct YamuxChannelContext* retVal = libp2p_yamux_get_channel_context(in->stream_context);
if (retVal == NULL) {
return libp2p_yamux_get_parent_channel_context(in->parent_stream);
}
return NULL;
}
/**
* Given a context, get the YamuxChannelContext
* @param stream_context the context
@ -32,6 +46,8 @@ struct YamuxChannelContext* libp2p_yamux_get_channel_context(void* stream_contex
* @returns the YamuxContext, or NULL on error
*/
struct YamuxContext* libp2p_yamux_get_context(void* stream_context) {
if (stream_context == NULL)
return NULL;
char proto = ((uint8_t*)stream_context)[0];
struct YamuxChannelContext* channel = NULL;
struct YamuxContext* ctx = NULL;
@ -527,13 +543,13 @@ int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size
* @param stream the parent stream
* @returns a YamuxContext
*/
struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream) {
struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream, int amServer) {
struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext));
if (ctx != NULL) {
ctx->type = YAMUX_CONTEXT;
ctx->stream = NULL;
ctx->channels = libp2p_utils_vector_new(1);
ctx->session = yamux_session_new(NULL, stream, yamux_session_server, NULL);
ctx->session = yamux_session_new(NULL, stream, amServer ? yamux_session_server : yamux_session_client, NULL);
ctx->am_server = 0;
ctx->state = 0;
ctx->buffered_message = NULL;
@ -568,11 +584,7 @@ int libp2p_yamux_send_protocol(struct Stream* stream) {
int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_stream) {
// put this stream in the collection, and tie it to an id
if (libp2p_logger_watching_class("yamux")) {
const char* stream_type = "";
if (new_stream->stream_type == STREAM_TYPE_MULTISTREAM) {
stream_type = "Multistream";
}
libp2p_logger_debug("yamux", "handle_upgrade called for stream %s.\n", stream_type);
libp2p_logger_debug("yamux", "handle_upgrade called for stream type %d.\n", new_stream->stream_type);
}
struct YamuxContext* yamux_context = libp2p_yamux_get_context(yamux_stream->stream_context);
struct YamuxChannelContext* yamux_channel_context = libp2p_yamux_get_channel_context(yamux_stream->stream_context);
@ -624,7 +636,7 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_serv
out->address = parent_stream->address;
out->socket_mutex = parent_stream->socket_mutex;
// build YamuxContext
struct YamuxContext* ctx = libp2p_yamux_context_new(out);
struct YamuxContext* ctx = libp2p_yamux_context_new(out, am_server);
if (ctx == NULL) {
libp2p_yamux_stream_free(out);
return NULL;
@ -778,18 +790,15 @@ int libp2p_yamux_channel_null_close(struct Stream* stream) {
* @returns a new Stream that has a YamuxChannelContext
*/
struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, int channelNumber) {
struct YamuxContext* yamuxContext = libp2p_yamux_get_context(incoming_stream->stream_context);
struct Stream* out = libp2p_stream_new();
if (out != NULL) {
int isYamux = 0;
char first_char = ((uint8_t*)incoming_stream->stream_context)[0];
if (first_char == YAMUX_CONTEXT)
isYamux = 1;
out->stream_type = STREAM_TYPE_YAMUX;
out->address = incoming_stream->address;
// don't allow the incoming_stream to close the channel
out->close = libp2p_yamux_channel_null_close;
struct YamuxChannelContext* ctx = (struct YamuxChannelContext*)malloc(sizeof(struct YamuxChannelContext));
if (!isYamux) {
if (yamuxContext == NULL) {
out->parent_stream = incoming_stream->parent_stream;
out->peek = incoming_stream->parent_stream->peek;
out->read = incoming_stream->parent_stream->read;
@ -801,14 +810,14 @@ struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, i
// this does the wrap
incoming_stream->parent_stream = out;
} else {
out->parent_stream = incoming_stream;
out->peek = incoming_stream->peek;
out->read = incoming_stream->read;
out->read_raw = incoming_stream->read_raw;
out->write = incoming_stream->write;
out->socket_mutex = incoming_stream->socket_mutex;
ctx->yamux_context = incoming_stream->stream_context;
ctx->child_stream = NULL;
out->parent_stream = incoming_stream;
out->peek = incoming_stream->peek;
out->read = incoming_stream->read;
out->read_raw = incoming_stream->read_raw;
out->write = incoming_stream->write;
out->socket_mutex = incoming_stream->socket_mutex;
ctx->yamux_context = incoming_stream->stream_context;
ctx->child_stream = NULL;
}
ctx->channel = (uint32_t) channelNumber;
ctx->closed = 0;