Now correctly negotiating yamux

This commit is contained in:
John Jones 2017-11-29 07:40:52 -05:00
parent f4db6f15fb
commit f0342785d2

View file

@ -92,22 +92,6 @@ void yamux_read_stream(struct yamux_stream* stream, struct StreamMessage* msg) {
} }
*/ */
/***
* Send the yamux protocol out the default stream
* NOTE: if we initiate the connection, we should expect the same back
* @param context the SessionContext
* @returns true(1) on success, false(0) otherwise
*/
int yamux_send_protocol(struct Stream* stream) {
char* protocol = "/yamux/1.0.0\n";
struct StreamMessage outgoing;
outgoing.data = (uint8_t*)protocol;
outgoing.data_size = strlen(protocol);
if (!stream->write(stream->stream_context, &outgoing))
return 0;
return 1;
}
/*** /***
* Check to see if the reply is the yamux protocol header we expect * Check to see if the reply is the yamux protocol header we expect
* NOTE: if we initiate the connection, we should expect the same back * NOTE: if we initiate the connection, we should expect the same back
@ -143,6 +127,8 @@ int yamux_receive_protocol(struct YamuxContext* context) {
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success * @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/ */
int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
// get latest stream (multithreaded, so could be stale)
stream = libp2p_net_connection_get_session_context(stream)->default_stream;
if (stream->stream_type == STREAM_TYPE_YAMUX) { if (stream->stream_type == STREAM_TYPE_YAMUX) {
struct YamuxContext* ctx = (struct YamuxContext*) stream->stream_context; struct YamuxContext* ctx = (struct YamuxContext*) stream->stream_context;
if (ctx->state == yamux_stream_est) { if (ctx->state == yamux_stream_est) {
@ -270,9 +256,28 @@ int libp2p_yamux_read(void* stream_context, struct StreamMessage** message, int
ctx = (struct YamuxContext*)stream_context; ctx = (struct YamuxContext*)stream_context;
} }
if (ctx == NULL) {
libp2p_logger_error("yamux", "read: The incoming stream is not a yamux stream.\n");
return 0;
}
if (ctx->state != yamux_stream_est) { if (ctx->state != yamux_stream_est) {
libp2p_logger_debug("yamux", "read: Yamux still not inited, so passing to lower protocol.\n"); libp2p_logger_debug("yamux", "read: Yamux still not inited, so passing to lower protocol.\n");
return ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, timeout_secs); // perhaps this is the yamux protocol id we've been expecting
int retVal = ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, message, timeout_secs);
libp2p_logger_debug("yamux", "read: we did the lower read, and received a %d.\n", retVal);
if (retVal > 0) {
struct StreamMessage* incoming_message = *message;
libp2p_logger_debug("yamux", "read: The lower read has a message of %d bytes that says: %s.\n", incoming_message->data_size, incoming_message->data);
if (strstr((char*)incoming_message->data, "/yamux/1.0.0") != NULL) {
libp2p_logger_debug("yamux", "read: We got the protocol we've been waiting for.\n");
ctx->state = yamux_stream_est;
libp2p_stream_message_free(incoming_message);
*message = NULL;
return 0;
}
}
return retVal;
} }
struct Stream* parent_stream = libp2p_yamux_get_parent_stream(stream_context); struct Stream* parent_stream = libp2p_yamux_get_parent_stream(stream_context);
@ -507,6 +512,8 @@ struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream) {
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_yamux_send_protocol(struct Stream* stream) { int libp2p_yamux_send_protocol(struct Stream* stream) {
// JMJ debug
libp2p_logger_debug("yamux", "Sending protocol through stream of type %d and channel %d.\n", stream->stream_type, stream->channel);
const char* protocolID = "/yamux/1.0.0\n"; const char* protocolID = "/yamux/1.0.0\n";
struct StreamMessage outgoing; struct StreamMessage outgoing;
outgoing.data_size = strlen(protocolID); outgoing.data_size = strlen(protocolID);
@ -515,70 +522,6 @@ int libp2p_yamux_send_protocol(struct Stream* stream) {
return stream->write(stream->stream_context, &outgoing); return stream->write(stream->stream_context, &outgoing);
} }
int libp2p_yamux_negotiate(struct YamuxContext* ctx, int am_server) {
const char* protocolID = "/yamux/1.0.0\n";
struct StreamMessage outgoing;
struct StreamMessage* results = NULL;
int retVal = 0;
int haveTheirs = 0;
int peek_result = 0;
// see if they're trying to send something first (only if we're the client)
if (!am_server) {
peek_result = libp2p_yamux_peek(ctx);
if (peek_result > 0) {
libp2p_logger_debug("yamux", "There is %d bytes waiting for us. Perhaps it is the yamux header we're expecting.\n", peek_result);
// get the protocol
ctx->stream->parent_stream->read(ctx->stream->parent_stream, &results, yamux_default_timeout);
if (results == NULL || results->data_size == 0) {
libp2p_logger_error("yamux", "We thought we had a yamux header, but we got nothing.\n");
goto exit;
}
if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) {
libp2p_logger_error("yamux", "We thought we had a yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data);
goto exit;
}
libp2p_stream_message_free(results);
results = NULL;
haveTheirs = 1;
}
}
// send the protocol id
outgoing.data = (uint8_t*)protocolID;
outgoing.data_size = strlen(protocolID);
libp2p_logger_debug("yamux", "Attempting to write the yamux protocol id as %s.\n", (am_server ? "server" : "client"));
if (!ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing)) {
libp2p_logger_error("yamux", "We attempted to write the yamux protocol id, but the write call failed.\n");
goto exit;
}
// wait for them to send the protocol id back
if (!am_server && !haveTheirs) {
// expect the same back
ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &results, yamux_default_timeout);
if (results == NULL || results->data_size == 0) {
libp2p_logger_error("yamux", "We tried to retrieve the yamux header, but we got nothing.\n");
goto exit;
}
if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) {
libp2p_logger_error("yamux", "We tried to retrieve the yamux header, but we received %d bytes that contained %s.\n", (int)results->data_size, results->data);
goto exit;
}
}
//TODO: okay, we're almost done. Let incoming stuff be marshaled to the correct handler.
// this should be somewhat automatic, as they ask, and we negotiate
//TODO: we should open some streams with them (multistream, id, kademlia, relay)
// this is not automatic, as we need to start the negotiation process
retVal = 1;
exit:
if (results != NULL)
libp2p_stream_message_free(results);
return retVal;
}
/*** /***
* A new protocol was asked for. Give it a "channel" * A new protocol was asked for. Give it a "channel"
* @param yamux_stream the yamux stream * @param yamux_stream the yamux stream