Added buffering to yamux

This commit is contained in:
John Jones 2017-11-30 14:32:36 -05:00
parent 5af0422d74
commit e67d626000
17 changed files with 568 additions and 135 deletions

View file

@ -141,7 +141,7 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer
return 0; return 0;
libp2p_logger_debug("dialer", "We successfully negotiated multistream over secio.\n"); libp2p_logger_debug("dialer", "We successfully negotiated multistream over secio.\n");
// yamux over multistream // yamux over multistream
new_stream = libp2p_yamux_stream_new(peer->sessionContext->default_stream, 0, NULL); new_stream = libp2p_yamux_stream_new(peer->sessionContext->default_stream, 0, dialer->swarm->protocol_handlers);
if (new_stream != NULL) { if (new_stream != NULL) {
if (!libp2p_yamux_stream_ready(peer->sessionContext, 5)) if (!libp2p_yamux_stream_ready(peer->sessionContext, 5))
return 0; return 0;

View file

@ -376,7 +376,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) {
out->stream_context = ctx; out->stream_context = ctx;
out->close = libp2p_identify_close; out->close = libp2p_identify_close;
out->negotiate = libp2p_identify_stream_new; out->negotiate = libp2p_identify_stream_new;
out->handle_message = libp2p_identify_handle_message; out->bytes_waiting = NULL;
// do we expect a reply? // do we expect a reply?
if (!libp2p_identify_send_protocol(parent_stream) /* || !libp2p_identify_receive_protocol(parent_stream) */) { if (!libp2p_identify_send_protocol(parent_stream) /* || !libp2p_identify_receive_protocol(parent_stream) */) {
libp2p_stream_free(out); libp2p_stream_free(out);

View file

@ -63,3 +63,10 @@ int libp2p_protocol_marshal(struct StreamMessage* message, struct Stream* stream
* @returns true(1) * @returns true(1)
*/ */
int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers); int libp2p_protocol_handlers_shutdown(struct Libp2pVector* handlers);
/***
* Check to see if this is a valid protocol
* @param msg the message
* @param handlers the vector of handlers
*/
int libp2p_protocol_is_valid_protocol(struct StreamMessage* msg, struct Libp2pVector* handlers);

View file

@ -133,12 +133,10 @@ struct Stream {
/**** /****
* A message has been received, and needs to be handled * A message has been received, and needs to be handled
* @param message the message received * @param stream the stream that has the message waiting
* @param stream where the message came from * @returns number of bytes processed
* @param protocol_context the context for the protocol
* @returns < 0 on error, 0 if no further processing needs to be done, or 1 for success
*/ */
int (*handle_message)(const struct StreamMessage* message, struct Stream* stream, void* protocol_context); int (*bytes_waiting)(struct Stream* stream);
}; };
struct Stream* libp2p_stream_new(); struct Stream* libp2p_stream_new();
@ -165,3 +163,18 @@ int libp2p_stream_lock(struct Stream* stream);
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_stream_unlock(struct Stream* stream); int libp2p_stream_unlock(struct Stream* stream);
/***
* Determine if this stream is open
* @param stream the stream to check
* @returns true(1) if the stream is open, false otherwise
*/
int libp2p_stream_is_open(struct Stream* stream);
/**
* Look for the latest stream
* (properly handles both raw streams and yamux streams)
* @param in the incoming stream
* @returns the latest child stream
*/
struct Stream* libp2p_stream_get_latest_stream(struct Stream* in);

View file

@ -0,0 +1,58 @@
#pragma once
/**
* A thredsafe buffer
*/
#include <string.h>
#include <pthread.h>
#include <stdint.h>
/***
* Holds the information about the buffer
*/
struct ThreadsafeBufferContext {
size_t buffer_size;
uint8_t* buffer;
pthread_mutex_t lock;
};
/***
* Allocate a new context
* @returns a newly allocated context, or NULL on error (out of memory?)
*/
struct ThreadsafeBufferContext* threadsafe_buffer_context_new();
/***
* Free resources of a buffer context
* @param context the context
*/
void threadsafe_buffer_context_free(struct ThreadsafeBufferContext* context);
/***
* Read from the buffer without destroying its contents or moving its read pointer
* @param context the context
* @param results where to put the results
* @param results_size the size of the results
* @returns number of bytes read
*/
size_t threadsafe_buffer_peek(struct ThreadsafeBufferContext* context, uint8_t* results, size_t results_size);
/***
* Read from the buffer.
* NOTE: If results_size is more than what is left in the buffer, this will read everything.
* @param context the context
* @param results where to put the results
* @param results_size the size of the buffer
* @returns number of bytes read
*/
size_t threadsafe_buffer_read(struct ThreadsafeBufferContext* context, uint8_t* results, size_t results_size);
/****
* Add bytes to the end of the buffer
* @param context the context
* @param bytes the bytes to add
* @param bytes_size the size of bytes
* @returns the size added to the buffer (0 on error)
*/
size_t threadsafe_buffer_write(struct ThreadsafeBufferContext* context, const uint8_t* bytes, size_t bytes_size);

View file

@ -145,3 +145,11 @@ ssize_t yamux_session_read(struct yamux_session* session);
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message); int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message);
/***
* Find the correct yamux session stream
* @param streams the collection
* @param channel the id
* @returns the correce yamux_session_stream
*/
struct yamux_session_stream* yamux_get_session_stream(struct yamux_session* session, int channel);

View file

@ -2,6 +2,7 @@
#include "libp2p/net/protocol.h" #include "libp2p/net/protocol.h"
#include "libp2p/net/stream.h" #include "libp2p/net/stream.h"
#include "libp2p/utils/threadsafe_buffer.h"
#include "libp2p/yamux/stream.h" #include "libp2p/yamux/stream.h"
/*** /***
@ -54,6 +55,10 @@ struct YamuxChannelContext {
int state; int state;
// whether or not the connection is closed // whether or not the connection is closed
int closed; int closed;
// a buffer for data coming in from the network
struct ThreadsafeBufferContext* buffer;
// true if read is already running
int read_running;
}; };
/** /**

View file

@ -26,6 +26,7 @@ int multistream_default_timeout = 5;
// forward declarations // forward declarations
int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context); int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context);
int libp2p_net_multistream_bytes_waiting(struct Stream* stream);
int libp2p_net_multistream_can_handle(const struct StreamMessage* msg) { int libp2p_net_multistream_can_handle(const struct StreamMessage* msg) {
@ -271,8 +272,9 @@ int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** res
rslts = NULL; rslts = NULL;
} }
// now get the data from the parent stream // now get the data from the parent stream
if (!parent_stream->read_raw(parent_stream->stream_context, rslts->data, rslts->data_size, timeout_secs)) { int bytes_read = parent_stream->read_raw(parent_stream->stream_context, rslts->data, rslts->data_size, timeout_secs);
libp2p_logger_error("multistream", "read: Was supposed to read %d bytes, but read_raw returned false.\n", num_bytes_requested); if (bytes_read != num_bytes_requested) {
libp2p_logger_error("multistream", "read: Was supposed to read %d bytes, but read_raw returned %d.\n", num_bytes_requested, bytes_read);
// problem reading from the parent stream // problem reading from the parent stream
libp2p_stream_message_free(*results); libp2p_stream_message_free(*results);
*results = NULL; *results = NULL;
@ -366,30 +368,7 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyReq
struct StreamMessage outgoing; struct StreamMessage outgoing;
struct StreamMessage* results = NULL; struct StreamMessage* results = NULL;
int retVal = 0; int retVal = 0;
//int haveTheirs = 0;
//int peek_result = 0;
/*
if (!theyRequested) {
// see if they're trying to send something first
peek_result = libp2p_net_multistream_peek(ctx);
if (peek_result > 0) {
libp2p_logger_debug("multistream", "negotiate: There is %d bytes waiting for us. Perhaps it is the multistream header we're expecting.\n", peek_result);
// get the protocol
libp2p_net_multistream_read(ctx, &results, multistream_default_timeout);
if (results == NULL || results->data_size == 0) {
libp2p_logger_debug("multistream", "negotiate: We tried to read the %d bytes, but got nothing.\n", peek_result);
goto exit;
}
if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) {
libp2p_logger_debug("multistream", "negotiate: We expected the multistream id, but got %s.\n", results->data);
goto exit;
}
libp2p_logger_debug("multistream", "negotiate: We read %d bytes from the network, and received the multistream id.\n", results->data_size);
haveTheirs = 1;
}
}
*/
// send the protocol id // send the protocol id
outgoing.data = (uint8_t*)protocolID; outgoing.data = (uint8_t*)protocolID;
outgoing.data_size = strlen(protocolID); outgoing.data_size = strlen(protocolID);
@ -405,23 +384,6 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyReq
ctx->status = multistream_status_syn; ctx->status = multistream_status_syn;
} }
/*
// wait for them to send the protocol id back
if (!theyRequested && !haveTheirs) {
libp2p_logger_debug("multistream", "negotiate: Wrote multistream id to network, awaiting reply...\n");
// expect the same back
int retVal = libp2p_net_multistream_read(ctx, &results, multistream_default_timeout);
if (retVal == 0 || results == NULL || results->data_size == 0) {
libp2p_logger_debug("multistream", "negotiate: expected the multistream id back, but got nothing. RetVal: %d.\n", retVal);
goto exit;
}
if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) {
libp2p_logger_debug("multistream", "negotiate: Expected the multistream id back, but did not receive it. We did receive %d bytes though.\n)", results->data_size);
goto exit;
}
}
*/
retVal = 1; retVal = 1;
exit: exit:
if (results != NULL) if (results != NULL)
@ -459,9 +421,15 @@ struct Stream* libp2p_net_multistream_handshake(struct Stream* stream) {
*/ */
int libp2p_net_multistream_handle_upgrade(struct Stream* multistream, struct Stream* new_stream) { int libp2p_net_multistream_handle_upgrade(struct Stream* multistream, struct Stream* new_stream) {
// take multistream out of the picture // take multistream out of the picture
if (multistream->stream_type != STREAM_TYPE_MULTISTREAM) {
libp2p_logger_error("multistream", "Attempt to upgrade from multistream to %d, but the first parameter is not multistream, it is %d.\n", new_stream->stream_type, multistream->stream_type);
return 0;
}
if (new_stream->parent_stream == multistream) { if (new_stream->parent_stream == multistream) {
new_stream->parent_stream = multistream->parent_stream; new_stream->parent_stream = multistream->parent_stream;
multistream->parent_stream->handle_upgrade(multistream->parent_stream, new_stream); multistream->parent_stream->handle_upgrade(multistream->parent_stream, new_stream);
} else {
libp2p_logger_error("multistream", "Attempt to upgrade from multistream ti %d, but the parent stream is not multistream.\n", new_stream->stream_type);
} }
return 1; return 1;
} }
@ -486,7 +454,7 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i
out->handle_upgrade = libp2p_net_multistream_handle_upgrade; out->handle_upgrade = libp2p_net_multistream_handle_upgrade;
out->address = parent_stream->address; out->address = parent_stream->address;
out->socket_mutex = parent_stream->socket_mutex; out->socket_mutex = parent_stream->socket_mutex;
out->handle_message = libp2p_net_multistream_handle_message; out->bytes_waiting = libp2p_net_multistream_bytes_waiting;
// build MultistreamContext // build MultistreamContext
struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext));
if (ctx == NULL) { if (ctx == NULL) {
@ -498,6 +466,7 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i
ctx->stream = out; ctx->stream = out;
ctx->handlers = NULL; ctx->handlers = NULL;
ctx->session_context = NULL; ctx->session_context = NULL;
parent_stream->handle_upgrade(parent_stream, out);
// attempt to negotiate multistream protocol // attempt to negotiate multistream protocol
if (!libp2p_net_multistream_negotiate(ctx, theyRequested)) { if (!libp2p_net_multistream_negotiate(ctx, theyRequested)) {
libp2p_logger_debug("multistream", "multistream_stream_new: negotiate failed\n"); libp2p_logger_debug("multistream", "multistream_stream_new: negotiate failed\n");
@ -525,6 +494,10 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i
* @returns <0 on error, 0 for the caller to stop handling this, 1 for success * @returns <0 on error, 0 for the caller to stop handling this, 1 for success
*/ */
int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
// get the latest stream, as this stuff is multithreaded and may be stale
struct Stream* latest_stream = libp2p_stream_get_latest_stream(stream);
if (latest_stream != NULL)
stream = latest_stream;
if (stream->stream_type == STREAM_TYPE_MULTISTREAM) { if (stream->stream_type == STREAM_TYPE_MULTISTREAM) {
// we sent a multistream, and this is them responding // we sent a multistream, and this is them responding
struct MultistreamContext* ctx = (struct MultistreamContext*) stream->stream_context; struct MultistreamContext* ctx = (struct MultistreamContext*) stream->stream_context;
@ -547,6 +520,27 @@ int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struc
return -1; return -1;
} }
/***
* We have bytes waiting on the network.
* Normally, multistream is just a step towards another protocol.
* @param stream this multistream
* @returns number of bytes processed
*/
int libp2p_net_multistream_bytes_waiting(struct Stream* stream) {
libp2p_logger_debug("multistream", "bytes_waiting called\n");
struct StreamMessage* message = NULL;
if (libp2p_net_multistream_read(stream->stream_context, &message, 5)) {
libp2p_logger_debug("multistream", "bytes_waiting: Read %d bytes from stream. [%s]\n", message->data_size, message->data);
struct MultistreamContext* context = (struct MultistreamContext*) stream->stream_context;
int retVal = libp2p_protocol_marshal(message, stream, context->handlers);
if (retVal >= 0)
return message->data_size;
} else {
libp2p_logger_error("multistream", "bytes_waiting said there were bytes waiting, but read was false.\n");
}
return 0;
}
/*** /***
* The handler to handle calls to the protocol * The handler to handle calls to the protocol
* @param handler_vector a Libp2pVector of protocol handlers * @param handler_vector a Libp2pVector of protocol handlers

View file

@ -69,6 +69,17 @@ int libp2p_protocol_marshal(struct StreamMessage* msg, struct Stream* stream, st
return handler->HandleMessage(msg, stream, handler->context); return handler->HandleMessage(msg, stream, handler->context);
} }
/***
* Check to see if this is a valid protocol
* @param msg the message
* @param handlers the vector of handlers
*/
int libp2p_protocol_is_valid_protocol(struct StreamMessage* msg, struct Libp2pVector* handlers) {
if (protocol_compare(msg, handlers) == NULL)
return 0;
return 1;
}
/*** /***
* Shut down all protocol handlers and free vector * Shut down all protocol handlers and free vector
* @param handlers vector of Libp2pProtocolHandler * @param handlers vector of Libp2pProtocolHandler

View file

@ -3,6 +3,7 @@
#include "multiaddr/multiaddr.h" #include "multiaddr/multiaddr.h"
#include "libp2p/net/stream.h" #include "libp2p/net/stream.h"
#include "libp2p/net/connectionstream.h" #include "libp2p/net/connectionstream.h"
#include "libp2p/yamux/yamux.h"
int libp2p_stream_default_handle_upgrade(struct Stream* parent_stream, struct Stream* new_stream) { int libp2p_stream_default_handle_upgrade(struct Stream* parent_stream, struct Stream* new_stream) {
return libp2p_net_connection_upgrade(parent_stream, new_stream); return libp2p_net_connection_upgrade(parent_stream, new_stream);
@ -39,3 +40,41 @@ void libp2p_stream_free(struct Stream* stream) {
free(stream); free(stream);
} }
} }
int libp2p_stream_is_open(struct Stream* stream) {
if (stream == NULL)
return 0;
struct Stream* base_stream = stream;
while (base_stream->parent_stream != NULL)
base_stream = base_stream->parent_stream;
if (base_stream->stream_type == STREAM_TYPE_RAW) {
struct ConnectionContext* ctx = (struct ConnectionContext*)base_stream->stream_context;
if (ctx->socket_descriptor > 0)
return 1;
}
return 0;
}
// forward declaration
struct YamuxChannelContext* libp2p_yamux_get_channel_context(void* stream_context);
/**
* Look for the latest stream
* (properly handles both raw streams and yamux streams)
* @param in the incoming stream
* @returns the latest child stream
*/
struct Stream* libp2p_stream_get_latest_stream(struct Stream* in) {
if (in == NULL)
return NULL;
if (in->stream_type == STREAM_TYPE_RAW) {
struct ConnectionContext* ctx = (struct ConnectionContext*)in->stream_context;
return ctx->session_context->default_stream;
} else if (in->stream_type == STREAM_TYPE_YAMUX) {
struct YamuxChannelContext* ctx = libp2p_yamux_get_channel_context(in->stream_context);
if (ctx != NULL)
return ctx->child_stream;
}
return libp2p_stream_get_latest_stream(in->parent_stream);
}

View file

@ -584,6 +584,21 @@ int libp2p_secio_get_socket_descriptor(struct Stream* stream) {
return ctx->socket_descriptor; return ctx->socket_descriptor;
} }
/***
* Navigate down the tree of streams, and set the raw socket descriptor to 0,
* as it appears the connection has been closed. Cleanup will happen later.
* @param stream the stream
* @returns true(1)
*/
int libp2p_secio_set_socket_descriptor(struct Stream* stream) {
struct Stream* current = stream;
while (current->parent_stream != NULL)
current = current->parent_stream;
struct ConnectionContext* ctx = current->stream_context;
ctx->socket_descriptor = 0;
return 1;
}
/*** /***
* Write bytes to an unencrypted stream * Write bytes to an unencrypted stream
* @param session the session information * @param session the session information
@ -654,6 +669,9 @@ int libp2p_secio_unencrypted_read(struct Stream* secio_stream, struct StreamMess
int socket_descriptor = libp2p_secio_get_socket_descriptor(secio_stream); int socket_descriptor = libp2p_secio_get_socket_descriptor(secio_stream);
if (socket_descriptor <= 0)
return 0;
// first read the 4 byte integer // first read the 4 byte integer
char* size = (char*)&buffer_size; char* size = (char*)&buffer_size;
int left = 4; int left = 4;
@ -680,8 +698,11 @@ int libp2p_secio_unencrypted_read(struct Stream* secio_stream, struct StreamMess
libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: %s\n", strerror(errno)); libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: %s\n", strerror(errno));
return 0; return 0;
} }
else else {
libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: 0 bytes read, but errno shows no error. Trying again.\n"); libp2p_logger_error("secio", "Stream has been shut down from other end.\n");
libp2p_secio_set_socket_descriptor(secio_stream);
return 0;
}
} }
} else { } else {
left = left - read_this_time; left = left - read_this_time;

View file

@ -36,6 +36,10 @@ int libp2p_swarm_listen_and_handle(struct Stream* stream, struct Libp2pVector* p
if (!stream->read(stream->stream_context, &results, 1)) { if (!stream->read(stream->stream_context, &results, 1)) {
libp2p_logger_debug("swarm", "Releasing read lock\n"); libp2p_logger_debug("swarm", "Releasing read lock\n");
pthread_mutex_unlock(stream->socket_mutex); pthread_mutex_unlock(stream->socket_mutex);
if (!libp2p_stream_is_open(stream)) {
libp2p_logger_error("swarm", "Attempted read on stream, but has been closed.\n");
return -1;
}
libp2p_logger_error("swarm", "Unable to read from network (could just be a timeout). Exiting the read.\n"); libp2p_logger_error("swarm", "Unable to read from network (could just be a timeout). Exiting the read.\n");
return retVal; return retVal;
} }

View file

@ -7,7 +7,7 @@ endif
LFLAGS = LFLAGS =
DEPS = DEPS =
OBJS = string_list.o vector.o linked_list.o logger.o urlencode.o thread_pool.o OBJS = string_list.o vector.o linked_list.o logger.o urlencode.o thread_pool.o threadsafe_buffer.o
%.o: %.c $(DEPS) %.o: %.c $(DEPS)
$(CC) -c -o $@ $< $(CFLAGS) $(CC) -c -o $@ $< $(CFLAGS)

123
utils/threadsafe_buffer.c Normal file
View file

@ -0,0 +1,123 @@
/**
* A thredsafe buffer
*/
#include <stdlib.h>
#include "libp2p/utils/threadsafe_buffer.h"
#include "libp2p/utils/logger.h"
/***
* Allocate a new context
* @returns a newly allocated context, or NULL on error (out of memory?)
*/
struct ThreadsafeBufferContext* threadsafe_buffer_context_new() {
struct ThreadsafeBufferContext* context = (struct ThreadsafeBufferContext*) malloc(sizeof(struct ThreadsafeBufferContext));
if (context != NULL) {
context->buffer_size = 0;
context->buffer = NULL;
pthread_mutex_init(&context->lock, NULL);
}
return context;
}
/***
* Free resources of a buffer context
* @param context the context
*/
void threadsafe_buffer_context_free(struct ThreadsafeBufferContext* context) {
if (context != NULL) {
if (context->buffer != NULL)
free(context->buffer);
free(context);
}
}
/***
* Read from the buffer without destroying its contents or moving its read pointer
* @param context the context
* @param results where to put the results
* @param results_size the size of the results
* @returns number of bytes read
*/
size_t threadsafe_buffer_peek(struct ThreadsafeBufferContext* context, uint8_t* results, size_t results_size) {
size_t bytes_read = 0;
if (context == NULL)
return 0;
pthread_mutex_lock(&context->lock);
// do the read
if (context->buffer != NULL && context->buffer_size > 0) {
bytes_read = results_size < context->buffer_size ? results_size : context->buffer_size;
memcpy(results, context->buffer, bytes_read);
}
pthread_mutex_unlock(&context->lock);
return bytes_read;
}
/***
* Read from the buffer.
* NOTE: If results_size is more than what is left in the buffer, this will read everything.
* @param context the context
* @param results where to put the results
* @param results_size the size of the buffer
* @returns number of bytes read
*/
size_t threadsafe_buffer_read(struct ThreadsafeBufferContext* context, uint8_t* results, size_t results_size) {
size_t bytes_read = 0;
if (context == NULL)
return 0;
pthread_mutex_lock(&context->lock);
// do the read
if (context->buffer != NULL && context->buffer_size > 0) {
bytes_read = results_size < context->buffer_size ? results_size : context->buffer_size;
libp2p_logger_debug("threadsafe_buffer", "read: We want to read %d bytes, and have %d in the buffer. Therefore, we will read %d.\n", results_size, context->buffer_size, bytes_read);
memcpy(results, context->buffer, bytes_read);
}
// adjust the size
if (bytes_read > 0) {
if (context->buffer_size - bytes_read > 0) {
// more remains
size_t new_buffer_size = context->buffer_size - bytes_read;
uint8_t* new_buffer = (uint8_t*) malloc(new_buffer_size);
memcpy(new_buffer, &context->buffer[bytes_read], new_buffer_size);
free(context->buffer);
context->buffer = new_buffer;
context->buffer_size = new_buffer_size;
} else {
// everything has been read
free(context->buffer);
context->buffer = NULL;
context->buffer_size = 0;
}
}
pthread_mutex_unlock(&context->lock);
return bytes_read;
}
/****
* Add bytes to the end of the buffer
* @param context the context
* @param bytes the bytes to add
* @param bytes_size the size of bytes
* @returns the size added to the buffer (0 on error)
*/
size_t threadsafe_buffer_write(struct ThreadsafeBufferContext* context, const uint8_t* bytes, size_t bytes_size) {
if (context == NULL)
return 0;
if (bytes_size == 0)
return 0;
size_t bytes_copied = 0;
pthread_mutex_lock(&context->lock);
// allocate memory
uint8_t* new_buffer = (uint8_t*) realloc(context->buffer, context->buffer_size + bytes_size);
if (new_buffer != NULL) {
// copy data
memcpy(&new_buffer[context->buffer_size], bytes, bytes_size);
context->buffer_size += bytes_size;
context->buffer = new_buffer;
bytes_copied = bytes_size;
libp2p_logger_debug("threadsafe_buffer", "write: Added %d bytes. Buffer now contains %d bytes.\n", bytes_size, context->buffer_size);
}
pthread_mutex_unlock(&context->lock);
return bytes_copied;
}

View file

@ -318,10 +318,15 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
} }
libp2p_logger_debug("yamux", "Processing the data after the frame, which is %d bytes.\n", incoming_size - frame_size); libp2p_logger_debug("yamux", "Processing the data after the frame, which is %d bytes.\n", incoming_size - frame_size);
if (f.streamid == 2) {
ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size);
libp2p_logger_debug("yamux", "decode: yamux_stream_process returned %d.\n", (int)re); libp2p_logger_debug("yamux", "decode: yamux_stream_process returned %d.\n", (int)re);
//yamux_pull_message_from_frame(incoming, incoming_size, return_message);
return (re < 0) ? re : (re + incoming_size); return (re < 0) ? re : (re + incoming_size);
} else {
libp2p_logger_debug("yamux", "Only handling stream 2 for now.\n");
return 0;
}
//yamux_pull_message_from_frame(incoming, incoming_size, return_message);
} // stream id matches } // stream id matches
} }
@ -367,7 +372,8 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0);
if (multistream != NULL) { if (multistream != NULL) {
libp2p_logger_debug("yamux", "Successfully sent the multistream id on stream %d.\n", f.streamid); libp2p_logger_debug("yamux", "Successfully sent the multistream id on stream %d.\n", f.streamid);
channelContext->child_stream = multistream; // this should already be done
// channelContext->child_stream = multistream;
} else { } else {
libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid); libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid);
} }
@ -388,3 +394,19 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
return 0; return 0;
} }
/***
* Find the correct yamux session stream
* @param streams the collection
* @param channel the id
* @returns the correce yamux_session_stream
*/
struct yamux_session_stream* yamux_get_session_stream(struct yamux_session* session, int channel) {
for (size_t i = 0; i < session->cap_streams; ++i)
{
struct yamux_session_stream* ss = &session->streams[i];
if (ss->stream->stream->channel == channel)
return ss;
}
return NULL;
}

View file

@ -11,6 +11,7 @@
#include "libp2p/yamux/stream.h" #include "libp2p/yamux/stream.h"
#include "libp2p/yamux/yamux.h" #include "libp2p/yamux/yamux.h"
#include "libp2p/utils/logger.h" #include "libp2p/utils/logger.h"
#include "libp2p/utils/threadsafe_buffer.h"
#define MIN(x,y) (y^((x^y)&-(x<y))) #define MIN(x,y) (y^((x^y)&-(x<y)))
#define MAX(x,y) (x^((x^y)&-(x<y))) #define MAX(x,y) (x^((x^y)&-(x<y)))
@ -365,6 +366,45 @@ struct yamux_stream* yamux_stream_new() {
return out; return out;
} }
/***
* Called by notify_child_stream_has_data to process incoming data (perhaps)
* @param args a YamuxChannelContext
* @returns NULL;
*/
void* yamux_read_method(void* args) {
struct YamuxChannelContext* context = (struct YamuxChannelContext*) args;
struct StreamMessage* message = NULL;
// continue to read until the buffer is empty
while (context->buffer->buffer_size > 0) {
if (!context->read_running) {
context->read_running = 1;
if (context->child_stream->read(context->child_stream->stream_context, &message, 5) && message != NULL) {
context->read_running = 0;
libp2p_logger_debug("yamux", "read_method: read returned a message of %d bytes. [%s]\n", message->data_size, message->data);
int retVal = libp2p_protocol_marshal(message, context->child_stream, context->yamux_context->protocol_handlers);
libp2p_logger_debug("yamux", "read_method: protocol_marshal returned %d.\n", retVal);
libp2p_stream_message_free(message);
} else {
context->read_running = 0;
libp2p_logger_debug("yamux", "read_method: read returned false.\n");
}
}
}
return NULL;
}
/**
* spin off a new thread to handle a child's reading of data
* @param context the YamuxChannelContext
*/
int libp2p_yamux_notify_child_stream_has_data(struct YamuxChannelContext* context) {
pthread_t new_thread;
if (pthread_create(&new_thread, NULL, yamux_read_method, context) == 0)
return 1;
return 0;
}
/*** /***
* A frame came in. This looks at the data after the frame and does the right thing. * A frame came in. This looks at the data after the frame and does the right thing.
* @param stream the stream * @param stream the stream
@ -405,17 +445,48 @@ ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* fr
stream->read_fn(stream, f.length, (void*)incoming); stream->read_fn(stream, f.length, (void*)incoming);
*/ */
// the new way // the new way
struct StreamMessage stream_message; // get the yamux channel stream
stream_message.data_size = incoming_size; struct Stream* channel_stream = stream->stream;
stream_message.data = (uint8_t*)incoming; while(channel_stream != NULL && channel_stream->stream_type != STREAM_TYPE_YAMUX)
libp2p_logger_debug("yamux", "Calling handle_message for stream type %d with message of %d bytes. [%s]\n", stream->stream->stream_type, stream_message.data_size, stream_message.data); channel_stream = channel_stream->parent_stream;
struct YamuxChannelContext* channelContext = libp2p_yamux_get_channel_context(stream->stream->stream_context); struct YamuxChannelContext* channelContext = libp2p_yamux_get_channel_context(channel_stream->stream_context);
if (channelContext == NULL) { if (channelContext == NULL) {
libp2p_logger_error("yamux", "Unable to get channel context for stream %d.\n", frame->streamid); libp2p_logger_error("yamux", "Unable to get channel context for stream %d.\n", frame->streamid);
return -EPROTO; return -EPROTO;
} }
channelContext->child_stream->handle_message(&stream_message, channelContext->child_stream, NULL); libp2p_logger_debug("yamux", "writing %d bytes to channel context %d.\n", incoming_size, channelContext->channel);
threadsafe_buffer_write(channelContext->buffer, incoming, incoming_size);
if(channelContext->child_stream == NULL) {
// we have to handle this ourselves
// see if we have the entire message
int buffer_size = channelContext->buffer->buffer_size;
uint8_t buffer[buffer_size];
buffer_size = threadsafe_buffer_peek(channelContext->buffer, buffer, buffer_size);
struct StreamMessage message;
message.data_size = buffer_size;
message.data = buffer;
if (libp2p_protocol_is_valid_protocol(&message, channelContext->yamux_context->protocol_handlers)) {
// marshal the call
buffer_size = threadsafe_buffer_read(channelContext->buffer, buffer, buffer_size);
message.data_size = buffer_size;
message.data = buffer;
libp2p_protocol_marshal(&message, stream->stream, channelContext->yamux_context->protocol_handlers);
}
} else {
// Alert the child protocol that these bytes came in.
// NOTE: We're doing the work in a separate thread
// we tell them, and they need to be smart enough to see if this is a complete message or not
libp2p_yamux_notify_child_stream_has_data(channelContext);
/*
struct StreamMessage* message = NULL;
if (channelContext->child_stream->read(channelContext->child_stream->stream_context, &message, 5) && message != NULL) {
int retVal = libp2p_protocol_marshal(message, channelContext->child_stream, channelContext->yamux_context->protocol_handlers);
libp2p_stream_message_free(message);
if (retVal < 0)
return 0;
}
*/
}
return incoming_size; return incoming_size;
} }
default: default:

View file

@ -207,6 +207,43 @@ int yamux_more_to_read(struct StreamMessage* incoming) {
return 0; return 0;
} }
int libp2p_yamux_channel_read(void* stream_context, struct StreamMessage** message, int timeout_secs) {
if (stream_context == NULL) {
libp2p_logger_error("yamux", "channel_read: stream context null.\n");
return 0;
}
struct YamuxChannelContext* context = libp2p_yamux_get_channel_context(stream_context);
if (context == NULL) {
libp2p_logger_error("yamux", "channel_read: stream_context not a channel context\n");
return 0;
}
// reserve the necessary memory
*message = libp2p_stream_message_new();
struct StreamMessage* msg = *message;
if (msg == NULL) {
libp2p_logger_error("yamux", "channel_read: Unable to allocate memory for message struct.\n");
return 0;
}
msg->data_size = context->buffer->buffer_size;
if (msg->data_size == 0) {
libp2p_logger_debug("yamux", "channel_read: Nothing to read.\n");
libp2p_stream_message_free(msg);
*message = NULL;
return 0;
}
msg->data = (uint8_t*) malloc(msg->data_size);
if (msg->data == NULL) {
libp2p_logger_error("yamux", "chanel_read: Unable to allocate memory for message data.\n");
libp2p_stream_message_free(msg);
*message = NULL;
return 0;
}
// ok, we have our struct. Now fill it
msg->data_size = threadsafe_buffer_read(context->buffer, msg->data, msg->data_size);
libp2p_logger_debug("yamux", "channel_read: Read %d bytes from buffer.\n", msg->data_size);
return msg->data_size;
}
/** /**
* Read from the network, expecting a yamux frame. * Read from the network, expecting a yamux frame.
* NOTE: This will also dispatch the frame to the correct protocol * NOTE: This will also dispatch the frame to the correct protocol
@ -220,18 +257,14 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int
libp2p_logger_error("yamux", "read was passed a null context.\n"); libp2p_logger_error("yamux", "read was passed a null context.\n");
return 0; return 0;
} }
// look at the first byte of the context to determine if this is a YamuxContext (we're negotiating) struct YamuxChannelContext* channel = libp2p_yamux_get_channel_context(stream_context);
// or a YamuxChannelContext (we're talking to an established channel) if (channel != NULL) {
struct YamuxContext* ctx = NULL; // do a channel read instead
struct YamuxChannelContext* channel = NULL; return libp2p_yamux_channel_read(stream_context, message, timeout_secs);
char proto = ((uint8_t*)stream_context)[0];
if (proto == YAMUX_CHANNEL_CONTEXT) {
channel = (struct YamuxChannelContext*)stream_context;
ctx = channel->yamux_context;
} else if (proto == YAMUX_CONTEXT) {
ctx = (struct YamuxContext*)stream_context;
} }
struct YamuxContext* ctx = libp2p_yamux_get_context(stream_context);
if (ctx == NULL) { if (ctx == NULL) {
libp2p_logger_error("yamux", "read: The incoming stream is not a yamux stream.\n"); libp2p_logger_error("yamux", "read: The incoming stream is not a yamux stream.\n");
return 0; return 0;
@ -257,25 +290,6 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int
} }
struct Stream* parent_stream = libp2p_yamux_get_parent_stream(stream_context); struct Stream* parent_stream = libp2p_yamux_get_parent_stream(stream_context);
if (channel != NULL && channel->channel != 0) {
// I don't think this will ever be the case. This I believe to be dead code
libp2p_logger_debug("yamux", "Data received on yamux stream %d.\n", channel->channel);
// we have an established channel. Use it.
if (!parent_stream->read(parent_stream->stream_context, message, yamux_default_timeout)) {
libp2p_logger_error("yamux", "Read: Attepted to read from channel %d, but the read failed.\n", channel->channel);
return 0;
}
if (message == NULL) {
libp2p_logger_error("yamux", "Read: Successfully read from channel %d, but message was NULL.\n", channel->channel);
}
// TODO: This is not right. It must be sorted out.
struct StreamMessage* msg = *message;
libp2p_logger_debug("yamux", "Read: Received %d bytes on channel %d.\n", msg->data_size, channel->channel);
if (yamux_decode(channel, msg->data, msg->data_size, message) == 0) {
return 1;
}
libp2p_logger_error("yamux", "yamux_decode returned error.\n");
} else if (ctx != NULL) {
// this is the normal situation (not dead code). // this is the normal situation (not dead code).
struct StreamMessage* incoming = NULL; struct StreamMessage* incoming = NULL;
// we need a lock // we need a lock
@ -316,7 +330,7 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int
} else { } else {
// read failed // read failed
} }
}
libp2p_logger_error("yamux", "Unable to do network read.\n"); libp2p_logger_error("yamux", "Unable to do network read.\n");
return 0; return 0;
} }
@ -439,6 +453,29 @@ int libp2p_yamux_peek(void* stream_context) {
return parent_stream->peek(parent_stream->stream_context); return parent_stream->peek(parent_stream->stream_context);
} }
/***
* Read from the yamux channel buffer, mimics a network read
* @param stream_context a YamuxChannelContext
* @param buffer where to put the results
* @param buffer_size the size of the buffer
* @param timeout_secs how long to wait (currently unused)
* @returns the number of bytes placed into the buffer
*/
int libp2p_yamux_channel_read_raw(void* stream_context, uint8_t* buffer, int buffer_size, int timeout_secs) {
if (stream_context == NULL)
return 0;
struct YamuxChannelContext* channelContext = libp2p_yamux_get_channel_context(stream_context);
if (channelContext == NULL)
return 0;
// wait to see if we get the bytes we need
int counter = 0;
while (channelContext->buffer->buffer_size < buffer_size && counter < timeout_secs) {
sleep(1);
counter++;
}
return threadsafe_buffer_read(channelContext->buffer, buffer, buffer_size);
}
/*** /***
* Read from the network, and place it in the buffer * Read from the network, and place it in the buffer
* NOTE: This may put something in the internal read buffer (i.e. buffer_size is too small) * NOTE: This may put something in the internal read buffer (i.e. buffer_size is too small)
@ -452,6 +489,11 @@ int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size
if (stream_context == NULL) { if (stream_context == NULL) {
return -1; return -1;
} }
struct YamuxChannelContext* channel_context = libp2p_yamux_get_channel_context(stream_context);
if (channel_context != NULL) {
// do a read_raw on a channel
return libp2p_yamux_channel_read_raw(stream_context, buffer, buffer_size, timeout_secs);
}
struct YamuxContext* ctx = libp2p_yamux_get_context(stream_context); struct YamuxContext* ctx = libp2p_yamux_get_context(stream_context);
if (ctx->buffered_message_pos == -1 || ctx->buffered_message == NULL) { if (ctx->buffered_message_pos == -1 || ctx->buffered_message == NULL) {
// we need to get info from the network // we need to get info from the network
@ -532,12 +574,23 @@ int libp2p_yamux_handle_upgrade(struct Stream* yamux_stream, struct Stream* new_
} }
libp2p_logger_debug("yamux", "handle_upgrade called for stream %s.\n", stream_type); libp2p_logger_debug("yamux", "handle_upgrade called for stream %s.\n", stream_type);
} }
struct YamuxContext* yamux_context = (struct YamuxContext*)yamux_stream->stream_context; struct YamuxContext* yamux_context = libp2p_yamux_get_context(yamux_stream->stream_context);
struct YamuxChannelContext* yamux_channel_context = libp2p_yamux_get_channel_context(yamux_stream->stream_context);
if (yamux_channel_context != NULL) {
// they've asked to upgrade on a channel. Make them the new default stream for this channel
yamux_channel_context->child_stream = new_stream;
struct yamux_session_stream* yamux_session_stream = yamux_get_session_stream(yamux_channel_context->yamux_context->session, yamux_channel_context->channel);
if (yamux_session_stream == NULL) {
libp2p_logger_error("yamux", "Unable to get correct session stream.\n");
return 0;
}
yamux_session_stream->stream->stream = new_stream;
return 1;
} else {
// they've asked to upgrade on the main channel. I don't think this should never happen.
libp2p_logger_debug("yamux", "handle_upgrade: Attempt to upgrade on the main yamux channel");
return libp2p_yamux_stream_add(yamux_context, new_stream); return libp2p_yamux_stream_add(yamux_context, new_stream);
} }
void libp2p_yamux_read_from_yamux_session(struct yamux_stream* stream, uint32_t data_len, void* data) {
} }
/*** /***
@ -763,7 +816,11 @@ struct Stream* libp2p_yamux_channel_stream_new(struct Stream* incoming_stream, i
ctx->window_size = 0; ctx->window_size = 0;
ctx->type = YAMUX_CHANNEL_CONTEXT; ctx->type = YAMUX_CHANNEL_CONTEXT;
ctx->stream = out; ctx->stream = out;
ctx->buffer = threadsafe_buffer_context_new();
ctx->read_running = 0;
out->stream_context = ctx; out->stream_context = ctx;
out->handle_upgrade = libp2p_yamux_handle_upgrade;
out->channel = channelNumber;
} }
return out; return out;
} }