#include #include #include "varint.h" #include "libp2p/yamux/session.h" #include "libp2p/net/protocol.h" #include "libp2p/net/stream.h" #include "libp2p/conn/session.h" #include "libp2p/utils/logger.h" /** * Determines if this protocol can handle the incoming message * @param incoming the incoming data * @param incoming_size the size of the incoming data buffer * @returns true(1) if it can handle this message, false(0) if not */ int yamux_can_handle(const struct StreamMessage* msg) { char *protocol = "/yamux/1.0.0\n"; int protocol_size = strlen(protocol); // is there a varint in front? size_t num_bytes = 0; if (msg->data[0] != protocol[0] && msg->data[1] != protocol[1]) { varint_decode(msg->data, msg->data_size, &num_bytes); } if (msg->data_size >= protocol_size - num_bytes) { if (strncmp(protocol, (char*) &msg->data[num_bytes], protocol_size) == 0) return 1; } return 0; } /** * the yamux stream received some bytes. Process them * @param stream the stream that the data came in on * @param msg the message * @param incoming the stream buffer */ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) { struct Libp2pVector* handlers = stream->userdata; int retVal = libp2p_protocol_marshal(msg, stream->session->session_context, handlers); if (retVal == -1) { // TODO handle error condition libp2p_logger_error("yamux", "Marshalling returned error.\n"); } else if (retVal > 0) { // TODO handle everything went okay libp2p_logger_debug("yamux", "Marshalling was successful. We should continue processing.\n"); } else { // TODO we've been told we shouldn't do anything anymore libp2p_logger_debug("yamux", "Marshalling was successful. We should stop processing.\n"); } return; } /*** * Send the yamux protocol out the default stream * NOTE: if we initiate the connection, we should expect the same back * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ int yamux_send_protocol(struct SessionContext* context) { char* protocol = "/yamux/1.0.0\n"; struct StreamMessage outgoing; outgoing.data = (uint8_t*)protocol; outgoing.data_size = strlen(protocol); if (!context->default_stream->write(context, &outgoing)) return 0; return 1; } /*** * Check to see if the reply is the yamux protocol header we expect * NOTE: if we initiate the connection, we should expect the same back * @param context the SessionContext * @returns true(1) on success, false(0) otherwise */ int yamux_receive_protocol(struct SessionContext* context) { char* protocol = "/yamux/1.0.0\n"; struct StreamMessage* results = NULL; int retVal = 0; if (!context->default_stream->read(context, &results, 30)) { libp2p_logger_error("yamux", "receive_protocol: Unable to read results.\n"); goto exit; } // the first byte is the size, so skip it char* ptr = strstr((char*)&results->data[1], protocol); if (ptr == NULL || ptr - (char*)results->data > 1) { goto exit; } retVal = 1; exit: libp2p_stream_message_free(results); return retVal; } /*** * Handles the message * @param msg the incoming message * @param incoming_size the size of the incoming data buffer * @param session_context the information about the incoming connection * @param protocol_context the protocol-dependent context * @returns 0 if the caller should not continue looping, <0 on error, >0 on success */ int yamux_handle_message(const struct StreamMessage* msg, struct SessionContext* session_context, void* protocol_context) { // they've asked to swicth to yamux. Do the switch and return 0 so that nothing else listens on this stream struct yamux_session* yamux = yamux_session_new(NULL, session_context, yamux_session_server, protocol_context); uint8_t* buf = (uint8_t*) malloc(msg->data_size); if (buf == NULL) return -1; memcpy(buf, msg->data, msg->data_size); for(;;) { int retVal = yamux_decode(yamux, msg->data, msg->data_size); free(buf); buf = NULL; if (!retVal) break; else { // try to read more from this stream // TODO need more information as to what this loop should do } } /* struct Libp2pVector* handlers = (struct Libp2pVector*)protocol_context; uint8_t* results = NULL; size_t bytes_read = 0; int numRetries = 0; int retVal = 0; int max_retries = 100; // try for 5 minutes for(;;) { // try to read for 5 seconds if (session_context->default_stream->read(session_context, &results, &bytes_read, 5)) { // we read something from the network. Process it. // NOTE: If it is a multistream protocol that we are receiving, ignore it. if (yamux_can_handle(results, bytes_read)) continue; numRetries = 0; retVal = libp2p_protocol_marshal(results, bytes_read, session_context, handlers); if (results != NULL) free(results); // exit the loop on error (or if they ask us to no longer loop by returning 0) if (retVal <= 0) break; } else { // we were unable to read from the network. // if it timed out, we should try again (if we're not out of retries) if (numRetries >= max_retries) break; numRetries++; } } */ return 0; } /** * Shutting down. Clean up any memory allocations * @param protocol_context the context * @returns true(1) */ int yamux_shutdown(void* protocol_context) { return 0; } struct Libp2pProtocolHandler* yamux_build_protocol_handler(struct Libp2pVector* handlers) { struct Libp2pProtocolHandler* handler = libp2p_protocol_handler_new(); if (handler != NULL) { handler->context = handlers; handler->CanHandle = yamux_can_handle; handler->HandleMessage = yamux_handle_message; handler->Shutdown = yamux_shutdown; } return handler; } /*** * Negotiate the Yamux protocol * @param parent_stream the parent stream * @returns a Stream initialized and ready for yamux */ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) { return NULL; }