More yamux fixes

This commit is contained in:
John Jones 2017-11-28 23:41:46 -05:00
parent 0d4d475c2c
commit 8551121bf8
5 changed files with 100 additions and 20 deletions

View file

@ -121,7 +121,6 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer
// we're connected. start listening for responses // we're connected. start listening for responses
libp2p_swarm_add_peer(dialer->swarm, peer); libp2p_swarm_add_peer(dialer->swarm, peer);
// wait for multistream // wait for multistream
int counter = 0;
if (!libp2p_net_multistream_ready(peer->sessionContext, 5)) { if (!libp2p_net_multistream_ready(peer->sessionContext, 5)) {
return 0; return 0;
} }
@ -129,21 +128,25 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer
if (new_stream != NULL) { if (new_stream != NULL) {
// secio over multistream // secio over multistream
new_stream = libp2p_secio_stream_new(new_stream, dialer->peerstore, dialer->private_key); new_stream = libp2p_secio_stream_new(new_stream, dialer->peerstore, dialer->private_key);
counter = 0; if (new_stream != NULL) {
if (!libp2p_secio_ready(peer->sessionContext, 10) ) { if (!libp2p_secio_ready(peer->sessionContext, 10) ) {
return 0; return 0;
} }
counter = 0; libp2p_logger_debug("dialer", "We successfully negotiated secio.\n");
if (new_stream != NULL) {
peer->sessionContext->default_stream = new_stream;
// multistream over secio // multistream over secio
new_stream = libp2p_net_multistream_stream_new(new_stream, 0); // Don't bother, as the other side requests multistream
//new_stream = libp2p_net_multistream_stream_new(new_stream, 0);
if (new_stream != NULL) { if (new_stream != NULL) {
peer->sessionContext->default_stream = new_stream; if (!libp2p_net_multistream_ready(peer->sessionContext, 5))
return 0;
libp2p_logger_debug("dialer", "We successfully negotiated multistream over secio.\n");
// yamux over multistream // yamux over multistream
new_stream = libp2p_yamux_stream_new(new_stream, 0, NULL); new_stream = libp2p_yamux_stream_new(new_stream, 0, NULL);
if (new_stream != NULL) { if (new_stream != NULL) {
peer->sessionContext->default_stream = new_stream; if (!libp2p_yamux_stream_ready(peer->sessionContext, 5))
return 0;
libp2p_logger_debug("dialer", "We successfully negotiated yamux.\n");
//peer->sessionContext->default_stream = new_stream;
// we have our swarm connection. Now we ask for some "channels" // we have our swarm connection. Now we ask for some "channels"
// id over yamux // id over yamux
//libp2p_yamux_stream_add(new_stream->stream_context, libp2p_identify_stream_new(new_stream)); //libp2p_yamux_stream_add(new_stream->stream_context, libp2p_identify_stream_new(new_stream));

View file

@ -116,3 +116,11 @@ void libp2p_yamux_channel_free(struct YamuxChannelContext* ctx);
* @returns a new StreamMessage that has a yamux_frame * @returns a new StreamMessage that has a yamux_frame
*/ */
struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incoming); struct StreamMessage* libp2p_yamux_prepare_to_send(struct StreamMessage* incoming);
/***
* Wait for yamux stream to become ready
* @param session_context the session context to check
* @param timeout_secs the number of seconds to wait for things to become ready
* @returns true(1) if it becomes ready, false(0) otherwise
*/
int libp2p_yamux_stream_ready(struct SessionContext* session_context, int timeout_secs);

View file

@ -534,6 +534,8 @@ int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struc
// the incoming stream is not a multistream. They are attempting to upgrade to multistream // the incoming stream is not a multistream. They are attempting to upgrade to multistream
struct Stream* new_stream = libp2p_net_multistream_stream_new(stream, 1); struct Stream* new_stream = libp2p_net_multistream_stream_new(stream, 1);
if (new_stream != NULL) { if (new_stream != NULL) {
struct MultistreamContext* ctx = (struct MultistreamContext*)stream->stream_context;
ctx->status = multistream_status_ack;
// upgrade // upgrade
return stream->handle_upgrade(stream, new_stream); return stream->handle_upgrade(stream, new_stream);
} }

View file

@ -36,7 +36,7 @@ int libp2p_swarm_listen_and_handle(struct Stream* stream, struct Libp2pVector* p
if (!stream->read(stream->stream_context, &results, 1)) { if (!stream->read(stream->stream_context, &results, 1)) {
libp2p_logger_debug("swarm", "Releasing read lock\n"); libp2p_logger_debug("swarm", "Releasing read lock\n");
pthread_mutex_unlock(stream->socket_mutex); pthread_mutex_unlock(stream->socket_mutex);
libp2p_logger_error("swarm", "Unable to read from network. Exiting.\n"); libp2p_logger_error("swarm", "Unable to read from network (could just be a timeout). Exiting the read.\n");
return retVal; return retVal;
} }
libp2p_logger_debug("swarm", "Releasing read lock.\n"); libp2p_logger_debug("swarm", "Releasing read lock.\n");

View file

@ -143,13 +143,29 @@ int yamux_receive_protocol(struct YamuxContext* context) {
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success * @returns 0 if the caller should not continue looping, <0 on error, >0 on success
*/ */
int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { int yamux_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
struct Stream* new_stream = libp2p_yamux_stream_new(stream, 1, protocol_context); if (stream->stream_type == STREAM_TYPE_YAMUX) {
if (new_stream == NULL) struct YamuxContext* ctx = (struct YamuxContext*) stream->stream_context;
if (ctx->state == yamux_stream_est) {
// TODO: This is probably a frame. we need to handle this.
return -1; return -1;
// upgrade } else {
stream->handle_upgrade(stream, new_stream); //TODO: check to make sure they sent the yamux protocol id
// we sent a protocol ID, and this is them responding
ctx->state = yamux_stream_est;
}
return 1; return 1;
} }
// the incoming stream is not yamux. They are attempting to upgrade to yamux
struct YamuxProtocolContext* yamuxProtocolContext = (struct YamuxProtocolContext*)protocol_context;
struct Stream* new_stream = libp2p_yamux_stream_new(stream, 0, yamuxProtocolContext->protocol_handlers);
if (new_stream != NULL) {
struct YamuxContext* ctx = (struct YamuxContext*) new_stream->stream_context;
ctx->state = yamux_stream_est;
// upgrade
return stream->handle_upgrade(stream, new_stream);
}
return -1;
}
/** /**
* Shutting down. Clean up any memory allocations * Shutting down. Clean up any memory allocations
@ -365,6 +381,11 @@ int libp2p_yamux_write(void* stream_context, struct StreamMessage* message) {
if (ctx == NULL && channel == NULL) if (ctx == NULL && channel == NULL)
return 0; return 0;
if (ctx->state != yamux_stream_est) {
struct Stream* parent_stream = ctx->stream->parent_stream;
return parent_stream->write(parent_stream->stream_context, message);
}
struct StreamMessage* outgoing_message = libp2p_yamux_prepare_to_send(message); struct StreamMessage* outgoing_message = libp2p_yamux_prepare_to_send(message);
// now convert fame for network use // now convert fame for network use
struct yamux_frame* frame = (struct yamux_frame*)outgoing_message->data; struct yamux_frame* frame = (struct yamux_frame*)outgoing_message->data;
@ -472,6 +493,20 @@ struct YamuxContext* libp2p_yamux_context_new(struct Stream* stream) {
return ctx; return ctx;
} }
/***
* Write the protocol id for yamux to the stream
* @param stream the stream to write to
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_yamux_send_protocol(struct Stream* stream) {
const char* protocolID = "/yamux/1.0.0\n";
struct StreamMessage outgoing;
outgoing.data_size = strlen(protocolID);
outgoing.data = (uint8_t*)protocolID;
outgoing.error_number = 0;
return stream->write(stream->stream_context, &outgoing);
}
int libp2p_yamux_negotiate(struct YamuxContext* ctx, int am_server) { int libp2p_yamux_negotiate(struct YamuxContext* ctx, int am_server) {
const char* protocolID = "/yamux/1.0.0\n"; const char* protocolID = "/yamux/1.0.0\n";
struct StreamMessage outgoing; struct StreamMessage outgoing;
@ -599,13 +634,14 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream, int am_serv
ctx->stream = out; ctx->stream = out;
ctx->am_server = am_server; ctx->am_server = am_server;
ctx->protocol_handlers = protocol_handlers; ctx->protocol_handlers = protocol_handlers;
ctx->state = yamux_stream_inited;
// tell protocol below that we want to upgrade
parent_stream->handle_upgrade(parent_stream, out);
// attempt to negotiate yamux protocol // attempt to negotiate yamux protocol
if (!libp2p_yamux_negotiate(ctx, am_server)) { if (!libp2p_yamux_send_protocol(parent_stream)) {
libp2p_yamux_stream_free(out); libp2p_yamux_stream_free(out);
return NULL; return NULL;
} }
// tell protocol below that we want to upgrade
parent_stream->handle_upgrade(parent_stream, out);
} }
return out; return out;
} }
@ -814,3 +850,34 @@ int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) {
} }
return 1; return 1;
} }
/***
* Wait for yamux stream to become ready
* @param session_context the session context to check
* @param timeout_secs the number of seconds to wait for things to become ready
* @returns true(1) if it becomes ready, false(0) otherwise
*/
int libp2p_yamux_stream_ready(struct SessionContext* session_context, int timeout_secs) {
int counter = 0;
while (session_context != NULL
&& session_context->default_stream != NULL
&& session_context->default_stream->stream_type != STREAM_TYPE_YAMUX
&& counter <= timeout_secs) {
counter++;
sleep(1);
}
if (session_context != NULL
&& session_context->default_stream != NULL
&& session_context->default_stream->stream_type == STREAM_TYPE_YAMUX
&& counter < 5) {
struct YamuxContext* ctx = (struct YamuxContext*)session_context->default_stream->stream_context;
while (ctx->state != yamux_stream_est && counter <= timeout_secs) {
counter++;
sleep(1);
}
if (ctx->state == yamux_stream_est)
return 1;
}
return 0;
}