Starting to wire identity protocol into yamux

This commit is contained in:
John Jones 2017-11-06 07:27:03 -05:00
parent a91e840770
commit d3f740b4e0
9 changed files with 95 additions and 9 deletions

View file

@ -14,6 +14,7 @@
#include "libp2p/net/multistream.h" #include "libp2p/net/multistream.h"
#include "libp2p/secio/secio.h" #include "libp2p/secio/secio.h"
#include "libp2p/yamux/yamux.h" #include "libp2p/yamux/yamux.h"
#include "libp2p/identify/identify.h"
struct TransportDialer* libp2p_conn_tcp_transport_dialer_new(); struct TransportDialer* libp2p_conn_tcp_transport_dialer_new();
@ -141,9 +142,13 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer
new_stream = libp2p_yamux_stream_new(new_stream); new_stream = libp2p_yamux_stream_new(new_stream);
if (new_stream != NULL) { if (new_stream != NULL) {
peer->sessionContext->default_stream = new_stream; peer->sessionContext->default_stream = new_stream;
// identity over yamux // we have our swarm connection. Now we ask for some "channels"
// id over yamux
libp2p_yamux_stream_add(new_stream->stream_context, libp2p_identify_stream_new(new_stream));
// kademlia over yamux // kademlia over yamux
//libp2p_yamux_stream_add(new_stream->stream_context, libp2p_kademlia_stream_new(new_stream));
// circuit relay over yamux // circuit relay over yamux
//libp2p_yamux_stream_add(new_stream->stream_context, libp2p_circuit_relay_stream_new(new_stream));
return 1; return 1;
} else { } else {
libp2p_logger_error("dialer", "Unable to do yamux negotiation.\n"); libp2p_logger_error("dialer", "Unable to do yamux negotiation.\n");

View file

@ -35,12 +35,12 @@ int libp2p_identify_can_handle(const struct StreamMessage* msg) {
* @param context the context * @param context the context
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_identify_send_protocol(struct SessionContext *context) { int libp2p_identify_send_protocol(struct IdentifyContext *context) {
char *protocol = "/ipfs/id/1.0.0\n"; char *protocol = "/ipfs/id/1.0.0\n";
struct StreamMessage msg; struct StreamMessage msg;
msg.data = (uint8_t*) protocol; msg.data = (uint8_t*) protocol;
msg.data_size = strlen(protocol); msg.data_size = strlen(protocol);
if (!context->default_stream->write(context, &msg)) { if (!context->parent_stream->write(context, &msg)) {
libp2p_logger_error("identify", "send_protocol: Unable to send identify protocol header.\n"); libp2p_logger_error("identify", "send_protocol: Unable to send identify protocol header.\n");
return 0; return 0;
} }
@ -53,10 +53,10 @@ int libp2p_identify_send_protocol(struct SessionContext *context) {
* @param context the SessionContext * @param context the SessionContext
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_identify_receive_protocol(struct SessionContext* context) { int libp2p_identify_receive_protocol(struct IdentifyContext* context) {
const char *protocol = "/ipfs/id/1.0.0\n"; const char *protocol = "/ipfs/id/1.0.0\n";
struct StreamMessage* results = NULL; struct StreamMessage* results = NULL;
if (!context->default_stream->read(context, &results, 30)) { if (!context->parent_stream->read(context, &results, 30)) {
libp2p_logger_error("identify", "receive_protocol: Unable to read results.\n"); libp2p_logger_error("identify", "receive_protocol: Unable to read results.\n");
return 0; return 0;
} }
@ -92,3 +92,35 @@ struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp
} }
return handler; return handler;
} }
/***
* Create a new stream that negotiates the identify protocol
*
* NOTE: This will be sent by our side (us asking them).
* Incoming "Identify" requests should be handled by the
* external protocol handler, not this function.
*
* @param parent_stream the parent stream
* @returns a new Stream that can talk "identify"
*/
struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream) {
if (parent_stream == NULL)
return NULL;
struct Stream* out = libp2p_stream_new();
if (out != NULL) {
struct IdentifyContext* ctx = (struct IdentifyContext*) malloc(sizeof(struct IdentifyContext));
if (ctx == NULL) {
libp2p_stream_free(out);
return NULL;
}
ctx->parent_stream = parent_stream;
out->stream_context = ctx;
if (!libp2p_identify_send_protocol(ctx) || !libp2p_identify_receive_protocol(ctx)) {
libp2p_stream_free(out);
free(ctx);
return NULL;
}
}
return out;
}

View file

@ -21,9 +21,26 @@ typedef struct {
char *XXX_unrecognized; char *XXX_unrecognized;
} Identify; } Identify;
struct IdentifyContext {
struct Stream* parent_stream;
};
int libp2p_identify_can_handle(const struct StreamMessage* msg); int libp2p_identify_can_handle(const struct StreamMessage* msg);
int libp2p_identify_send_protocol(struct SessionContext *context); int libp2p_identify_send_protocol(struct IdentifyContext *context);
int libp2p_identify_receive_protocol(struct SessionContext* context); int libp2p_identify_receive_protocol(struct IdentifyContext* context);
int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context); int libp2p_identify_handle_message(const struct StreamMessage* msg, struct SessionContext* context, void* protocol_context);
int libp2p_identify_shutdown(void* protocol_context); int libp2p_identify_shutdown(void* protocol_context);
struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp2pVector* handlers); struct Libp2pProtocolHandler* libp2p_identify_build_protocol_handler(struct Libp2pVector* handlers);
/***
* Create a new stream that negotiates the identify protocol
*
* NOTE: This will be sent by our side (us asking them).
* Incoming "Identify" requests should be handled by the
* external protocol handler, not this function.
*
* @param parent_stream the parent stream
* @returns a new Stream that can talk "identify"
*/
struct Stream* libp2p_identify_stream_new(struct Stream* parent_stream);

View file

@ -46,6 +46,7 @@ struct Stream {
struct MultiAddress* address; // helps identify who is on the other end struct MultiAddress* address; // helps identify who is on the other end
pthread_mutex_t* socket_mutex; // only 1 transmission at a time pthread_mutex_t* socket_mutex; // only 1 transmission at a time
struct Stream* parent_stream; // what stream wraps this stream struct Stream* parent_stream; // what stream wraps this stream
int channel; // the channel this stream uses, for multiplexing protocols such as yamux
/** /**
* A generic place to store implementation-specific context items * A generic place to store implementation-specific context items
*/ */

View file

@ -30,8 +30,9 @@ int libp2p_utils_vector_total(struct Libp2pVector* in);
* Add a value to the vector * Add a value to the vector
* @param vector the vector to add the item to. * @param vector the vector to add the item to.
* @param value the value to be added NOTE: this only saves the pointer, it does not copy. * @param value the value to be added NOTE: this only saves the pointer, it does not copy.
* @returns the index of the item in the vector
*/ */
void libp2p_utils_vector_add(struct Libp2pVector *vector, const void * value); int libp2p_utils_vector_add(struct Libp2pVector *vector, const void * value);
void libp2p_utils_vector_set(struct Libp2pVector *vector, int pos, void *value); void libp2p_utils_vector_set(struct Libp2pVector *vector, int pos, void *value);
const void *libp2p_utils_vector_get(struct Libp2pVector *vector, int); const void *libp2p_utils_vector_get(struct Libp2pVector *vector, int);
void libp2p_utils_vector_delete(struct Libp2pVector *vector, int pos); void libp2p_utils_vector_delete(struct Libp2pVector *vector, int pos);

View file

@ -15,6 +15,7 @@ static const int yamux_default_timeout = 10;
struct YamuxContext { struct YamuxContext {
struct Stream* stream; struct Stream* stream;
struct yamux_session* session; struct yamux_session* session;
struct Libp2pVector* channels;
}; };
/** /**
@ -38,3 +39,11 @@ int yamux_send_protocol(struct SessionContext* context);
int yamux_receive_protocol(struct SessionContext* context); int yamux_receive_protocol(struct SessionContext* context);
struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream); struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream);
/****
* Add a stream "channel" to the yamux handler
* @param ctx the context
* @param stream the stream to add
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream);

View file

@ -15,6 +15,7 @@ struct Stream* libp2p_stream_new() {
stream->socket_mutex = NULL; stream->socket_mutex = NULL;
stream->stream_context = NULL; stream->stream_context = NULL;
stream->write = NULL; stream->write = NULL;
stream->channel = 0;
} }
return stream; return stream;
} }

View file

@ -35,11 +35,12 @@ static void libp2p_utils_vector_resize(struct Libp2pVector *v, int capacity)
* @param v the vector to add to * @param v the vector to add to
* @param item the item to add * @param item the item to add
*/ */
void libp2p_utils_vector_add(struct Libp2pVector *v, const void *item) int libp2p_utils_vector_add(struct Libp2pVector *v, const void *item)
{ {
if (v->capacity == v->total) if (v->capacity == v->total)
libp2p_utils_vector_resize(v, v->capacity * 2); libp2p_utils_vector_resize(v, v->capacity * 2);
v->items[v->total++] = item; v->items[v->total++] = item;
return v->total;
} }
void libp2p_utils_vector_set(struct Libp2pVector *v, int index, void *item) void libp2p_utils_vector_set(struct Libp2pVector *v, int index, void *item)

View file

@ -228,6 +228,7 @@ struct YamuxContext* libp2p_yamux_context_new() {
struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext)); struct YamuxContext* ctx = (struct YamuxContext*) malloc(sizeof(struct YamuxContext));
if (ctx != NULL) { if (ctx != NULL) {
ctx->stream = NULL; ctx->stream = NULL;
ctx->channels = libp2p_utils_vector_new(1);
} }
return ctx; return ctx;
} }
@ -283,6 +284,11 @@ int libp2p_yamux_negotiate(struct YamuxContext* ctx) {
} }
} }
//TODO: okay, we're almost done. Let incoming stuff be marshaled to the correct handler.
// this should be somewhat automatic, as they ask, and we negotiate
//TODO: we should open some streams with them (multistream, id, kademlia, relay)
// this is not automatic, as we need to start the negotiation process
retVal = 1; retVal = 1;
exit: exit:
if (results != NULL) if (results != NULL)
@ -321,3 +327,16 @@ struct Stream* libp2p_yamux_stream_new(struct Stream* parent_stream) {
} }
return out; return out;
} }
/****
* Add a stream "channel" to the yamux handler
* @param ctx the context
* @param stream the stream to add
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_yamux_stream_add(struct YamuxContext* ctx, struct Stream* stream) {
int itemNo = libp2p_utils_vector_add(ctx->channels, stream);
stream->channel = itemNo;
return 1;
}