Intermediate save trying to get yamux streams happy

This commit is contained in:
John Jones 2017-11-29 10:57:48 -05:00
parent f0342785d2
commit 5af0422d74
6 changed files with 136 additions and 94 deletions

View file

@ -376,6 +376,7 @@ struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) {
out->stream_context = ctx; out->stream_context = ctx;
out->close = libp2p_identify_close; out->close = libp2p_identify_close;
out->negotiate = libp2p_identify_stream_new; out->negotiate = libp2p_identify_stream_new;
out->handle_message = libp2p_identify_handle_message;
// do we expect a reply? // do we expect a reply?
if (!libp2p_identify_send_protocol(parent_stream) /* || !libp2p_identify_receive_protocol(parent_stream) */) { if (!libp2p_identify_send_protocol(parent_stream) /* || !libp2p_identify_receive_protocol(parent_stream) */) {
libp2p_stream_free(out); libp2p_stream_free(out);

View file

@ -130,6 +130,15 @@ struct Stream {
* @returns a new Stream, or NULL on error * @returns a new Stream, or NULL on error
*/ */
struct Stream* (*negotiate)(struct Stream* parent_stream); struct Stream* (*negotiate)(struct Stream* parent_stream);
/****
* A message has been received, and needs to be handled
* @param message the message received
* @param stream where the message came from
* @param protocol_context the context for the protocol
* @returns < 0 on error, 0 if no further processing needs to be done, or 1 for success
*/
int (*handle_message)(const struct StreamMessage* message, struct Stream* stream, void* protocol_context);
}; };
struct Stream* libp2p_stream_new(); struct Stream* libp2p_stream_new();

View file

@ -24,6 +24,10 @@ int multistream_default_timeout = 5;
* An implementation of the libp2p multistream * An implementation of the libp2p multistream
*/ */
// forward declarations
int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context);
int libp2p_net_multistream_can_handle(const struct StreamMessage* msg) { int libp2p_net_multistream_can_handle(const struct StreamMessage* msg) {
if (msg == NULL || msg->data == NULL || msg->data_size == 0) if (msg == NULL || msg->data == NULL || msg->data_size == 0)
return 0; return 0;
@ -482,6 +486,7 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i
out->handle_upgrade = libp2p_net_multistream_handle_upgrade; out->handle_upgrade = libp2p_net_multistream_handle_upgrade;
out->address = parent_stream->address; out->address = parent_stream->address;
out->socket_mutex = parent_stream->socket_mutex; out->socket_mutex = parent_stream->socket_mutex;
out->handle_message = libp2p_net_multistream_handle_message;
// build MultistreamContext // build MultistreamContext
struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext));
if (ctx == NULL) { if (ctx == NULL) {

View file

@ -267,6 +267,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
} }
} }
} else { } else {
libp2p_logger_debug("yamux", "yamux_decode: received something for yamux stream %d.\n", f.streamid);
// we're handling a stream, not something at the yamux protocol level // we're handling a stream, not something at the yamux protocol level
for (size_t i = 0; i < yamux_session->cap_streams; ++i) for (size_t i = 0; i < yamux_session->cap_streams; ++i)
{ {
@ -278,7 +279,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
if (s->id == f.streamid) // we have a match between the stored stream and the current stream if (s->id == f.streamid) // we have a match between the stored stream and the current stream
{ {
libp2p_logger_debug("yamux", "We found our stream id.\n"); libp2p_logger_debug("yamux", "We found our stream id of %d.\n", f.streamid);
if (f.flags & yamux_frame_rst) if (f.flags & yamux_frame_rst)
{ {
libp2p_logger_debug("yamux", "They are asking that this stream be reset.\n"); libp2p_logger_debug("yamux", "They are asking that this stream be reset.\n");
@ -316,11 +317,12 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
return -EPROTO; return -EPROTO;
} }
libp2p_logger_debug("yamux", "Processing frame of %d bytes.\n"); libp2p_logger_debug("yamux", "Processing the data after the frame, which is %d bytes.\n", incoming_size - frame_size);
ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size);
yamux_pull_message_from_frame(incoming, incoming_size, return_message); libp2p_logger_debug("yamux", "decode: yamux_stream_process returned %d.\n", (int)re);
//yamux_pull_message_from_frame(incoming, incoming_size, return_message);
return (re < 0) ? re : (re + incoming_size); return (re < 0) ? re : (re + incoming_size);
} } // stream id matches
} }
// This stream is not in my list of streams. // This stream is not in my list of streams.
@ -332,16 +334,18 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
// if we didn't initiate it, add this new channel (odd stream id is from client, even is from server) // if we didn't initiate it, add this new channel (odd stream id is from client, even is from server)
if ( (f.streamid % 2 == 0 && !yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) { if ( (f.streamid % 2 == 0 && !yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) {
libp2p_logger_debug("yamux", "This is a new channel. Creating it...\n"); // JMJ just debugging stream 2 for now
if (f.streamid == 2) {
libp2p_logger_debug("yamux", "Stream id %d is a new stream. Creating it...\n", f.streamid);
struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message); struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message);
if (yamuxChannelStream == NULL) { if (yamuxChannelStream == NULL) {
libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux channel for stream id %d.\n", f.streamid); libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux stream for stream id %d.\n", f.streamid);
return -EPROTO; return -EPROTO;
} }
struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context; struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)yamuxChannelStream->stream_context;
if (yamux_session->new_stream_fn) { if (yamux_session->new_stream_fn) {
libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn.\n"); libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn for stream %d.\n", f.streamid);
yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message); yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message);
} }
// handle window update (if there is one) // handle window update (if there is one)
@ -362,11 +366,15 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
// TODO: Start negotiations of multistream // TODO: Start negotiations of multistream
struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0);
if (multistream != NULL) { if (multistream != NULL) {
libp2p_logger_debug("yamux", "Successfully negotiated multistream on stream %d.\n", f.streamid); libp2p_logger_debug("yamux", "Successfully sent the multistream id on stream %d.\n", f.streamid);
channelContext->child_stream = multistream; channelContext->child_stream = multistream;
} else { } else {
libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid); libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid);
} }
} else {
libp2p_logger_error("yamux", "Temporarily not doing anthing for streams other than stream 2.\n");
return 0;
}
} else { } else {
libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)")); libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)"));

View file

@ -10,12 +10,14 @@
#include "libp2p/yamux/frame.h" #include "libp2p/yamux/frame.h"
#include "libp2p/yamux/stream.h" #include "libp2p/yamux/stream.h"
#include "libp2p/yamux/yamux.h" #include "libp2p/yamux/yamux.h"
#include "libp2p/utils/logger.h"
#define MIN(x,y) (y^((x^y)&-(x<y))) #define MIN(x,y) (y^((x^y)&-(x<y)))
#define MAX(x,y) (x^((x^y)&-(x<y))) #define MAX(x,y) (x^((x^y)&-(x<y)))
// forward declarations // forward declarations
struct YamuxContext* libp2p_yamux_get_context(void* context); struct YamuxContext* libp2p_yamux_get_context(void* context);
struct YamuxChannelContext* libp2p_yamux_get_channel_context(void* context);
/*** /***
* Create a new stream * Create a new stream
@ -364,7 +366,7 @@ struct yamux_stream* yamux_stream_new() {
} }
/*** /***
* process stream * A frame came in. This looks at the data after the frame and does the right thing.
* @param stream the stream * @param stream the stream
* @param frame the frame * @param frame the frame
* @param incoming the stream bytes (after the frame) * @param incoming the stream bytes (after the frame)
@ -379,6 +381,7 @@ ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* fr
{ {
case yamux_frame_window_update: case yamux_frame_window_update:
{ {
libp2p_logger_debug("yamux", "stream_process: We received a window update.\n");
uint64_t nws = (uint64_t) ( (int64_t)stream->window_size + (int64_t)(int32_t)f.length ); uint64_t nws = (uint64_t) ( (int64_t)stream->window_size + (int64_t)(int32_t)f.length );
nws &= 0xFFFFFFFFLL; nws &= 0xFFFFFFFFLL;
stream->window_size = (uint32_t)nws; stream->window_size = (uint32_t)nws;
@ -386,11 +389,32 @@ ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* fr
//no break //no break
case yamux_frame_data: case yamux_frame_data:
{ {
if (incoming_size != (ssize_t)f.length) if (incoming_size != (ssize_t)f.length) {
if (f.type == yamux_frame_data) {
libp2p_logger_debug("yamux", "stream_process: They said we should look for frame data, but sizes don't match. They said %d and we see %d.\n", f.length, incoming_size);
return -1; return -1;
}
}
if (incoming_size == 0)
return 0;
// the old way
/*
if (stream->read_fn) if (stream->read_fn)
stream->read_fn(stream, f.length, (void*)incoming); stream->read_fn(stream, f.length, (void*)incoming);
*/
// the new way
struct StreamMessage stream_message;
stream_message.data_size = incoming_size;
stream_message.data = (uint8_t*)incoming;
libp2p_logger_debug("yamux", "Calling handle_message for stream type %d with message of %d bytes. [%s]\n", stream->stream->stream_type, stream_message.data_size, stream_message.data);
struct YamuxChannelContext* channelContext = libp2p_yamux_get_channel_context(stream->stream->stream_context);
if (channelContext == NULL) {
libp2p_logger_error("yamux", "Unable to get channel context for stream %d.\n", frame->streamid);
return -EPROTO;
}
channelContext->child_stream->handle_message(&stream_message, channelContext->child_stream, NULL);
return incoming_size; return incoming_size;
} }

View file

@ -68,56 +68,6 @@ int yamux_can_handle(const struct StreamMessage* msg) {
return 0; return 0;
} }
/**
* the yamux stream received some bytes. Process them
* @param stream the stream that the data came in on
* @param msg the message
* @param incoming the stream buffer
*/
/*
void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) {
struct Libp2pVector* handlers = stream->userdata;
int retVal = libp2p_protocol_marshal(msg, stream->session->session_context, handlers);
if (retVal == -1) {
// TODO handle error condition
libp2p_logger_error("yamux", "Marshalling returned error.\n");
} else if (retVal > 0) {
// TODO handle everything went okay
libp2p_logger_debug("yamux", "Marshalling was successful. We should continue processing.\n");
} else {
// TODO we've been told we shouldn't do anything anymore
libp2p_logger_debug("yamux", "Marshalling was successful. We should stop processing.\n");
}
return;
}
*/
/***
* Check to see if the reply is the yamux protocol header we expect
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int yamux_receive_protocol(struct YamuxContext* context) {
char* protocol = "/yamux/1.0.0\n";
struct StreamMessage* results = NULL;
int retVal = 0;
if (!context->stream->parent_stream->read(context->stream->parent_stream->stream_context, &results, 30)) {
libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n");
goto exit;
}
// the first byte is the size, so skip it
char* ptr = strstr((char*)&results->data[1], protocol);
if (ptr == NULL || ptr - (char*)results->data > 1) {
goto exit;
}
retVal = 1;
exit:
libp2p_stream_message_free(results);
return retVal;
}
/*** /***
* The remote is attempting to negotiate yamux * The remote is attempting to negotiate yamux
* @param msg the incoming message * @param msg the incoming message
@ -231,6 +181,32 @@ int libp2p_yamux_close(struct Stream* stream) {
return 1; return 1;
} }
/***
* Determine if the incoming is a data frame, but we need more data
* @param incoming the incoming message
* @returns > 0 if we need more data, 0 if not
*/
int yamux_more_to_read(struct StreamMessage* incoming) {
if (incoming == NULL)
return 0;
if (incoming->data_size < 12) {
return 0;
}
// get frame
struct yamux_frame* original_frame = (struct yamux_frame*)incoming->data;
struct yamux_frame* copy = (struct yamux_frame*) malloc(sizeof(struct yamux_frame));
memcpy(copy, original_frame, sizeof(struct yamux_frame));
decode_frame(copy);
if (copy->type == yamux_frame_data) {
libp2p_logger_debug("yamux", "Checking frame sizes. It says we should have %d, and I see %d.\n", copy->length, incoming->data_size - sizeof(struct yamux_frame));
int retVal = copy->length - (incoming->data_size - sizeof(struct yamux_frame));
free(copy);
return retVal;
}
free(copy);
return 0;
}
/** /**
* Read from the network, expecting a yamux frame. * Read from the network, expecting a yamux frame.
* NOTE: This will also dispatch the frame to the correct protocol * NOTE: This will also dispatch the frame to the correct protocol
@ -301,14 +277,31 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int
libp2p_logger_error("yamux", "yamux_decode returned error.\n"); libp2p_logger_error("yamux", "yamux_decode returned error.\n");
} else if (ctx != NULL) { } else if (ctx != NULL) {
// this is the normal situation (not dead code). // this is the normal situation (not dead code).
libp2p_logger_debug("yamux", "read: It looks like we're trying to negotiate a new protocol or received a yamux frame.\n");
struct StreamMessage* incoming = NULL; struct StreamMessage* incoming = NULL;
// we need a lock
if (parent_stream->read(parent_stream->stream_context, &incoming, yamux_default_timeout)) { if (parent_stream->read(parent_stream->stream_context, &incoming, yamux_default_timeout)) {
libp2p_logger_debug("yamux", "read: successfully read %d bytes from network.\n", incoming->data_size); libp2p_logger_debug("yamux", "read: successfully read %d bytes from network.\n", incoming->data_size);
// This could be a data frame with the actual data coming later. Yuck.
// JMJ in the case of an incomplete buffer, the next read should be the data. This must
// be true, as the next data does not have a frame. We should only read the bytes we need.
int moreToRead = yamux_more_to_read(incoming);
if (moreToRead > 0) {
uint8_t buffer[moreToRead];
if (parent_stream->read_raw(parent_stream->stream_context, buffer, moreToRead, timeout_secs) == moreToRead) {
// we have the bytes we need
uint8_t* new_buffer = (uint8_t*) malloc(incoming->data_size + moreToRead);
memcpy(new_buffer, incoming->data, incoming->data_size);
memcpy(&new_buffer[incoming->data_size], buffer, moreToRead);
incoming->data_size += moreToRead;
free(incoming->data);
incoming->data = new_buffer;
} else {
// we didn't get the bytes we needed
return 0;
}
}
// parse the frame. This is where the work happens. // parse the frame. This is where the work happens.
// JMJ: Maybe this should come back with a message to be processed further, along with some stream number to if (yamux_decode(ctx, incoming->data, incoming->data_size, message) >= 0) {
// know who to dispatch it to. Or perhaps it should have all the information to handle it internally (better option)
if (yamux_decode(ctx, incoming->data, incoming->data_size, message) == 0) {
libp2p_stream_message_free(incoming); libp2p_stream_message_free(incoming);
// The message may not have anything in it. If so, return 0, as if nothing was done. Everything has been handled // The message may not have anything in it. If so, return 0, as if nothing was done. Everything has been handled
if (*message != NULL && (*message)->data_size == 0) { if (*message != NULL && (*message)->data_size == 0) {
@ -320,6 +313,8 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int
} }
libp2p_logger_error("yamux", "yamux_decode returned error.\n"); libp2p_logger_error("yamux", "yamux_decode returned error.\n");
libp2p_stream_message_free(incoming); libp2p_stream_message_free(incoming);
} else {
// read failed
} }
} }
libp2p_logger_error("yamux", "Unable to do network read.\n"); libp2p_logger_error("yamux", "Unable to do network read.\n");