c-libp2p/yamux/yamux.c

324 lines
10 KiB
C

#include <string.h>
#include <unistd.h>
#include "varint.h"
#include "libp2p/yamux/session.h"
#include "libp2p/yamux/yamux.h"
#include "libp2p/net/protocol.h"
#include "libp2p/net/stream.h"
#include "libp2p/conn/session.h"
#include "libp2p/utils/logger.h"
/**
* Determines if this protocol can handle the incoming message
* @param incoming the incoming data
* @param incoming_size the size of the incoming data buffer
* @returns true(1) if it can handle this message, false(0) if not
*/
int yamux_can_handle(const struct StreamMessage* msg) {
char *protocol = "/yamux/1.0.0\n";
int protocol_size = strlen(protocol);
// is there a varint in front?
size_t num_bytes = 0;
if (msg->data[0] != protocol[0] && msg->data[1] != protocol[1]) {
varint_decode(msg->data, msg->data_size, &num_bytes);
}
if (msg->data_size >= protocol_size - num_bytes) {
if (strncmp(protocol, (char*) &msg->data[num_bytes], protocol_size) == 0)
return 1;
}
return 0;
}
/**
* the yamux stream received some bytes. Process them
* @param stream the stream that the data came in on
* @param msg the message
* @param incoming the stream buffer
*/
void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) {
struct Libp2pVector* handlers = stream->userdata;
int retVal = libp2p_protocol_marshal(msg, stream->session->session_context, handlers);
if (retVal == -1) {
// TODO handle error condition
libp2p_logger_error("yamux", "Marshalling returned error.\n");
} else if (retVal > 0) {
// TODO handle everything went okay
libp2p_logger_debug("yamux", "Marshalling was successful. We should continue processing.\n");
} else {
// TODO we've been told we shouldn't do anything anymore
libp2p_logger_debug("yamux", "Marshalling was successful. We should stop processing.\n");
}
return;
}
/***
* Send the yamux protocol out the default stream
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int yamux_send_protocol(struct SessionContext* context) {
char* protocol = "/yamux/1.0.0\n";
struct StreamMessage outgoing;
outgoing.data = (uint8_t*)protocol;
outgoing.data_size = strlen(protocol);
if (!context->default_stream->write(context, &outgoing))
return 0;
return 1;
}
/***
* Check to see if the reply is the yamux protocol header we expect
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int yamux_receive_protocol(struct SessionContext* context) {
char* protocol = "/yamux/1.0.0\n";
struct StreamMessage* results = NULL;
int retVal = 0;
if (!context->default_stream->read(context, &results, 30)) {
libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n");
goto exit;
}
// the first byte is the size, so skip it
char* ptr = strstr((char*)&results->data[1], protocol);
if (ptr == NULL || ptr - (char*)results->data > 1) {
goto exit;
}
retVal = 1;
exit:
libp2p_stream_message_free(results);
return retVal;
}
/***
* Handles the message
* @param msg the incoming message
* @param incoming_size the size of the incoming data buffer
* @param session_context the information about the incoming connection
* @param protocol_context the protocol-dependent context
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/
int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) {
// they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream
struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context);
uint8_t* buf = (uint8_t*) malloc(msg->data_size);
if (buf == NULL)
return -1;
memcpy(buf, msg->data, msg->data_size);
for(;;) {
int retVal = yamux_decode(yamux, msg->data, msg->data_size);
free(buf);
buf = NULL;
if (!retVal)
break;
else { // try to read more from this stream
// TODO need more information as to what this loop should do
}
}
/*
struct Libp2pVector* handlers = (struct Libp2pVector*)protocol_context;
uint8_t* results = NULL;
size_t bytes_read = 0;
int numRetries = 0;
int retVal = 0;
int max_retries = 100; // try for 5 minutes
for(;;) {
// try to read for 5 seconds
if (session_context->default_stream->read(session_context, &results, &bytes_read, 5)) {
// we read something from the network. Process it.
// NOTE: If it is a multistream protocol that we are receiving, ignore it.
if (yamux_can_handle(results, bytes_read))
continue;
numRetries = 0;
retVal = libp2p_protocol_marshal(results, bytes_read, session_context, handlers);
if (results != NULL)
free(results);
// exit the loop on error (or if they ask us to no longer loop by returning 0)
if (retVal <= 0)
break;
} else {
// we were unable to read from the network.
// if it timed out, we should try again (if we're not out of retries)
if (numRetries >= max_retries)
break;
numRetries++;
}
}
*/
return 0;
}
/**
* Shutting down. Clean up any memory allocations
* @param protocol_context the context
* @returns true(1)
*/
int yamux_shutdown(void* protocol_context) {
return 0;
}
struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* handlers) {
struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new();
if (handler != NULL) {
handler->context = handlers;
handler->CanHandle = yamux_can_handle;
handler->HandleMessage = yamux_handle_message;
handler->Shutdown = yamux_shutdown;
}
return handler;
}
int libp2p_yamux_close(void* stream_context) {
//TODO: Implement
return 0;
}
/**
* Read from the network, expecting a yamux frame.
* NOTE: This will also dispatch the frame to the correct protocol
* @param stream_context the YamuxContext
* @param message the resultant message
* @param timeout_secs when to give up
* @returns true(1) on success, false(0) on failure
*/
int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int timeout_secs) {
struct YamuxContext* ctx = (struct YamuxContext*)stream_context;
struct Stream* parent_stream = ctx->stream->parent_stream;
struct StreamMessage* incoming;
if (!parent_stream->read(parent_stream->stream_context, &incoming, timeout_secs))
return 0;
// we've got bytes from the network. process them as a yamux frame
return yamux_decode(ctx->session, incoming->data, incoming->data_size);
}
int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) {
//TODO: Implement
return 0;
}
/***
* Check to see if there is anything waiting on the network.
* @param stream_context the YamuxContext
* @returns the number of bytes waiting, or -1 on error
*/
int libp2p_yamux_peek(void* stream_context) {
if (stream_context == NULL)
return -1;
struct YamuxContext* ctx = (struct YamuxContext*)stream_context;
struct Stream* parent_stream = ctx->stream->parent_stream;
if (parent_stream == NULL)
return -1;
return parent_stream->peek(parent_stream->stream_context);
}
int libp2p_yamux_read_raw(void* stream_context, uint8_t* buffer, int buffer_size, int timeout_secs) {
//TODO: Implement
return -1;
}
struct YamuxContext* libp2p_yamux_context_new() {
struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext));
if (ctx != NULL) {
ctx->stream = NULL;
}
return ctx;
}
void libp2p_yamux_stream_free(struct Stream* yamux_stream) {
//TODO: Implement
}
int libp2p_yamux_negotiate(struct YamuxContext* ctx) {
const char* protocolID = "/yamux/1.0.0\n";
struct StreamMessage outgoing;
struct StreamMessage* results = NULL;
int retVal = 0;
int haveTheirs = 0;
int peek_result = 0;
// see if they're trying to send something first
peek_result = libp2p_yamux_peek(ctx);
if (peek_result > 0) {
libp2p_logger_debug("yamux", "There is %d bytes waiting for us. Perhaps it is the yamux header we're expecting.\n", peek_result);
// get the protocol
ctx->stream->parent_stream->read(ctx->stream->parent_stream, &results, yamux_default_timeout);
if (results == NULL || results->data_size == 0) {
libp2p_logger_error("yamux", "We thought we had a yamux header, but we got nothing.\n");
goto exit;
}
if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) {
libp2p_logger_error("yamux", "We thought we had a yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data);
goto exit;
}
haveTheirs = 1;
}
// send the protocol id
outgoing.data = (uint8_t*)protocolID;
outgoing.data_size = strlen(protocolID);
if (!ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing)) {
libp2p_logger_error("yamux", "We attempted to write the yamux protocol id, but the write call failed.\n");
goto exit;
}
// wait for them to send the protocol id back
if (!haveTheirs) {
// expect the same back
ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &results, yamux_default_timeout);
if (results == NULL || results->data_size == 0) {
libp2p_logger_error("yamux", "We tried to retrieve the yamux header, but we got nothing.\n");
goto exit;
}
if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) {
libp2p_logger_error("yamux", "We tried to retrieve the yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data);
goto exit;
}
}
retVal = 1;
exit:
if (results != NULL)
free(results);
return retVal;
}
/***
* Negotiate the Yamux protocol
* @param parent_stream the parent stream
* @returns a Stream initialized and ready for yamux
*/
struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) {
struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream));
if (out != NULL) {
out->parent_stream = parent_stream;
out->close = libp2p_yamux_close;
out->read = libp2p_yamux_read;
out->write = libp2p_yamux_write;
out->peek = libp2p_yamux_peek;
out->read_raw = libp2p_yamux_read_raw;
out->address = parent_stream->address;
// build YamuxContext
struct YamuxContext* ctx = libp2p_yamux_context_new();
if (ctx == NULL) {
libp2p_yamux_stream_free(out);
return NULL;
}
out->stream_context = ctx;
ctx->stream = out;
// attempt to negotiate yamux protocol
if (!libp2p_yamux_negotiate(ctx)) {
libp2p_yamux_stream_free(out);
return NULL;
}
}
return out;
}