Swarm now functioning with yamux. Still needs more debugging.

This commit is contained in:
John Jones 2017-11-28 22:44:18 -05:00
parent bea9481c56
commit 0d4d475c2c
13 changed files with 520 additions and 65 deletions

View file

@ -19,6 +19,7 @@ OBJS = \
routing/*.o \ routing/*.o \
secio/*.o \ secio/*.o \
utils/*.o \ utils/*.o \
swarm/*.o \
yamux/*.o yamux/*.o
link: link:
@ -37,6 +38,7 @@ compile:
cd record; make all; cd record; make all;
cd routing; make all; cd routing; make all;
cd secio; make all; cd secio; make all;
cd swarm; make all;
cd utils; make all; cd utils; make all;
cd yamux; make all; cd yamux; make all;
@ -60,6 +62,7 @@ clean:
cd record; make clean; cd record; make clean;
cd routing; make clean; cd routing; make clean;
cd secio; make clean; cd secio; make clean;
cd swarm; make clean;
cd utils; make clean; cd utils; make clean;
cd test; make clean; cd test; make clean;
cd yamux; make clean; cd yamux; make clean;

View file

@ -25,7 +25,7 @@ struct TransportDialer* libp2p_conn_tcp_transport_dialer_new();
* @param private_key the local private key * @param private_key the local private key
* @returns a new Dialer struct * @returns a new Dialer struct
*/ */
struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key) { struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key, struct SwarmContext* swarm) {
int success = 0; int success = 0;
struct Dialer* dialer = (struct Dialer*)malloc(sizeof(struct Dialer)); struct Dialer* dialer = (struct Dialer*)malloc(sizeof(struct Dialer));
if (dialer != NULL) { if (dialer != NULL) {
@ -33,6 +33,7 @@ struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore*
dialer->private_key = rsa_private_key; dialer->private_key = rsa_private_key;
dialer->transport_dialers = NULL; dialer->transport_dialers = NULL;
dialer->fallback_dialer = libp2p_conn_tcp_transport_dialer_new(dialer->peer_id, rsa_private_key); dialer->fallback_dialer = libp2p_conn_tcp_transport_dialer_new(dialer->peer_id, rsa_private_key);
dialer->swarm = swarm;
if (peer != NULL) { if (peer != NULL) {
dialer->peer_id = malloc(peer->id_size + 1); dialer->peer_id = malloc(peer->id_size + 1);
memset(dialer->peer_id, 0, peer->id_size + 1); memset(dialer->peer_id, 0, peer->id_size + 1);
@ -104,6 +105,8 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer
if (conn_stream != NULL) { if (conn_stream != NULL) {
if (peer->sessionContext == NULL) { if (peer->sessionContext == NULL) {
peer->sessionContext = libp2p_session_context_new(); peer->sessionContext = libp2p_session_context_new();
struct ConnectionContext* conn_ctx = conn_stream->stream_context;
conn_ctx->session_context = peer->sessionContext;
} }
peer->sessionContext->insecure_stream = conn_stream; peer->sessionContext->insecure_stream = conn_stream;
peer->sessionContext->default_stream = conn_stream; peer->sessionContext->default_stream = conn_stream;
@ -115,11 +118,22 @@ int libp2p_conn_dialer_join_swarm(const struct Dialer* dialer, struct Libp2pPeer
} }
if (conn_stream == NULL) if (conn_stream == NULL)
return 0; return 0;
// multistream // we're connected. start listening for responses
struct Stream* new_stream = libp2p_net_multistream_stream_new(conn_stream, 0); libp2p_swarm_add_peer(dialer->swarm, peer);
// wait for multistream
int counter = 0;
if (!libp2p_net_multistream_ready(peer->sessionContext, 5)) {
return 0;
}
struct Stream* new_stream = peer->sessionContext->default_stream;
if (new_stream != NULL) { if (new_stream != NULL) {
// secio over multistream // secio over multistream
new_stream = libp2p_secio_stream_new(new_stream, peer, dialer->peerstore, dialer->private_key); new_stream = libp2p_secio_stream_new(new_stream, dialer->peerstore, dialer->private_key);
counter = 0;
if (!libp2p_secio_ready(peer->sessionContext, 10) ) {
return 0;
}
counter = 0;
if (new_stream != NULL) { if (new_stream != NULL) {
peer->sessionContext->default_stream = new_stream; peer->sessionContext->default_stream = new_stream;
// multistream over secio // multistream over secio

View file

@ -16,6 +16,7 @@
#include "libp2p/conn/connection.h" #include "libp2p/conn/connection.h"
#include "libp2p/conn/transport_dialer.h" #include "libp2p/conn/transport_dialer.h"
#include "libp2p/peer/peer.h" #include "libp2p/peer/peer.h"
#include "libp2p/swarm/swarm.h"
struct Dialer { struct Dialer {
/** /**
@ -34,16 +35,19 @@ struct Dialer {
//TODO: See dial.go, need to implement Protector //TODO: See dial.go, need to implement Protector
struct TransportDialer* fallback_dialer; // the default dialer. NOTE: this should not be in the list of transport_dialers struct TransportDialer* fallback_dialer; // the default dialer. NOTE: this should not be in the list of transport_dialers
struct SwarmContext* swarm;
}; };
/** /**
* Create a Dialer with the specified local information * Create a Dialer with the specified local information
* NOTE: This fills in the fallback_dialer too * NOTE: This fills in the fallback_dialer too
* @param peer_id the local PeerID * @param peer the local Peer
* @param peerstore the local peerstore
* @param private_key the local private key * @param private_key the local private key
* @param swarm the swarm
* @returns a new Dialer struct * @returns a new Dialer struct
*/ */
struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore* peerstore, struct RsaPrivateKey* private_key); struct Dialer* libp2p_conn_dialer_new(struct Libp2pPeer* peer, struct Peerstore* peerstore, struct RsaPrivateKey* private_key, struct SwarmContext* swarm);
/** /**
* free resources from the Dialer struct * free resources from the Dialer struct

View file

@ -14,11 +14,17 @@
* So in short, much of this will change. But for now, think of it as a Proof of Concept. * So in short, much of this will change. But for now, think of it as a Proof of Concept.
*/ */
enum MultistreamStatus {
multistream_status_initialized,
multistream_status_syn,
multistream_status_ack
};
struct MultistreamContext { struct MultistreamContext {
struct Libp2pVector* handlers; struct Libp2pVector* handlers;
struct SessionContext* session_context; struct SessionContext* session_context;
struct Stream* stream; struct Stream* stream;
volatile enum MultistreamStatus status;
}; };
/*** /***
@ -114,3 +120,12 @@ struct StreamMessage* libp2p_net_multistream_prepare_to_send(struct StreamMessag
struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, int theyRequested); struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, int theyRequested);
void libp2p_net_multistream_stream_free(struct Stream* stream); void libp2p_net_multistream_stream_free(struct Stream* stream);
/***
* Wait for multistream stream to become ready
* @param session_context the session context to check
* @param timeout_secs the number of seconds to wait for things to become ready
* @returns true(1) if it becomes ready, false(0) otherwise
*/
int libp2p_net_multistream_ready(struct SessionContext* session_context, int timeout_secs);

View file

@ -10,6 +10,13 @@
* Handling of a secure connection * Handling of a secure connection
*/ */
enum SecioStatus {
secio_status_unknown,
secio_status_initialized,
secio_status_syn,
secio_status_ack
};
struct SecioContext { struct SecioContext {
struct Stream* stream; struct Stream* stream;
struct SessionContext* session_context; struct SessionContext* session_context;
@ -17,6 +24,7 @@ struct SecioContext {
struct Peerstore* peer_store; struct Peerstore* peer_store;
struct StreamMessage* buffered_message; struct StreamMessage* buffered_message;
size_t buffered_message_pos; size_t buffered_message_pos;
volatile enum SecioStatus status;
}; };
struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPrivateKey* private_key, struct Peerstore* peer_store); struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPrivateKey* private_key, struct Peerstore* peer_store);
@ -25,12 +33,11 @@ struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPriv
* Initiates a secio handshake. Use this method when you want to initiate a secio * Initiates a secio handshake. Use this method when you want to initiate a secio
* session. This should not be used to respond to incoming secio requests * session. This should not be used to respond to incoming secio requests
* @param parent_stream the parent stream * @param parent_stream the parent stream
* @param remote_peer the remote peer
* @param peerstore the peerstore * @param peerstore the peerstore
* @param rsa_private_key the local private key * @param rsa_private_key the local private key
* @returns a Secio Stream * @returns a Secio Stream
*/ */
struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp2pPeer* remote_peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key); struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key);
/*** /***
* Initiates a secio handshake. Use this method when you want to initiate a secio * Initiates a secio handshake. Use this method when you want to initiate a secio
@ -58,10 +65,16 @@ int libp2p_secio_receive_protocol(struct Stream* stream);
* performs initial communication over an insecure channel to share * performs initial communication over an insecure channel to share
* keys, IDs, and initiate connection. This is a framed messaging system * keys, IDs, and initiate connection. This is a framed messaging system
* NOTE: session must contain a valid socket_descriptor that is a multistream. * NOTE: session must contain a valid socket_descriptor that is a multistream.
* @param local_session the secure session to be filled * @param secio_stream a stream that is a Secio stream
* @param private_key our private key to use
* @param peerstore the collection of peers
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_handshake(struct SecioContext* secio_context); int libp2p_secio_handshake(struct Stream* secio_stream);
/***
* Wait for secio stream to become ready
* @param session_context the session context to check
* @param timeout_secs the number of seconds to wait for things to become ready
* @returns true(1) if it becomes ready, false(0) otherwise
*/
int libp2p_secio_ready(struct SessionContext* session_context, int timeout_secs);

View file

@ -0,0 +1,44 @@
#pragma once
/***
* This listens for requests from the connected peers
*/
#include "libp2p/utils/thread_pool.h"
#include "libp2p/db/datastore.h"
#include "libp2p/db/filestore.h"
#include "libp2p/peer/peer.h"
struct SwarmContext {
threadpool thread_pool;
struct Libp2pVector* protocol_handlers;
struct Datastore* datastore;
struct Filestore* filestore;
};
/***
* Add a connected peer to the swarm
* NOTE: We should already have a connection to the peer
* @param context the SwarmContext
* @param peer the connected peer
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_swarm_add_peer(struct SwarmContext* context, struct Libp2pPeer* peer);
/**
* add an incoming connection
* @param context the SwarmContext
* @param file_descriptor the incoming file descriptor of the connection
* @param ip the incoming ip (ipv4 format)
* @param port the incoming port
* @return true(1) on success, false(0) otherwise
*/
int libp2p_swarm_add_connection(struct SwarmContext* context, int file_descriptor, int ip, int port );
/**
* Fire up the swarm engine, and return its context
* @param protocol_handlers the protocol handlers
* @param datastore the datastore
* @param filestore the file store
* @returns the SwarmContext
*/
struct SwarmContext* libp2p_swarm_new(struct Libp2pVector* protocol_handlers, struct Datastore* datastore, struct Filestore* filestore);

View file

@ -165,7 +165,7 @@ struct StreamMessage* libp2p_net_multistream_prepare_to_send(struct StreamMessag
* @param msg the data to send * @param msg the data to send
* @returns the number of bytes written * @returns the number of bytes written
*/ */
int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* incoming) { int libp2p_net_multistream_write_without_check(void* stream_context, struct StreamMessage* incoming) {
struct MultistreamContext* multistream_context = (struct MultistreamContext*) stream_context; struct MultistreamContext* multistream_context = (struct MultistreamContext*) stream_context;
struct Stream* parent_stream = multistream_context->stream->parent_stream; struct Stream* parent_stream = multistream_context->stream->parent_stream;
int num_bytes = 0; int num_bytes = 0;
@ -184,6 +184,47 @@ int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* inc
return num_bytes; return num_bytes;
} }
/***
* Wait for multistream stream to become ready
* @param session_context the session context to check
* @param timeout_secs the number of seconds to wait for things to become ready
* @returns true(1) if it becomes ready, false(0) otherwise
*/
int libp2p_net_multistream_ready(struct SessionContext* session_context, int timeout_secs) {
int counter = 0;
while (session_context->default_stream->stream_type != STREAM_TYPE_MULTISTREAM && counter <= timeout_secs) {
counter++;
sleep(1);
}
if (session_context->default_stream->stream_type == STREAM_TYPE_MULTISTREAM && counter < 5) {
struct MultistreamContext* ctx = (struct MultistreamContext*)session_context->default_stream->stream_context;
while (ctx->status != multistream_status_ack && counter <= timeout_secs) {
counter++;
sleep(1);
}
if (ctx->status == multistream_status_ack)
return 1;
}
return 0;
}
/**
* Write to an open multistream host
* @param stream_context the session context
* @param msg the data to send
* @returns the number of bytes written
*/
int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* incoming) {
struct MultistreamContext* multistream_context = (struct MultistreamContext*) stream_context;
if (multistream_context->status != multistream_status_ack) {
libp2p_logger_error("multistream", "Attempt to write before protocol is completely set up.\n");
return 0;
}
return libp2p_net_multistream_write_without_check(stream_context, incoming);
}
/** /**
* Read from a multistream socket * Read from a multistream socket
* @param socket_fd the socket file descriptor * @param socket_fd the socket file descriptor
@ -306,8 +347,9 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname,
} }
/** /**
* Negotiate the multistream protocol by sending and receiving the protocol id. This is a server side function. * Negotiate the multistream protocol by sending the protocol id. This is a server side function.
* Servers should send the protocol ID, and then expect it back. * Servers should send the protocol ID, and then expect it back. Receiving the
* protocol id back is the responsibility of a future read, not part of this function.
* NOTE: the SessionContext should already contain the connected stream. If not, use * NOTE: the SessionContext should already contain the connected stream. If not, use
* libp2p_net_multistream_connect instead of this method. * libp2p_net_multistream_connect instead of this method.
* *
@ -320,38 +362,46 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyReq
struct StreamMessage outgoing; struct StreamMessage outgoing;
struct StreamMessage* results = NULL; struct StreamMessage* results = NULL;
int retVal = 0; int retVal = 0;
int haveTheirs = 0; //int haveTheirs = 0;
int peek_result = 0; //int peek_result = 0;
/*
if (!theyRequested) { if (!theyRequested) {
// see if they're trying to send something first // see if they're trying to send something first
peek_result = libp2p_net_multistream_peek(ctx); peek_result = libp2p_net_multistream_peek(ctx);
/*
if (peek_result < 0) {
libp2p_logger_error("multistream", "Attempted a peek, but received an error.\n");
return 0;
}
*/
if (peek_result > 0) { if (peek_result > 0) {
libp2p_logger_debug("multistream", "negotiate: There is %d bytes waiting for us. Perhaps it is the multistream header we're expecting.\n", peek_result); libp2p_logger_debug("multistream", "negotiate: There is %d bytes waiting for us. Perhaps it is the multistream header we're expecting.\n", peek_result);
// get the protocol // get the protocol
//ctx->stream->parent_stream->read(ctx->stream->parent_stream->stream_context, &results, multistream_default_timeout);
libp2p_net_multistream_read(ctx, &results, multistream_default_timeout); libp2p_net_multistream_read(ctx, &results, multistream_default_timeout);
if (results == NULL || results->data_size == 0) if (results == NULL || results->data_size == 0) {
libp2p_logger_debug("multistream", "negotiate: We tried to read the %d bytes, but got nothing.\n", peek_result);
goto exit; goto exit;
if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) }
if (strncmp((char*)results->data, protocolID, strlen(protocolID)) != 0) {
libp2p_logger_debug("multistream", "negotiate: We expected the multistream id, but got %s.\n", results->data);
goto exit; goto exit;
}
libp2p_logger_debug("multistream", "negotiate: We read %d bytes from the network, and received the multistream id.\n", results->data_size);
haveTheirs = 1; haveTheirs = 1;
} }
} }
*/
// send the protocol id // send the protocol id
outgoing.data = (uint8_t*)protocolID; outgoing.data = (uint8_t*)protocolID;
outgoing.data_size = strlen(protocolID); outgoing.data_size = strlen(protocolID);
if (!libp2p_net_multistream_write(ctx, &outgoing)) { if (!libp2p_net_multistream_write_without_check(ctx, &outgoing)) {
libp2p_logger_debug("multistream", "negotiate: Attempted to send the multistream id, but the write failed.\n"); libp2p_logger_debug("multistream", "negotiate: Attempted to send the multistream id, but the write failed.\n");
goto exit; goto exit;
} }
// update the status
if (theyRequested) {
ctx->status = multistream_status_ack;
} else {
ctx->status = multistream_status_syn;
}
/*
// wait for them to send the protocol id back // wait for them to send the protocol id back
if (!theyRequested && !haveTheirs) { if (!theyRequested && !haveTheirs) {
libp2p_logger_debug("multistream", "negotiate: Wrote multistream id to network, awaiting reply...\n"); libp2p_logger_debug("multistream", "negotiate: Wrote multistream id to network, awaiting reply...\n");
@ -366,6 +416,7 @@ int libp2p_net_multistream_negotiate(struct MultistreamContext* ctx, int theyReq
goto exit; goto exit;
} }
} }
*/
retVal = 1; retVal = 1;
exit: exit:
@ -406,6 +457,7 @@ int libp2p_net_multistream_handle_upgrade(struct Stream* multistream, struct Str
// take multistream out of the picture // take multistream out of the picture
if (new_stream->parent_stream == multistream) { if (new_stream->parent_stream == multistream) {
new_stream->parent_stream = multistream->parent_stream; new_stream->parent_stream = multistream->parent_stream;
multistream->parent_stream->handle_upgrade(multistream->parent_stream, new_stream);
} }
return 1; return 1;
} }
@ -429,12 +481,14 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i
out->negotiate = libp2p_net_multistream_handshake; out->negotiate = libp2p_net_multistream_handshake;
out->handle_upgrade = libp2p_net_multistream_handle_upgrade; out->handle_upgrade = libp2p_net_multistream_handle_upgrade;
out->address = parent_stream->address; out->address = parent_stream->address;
out->socket_mutex = parent_stream->socket_mutex;
// build MultistreamContext // build MultistreamContext
struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext)); struct MultistreamContext* ctx = (struct MultistreamContext*) malloc(sizeof(struct MultistreamContext));
if (ctx == NULL) { if (ctx == NULL) {
libp2p_net_multistream_stream_free(out); libp2p_net_multistream_stream_free(out);
return NULL; return NULL;
} }
ctx->status = multistream_status_initialized;
out->stream_context = ctx; out->stream_context = ctx;
ctx->stream = out; ctx->stream = out;
ctx->handlers = NULL; ctx->handlers = NULL;
@ -445,6 +499,15 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i
libp2p_net_multistream_stream_free(out); libp2p_net_multistream_stream_free(out);
return NULL; return NULL;
} }
if (!theyRequested) {
int timeout = 5;
int counter = 0;
// wait for the response
while(ctx->status != multistream_status_ack && counter < timeout) {
sleep(1);
counter++;
}
}
} }
return out; return out;
} }
@ -457,7 +520,18 @@ struct Stream* libp2p_net_multistream_stream_new(struct Stream* parent_stream, i
* @returns <0 on error, 0 for the caller to stop handling this, 1 for success * @returns <0 on error, 0 for the caller to stop handling this, 1 for success
*/ */
int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { int libp2p_net_multistream_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
// attempt negotiations if (stream->stream_type == STREAM_TYPE_MULTISTREAM) {
// we sent a multistream, and this is them responding
struct MultistreamContext* ctx = (struct MultistreamContext*) stream->stream_context;
if (ctx->status == multistream_status_ack) {
// uh oh, this stream is already set up. error
return -1;
} else {
ctx->status = multistream_status_ack;
}
return 1;
}
// the incoming stream is not a multistream. They are attempting to upgrade to multistream
struct Stream* new_stream = libp2p_net_multistream_stream_new(stream, 1); struct Stream* new_stream = libp2p_net_multistream_stream_new(stream, 1);
if (new_stream != NULL) { if (new_stream != NULL) {
// upgrade // upgrade

View file

@ -58,12 +58,26 @@ int libp2p_secio_can_handle(const struct StreamMessage* msg) {
int libp2p_secio_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) { int libp2p_secio_handle_message(const struct StreamMessage* msg, struct Stream* stream, void* protocol_context) {
libp2p_logger_debug("secio", "Handling incoming secio message.\n"); libp2p_logger_debug("secio", "Handling incoming secio message.\n");
struct SecioContext* ctx = (struct SecioContext*)protocol_context; struct SecioContext* ctx = (struct SecioContext*)protocol_context;
// send them the protocol struct Stream* secio_stream = NULL;
if (!libp2p_secio_send_protocol(stream)) // get the latest stream for the session context, as it may have changed (multithreaded)
return -1; struct Stream* root_stream = stream;
int retVal = libp2p_secio_handshake(ctx); while (root_stream->parent_stream != NULL)
if (retVal) root_stream = root_stream->parent_stream;
struct ConnectionContext* connection_context = (struct ConnectionContext*)root_stream->stream_context;
stream = connection_context->session_context->default_stream;
// have we not already started negotiating?
if (stream->stream_type != STREAM_TYPE_SECIO) {
secio_stream = libp2p_secio_stream_new(stream, ctx->peer_store, ctx->private_key);
// send them the protocol
if (!libp2p_secio_send_protocol(stream))
return -1;
} else {
secio_stream = stream;
}
int retVal = libp2p_secio_handshake(secio_stream);
if (retVal) {
return 0; return 0;
}
return -1; return -1;
} }
@ -80,6 +94,9 @@ struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPriv
context->buffered_message_pos = -1; context->buffered_message_pos = -1;
context->private_key = private_key; context->private_key = private_key;
context->peer_store = peer_store; context->peer_store = peer_store;
context->stream = NULL;
context->session_context = NULL;
context->status = secio_status_unknown;
handler->context = context; handler->context = context;
handler->CanHandle = libp2p_secio_can_handle; handler->CanHandle = libp2p_secio_can_handle;
handler->HandleMessage = libp2p_secio_handle_message; handler->HandleMessage = libp2p_secio_handle_message;
@ -800,6 +817,11 @@ int libp2p_secio_encrypt(struct SessionContext* session, const unsigned char* in
int libp2p_secio_encrypted_write(void* stream_context, struct StreamMessage* bytes) { int libp2p_secio_encrypted_write(void* stream_context, struct StreamMessage* bytes) {
struct SecioContext* ctx = (struct SecioContext*) stream_context; struct SecioContext* ctx = (struct SecioContext*) stream_context;
struct Stream* parent_stream = ctx->stream->parent_stream; struct Stream* parent_stream = ctx->stream->parent_stream;
if (ctx->status != secio_status_ack) {
return parent_stream->write(parent_stream->stream_context, bytes);
}
struct SessionContext* session_context = ctx->session_context; struct SessionContext* session_context = ctx->session_context;
// writer uses the local cipher and mac // writer uses the local cipher and mac
@ -897,6 +919,10 @@ int libp2p_secio_encrypted_read(void* stream_context, struct StreamMessage** byt
int retVal = 0; int retVal = 0;
struct SecioContext* ctx = (struct SecioContext*)stream_context; struct SecioContext* ctx = (struct SecioContext*)stream_context;
struct Stream* parent_stream = ctx->stream->parent_stream; struct Stream* parent_stream = ctx->stream->parent_stream;
if (ctx->status != secio_status_ack) {
return parent_stream->read(parent_stream->stream_context, bytes, timeout_secs);
}
// reader uses the remote cipher and mac // reader uses the remote cipher and mac
// read the data // read the data
struct StreamMessage* msg = NULL; struct StreamMessage* msg = NULL;
@ -970,7 +996,7 @@ struct Libp2pPeer* libp2p_secio_get_peer_or_add(struct Peerstore* peerstore, str
* @param peerstore the collection of peers * @param peerstore the collection of peers
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_handshake(struct SecioContext* secio_context) { int libp2p_secio_handshake(struct Stream* secio_stream) {
int retVal = 0; int retVal = 0;
size_t bytes_written = 0; size_t bytes_written = 0;
struct StreamMessage* incoming = NULL; struct StreamMessage* incoming = NULL;
@ -993,10 +1019,15 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) {
struct StretchedKey* k1 = NULL, *k2 = NULL; struct StretchedKey* k1 = NULL, *k2 = NULL;
struct Libp2pPeer* remote_peer = NULL; struct Libp2pPeer* remote_peer = NULL;
struct SecioContext* secio_context = secio_stream->stream_context;
struct SessionContext* local_session = secio_context->session_context; struct SessionContext* local_session = secio_context->session_context;
struct RsaPrivateKey* private_key = secio_context->private_key; struct RsaPrivateKey* private_key = secio_context->private_key;
struct Peerstore* peerstore = secio_context->peer_store; struct Peerstore* peerstore = secio_context->peer_store;
libp2p_logger_debug("secio", "handshake: Getting read lock.\n");
pthread_mutex_lock(secio_stream->socket_mutex);
libp2p_logger_debug("secio", "handshake: Got read lock.\n");
//TODO: make sure we're not talking to ourself //TODO: make sure we're not talking to ourself
// send the protocol id and the outgoing Propose struct // send the protocol id and the outgoing Propose struct
@ -1010,6 +1041,7 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) {
propose_out = libp2p_secio_propose_build(local_session->local_nonce, private_key, propose_out = libp2p_secio_propose_build(local_session->local_nonce, private_key,
SupportedExchanges, SupportedCiphers, SupportedHashes); SupportedExchanges, SupportedCiphers, SupportedHashes);
/*
if (libp2p_logger_watching_class("secio")) { if (libp2p_logger_watching_class("secio")) {
fprintf(stdout, "Our public key: "); fprintf(stdout, "Our public key: ");
for(int i = 0; i < propose_out->public_key_size; i++) { for(int i = 0; i < propose_out->public_key_size; i++) {
@ -1017,6 +1049,7 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) {
} }
fprintf(stdout, "\n"); fprintf(stdout, "\n");
} }
*/
// protobuf the proposal // protobuf the proposal
propose_out_size = libp2p_secio_propose_protobuf_encode_size(propose_out); propose_out_size = libp2p_secio_propose_protobuf_encode_size(propose_out);
@ -1027,15 +1060,17 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) {
// now send the Propose struct // now send the Propose struct
outgoing.data = propose_out_bytes; outgoing.data = propose_out_bytes;
outgoing.data_size = propose_out_size; outgoing.data_size = propose_out_size;
bytes_written = libp2p_secio_unencrypted_write(secio_context->stream, &outgoing); libp2p_logger_debug("secio", "About to send propose_out, of %d bytes.\n", propose_out_size);
bytes_written = libp2p_secio_unencrypted_write(secio_stream, &outgoing);
if (bytes_written != propose_out_size) { if (bytes_written != propose_out_size) {
libp2p_logger_error("secio", "Sent propose_out, but did not write the correct number of bytes. Should be %d but was %d.\n", propose_out_size, bytes_written); libp2p_logger_error("secio", "Sent propose_out, but did not write the correct number of bytes. Should be %d but was %d.\n", propose_out_size, bytes_written);
goto exit; goto exit;
} }
libp2p_logger_debug("secio", "Sent propose_out, waiting for propose_in.\n");
// try to get the Propose struct from the remote peer // try to get the Propose struct from the remote peer
bytes_written = libp2p_secio_unencrypted_read(secio_context->stream, &incoming, 10); bytes_written = libp2p_secio_unencrypted_read(secio_stream, &incoming, 10);
if (bytes_written <= 0) { if (bytes_written <= 0) {
libp2p_logger_error("secio", "Unable to get the remote's Propose struct.\n"); libp2p_logger_error("secio", "Unable to get the remote's Propose struct.\n");
goto exit; goto exit;
@ -1141,7 +1176,7 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) {
outgoing.data = exchange_out_protobuf; outgoing.data = exchange_out_protobuf;
outgoing.data_size = exchange_out_protobuf_size; outgoing.data_size = exchange_out_protobuf_size;
bytes_written = libp2p_secio_unencrypted_write(secio_context->stream, &outgoing); bytes_written = libp2p_secio_unencrypted_write(secio_stream, &outgoing);
if (exchange_out_protobuf_size != bytes_written) { if (exchange_out_protobuf_size != bytes_written) {
libp2p_logger_error("secio", "Unable to write exchange_out\n"); libp2p_logger_error("secio", "Unable to write exchange_out\n");
goto exit; goto exit;
@ -1154,7 +1189,7 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) {
// receive Exchange packet // receive Exchange packet
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading exchange packet\n"); libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading exchange packet\n");
bytes_written = libp2p_secio_unencrypted_read(secio_context->stream, &incoming, 10); bytes_written = libp2p_secio_unencrypted_read(secio_stream, &incoming, 10);
if (bytes_written == 0) { if (bytes_written == 0) {
libp2p_logger_error("secio", "unable to read exchange packet.\n"); libp2p_logger_error("secio", "unable to read exchange packet.\n");
libp2p_peer_handle_connection_error(remote_peer); libp2p_peer_handle_connection_error(remote_peer);
@ -1236,6 +1271,8 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) {
libp2p_secio_initialize_crypto(local_session); libp2p_secio_initialize_crypto(local_session);
secio_context->status = secio_status_ack;
// send their nonce to verify encryption works // send their nonce to verify encryption works
outgoing.data = (uint8_t*)local_session->remote_nonce; outgoing.data = (uint8_t*)local_session->remote_nonce;
outgoing.data_size = 16; outgoing.data_size = 16;
@ -1264,17 +1301,21 @@ int libp2p_secio_handshake(struct SecioContext* secio_context) {
libp2p_stream_message_free(incoming); libp2p_stream_message_free(incoming);
incoming = NULL; incoming = NULL;
/* Stream->HandleUpgrade now does this...
// set up the secure stream in the struct // set up the secure stream in the struct
local_session->secure_stream = local_session->insecure_stream; local_session->secure_stream = local_session->insecure_stream;
local_session->secure_stream->read = libp2p_secio_encrypted_read; local_session->secure_stream->read = libp2p_secio_encrypted_read;
local_session->secure_stream->write = libp2p_secio_encrypted_write; local_session->secure_stream->write = libp2p_secio_encrypted_write;
// set secure as default // set secure as default
local_session->default_stream = local_session->secure_stream; local_session->default_stream = local_session->secure_stream;
*/
retVal = 1; retVal = 1;
//libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake complete\n"); //libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake complete\n");
exit: exit:
libp2p_logger_debug("secio", "Releasing read lock.\n");
pthread_mutex_unlock(secio_stream->socket_mutex);
libp2p_logger_debug("secio", "Read lock released.\n");
if (propose_in_bytes != NULL) if (propose_in_bytes != NULL)
free(propose_in_bytes); free(propose_in_bytes);
if (propose_out_bytes != NULL) if (propose_out_bytes != NULL)
@ -1356,13 +1397,19 @@ int libp2p_secio_close(struct Stream* stream) {
* Initiates a secio handshake. Use this method when you want to initiate a secio * Initiates a secio handshake. Use this method when you want to initiate a secio
* session. This should not be used to respond to incoming secio requests * session. This should not be used to respond to incoming secio requests
* @param parent_stream the parent stream * @param parent_stream the parent stream
* @param remote_peer the remote peer
* @param peerstore the peerstore * @param peerstore the peerstore
* @param rsa_private_key the local private key * @param rsa_private_key the local private key
* @returns a Secio Stream * @returns a Secio Stream
*/ */
struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp2pPeer* remote_peer, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key) { struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Peerstore* peerstore, struct RsaPrivateKey* rsa_private_key) {
struct Stream* new_stream = libp2p_stream_new(); struct Stream* new_stream = libp2p_stream_new();
// get SessionContext
struct Stream* root_stream = parent_stream;
while (root_stream->parent_stream != NULL )
root_stream = root_stream->parent_stream;
struct ConnectionContext* connection_context = (struct ConnectionContext*)root_stream->stream_context;
struct SessionContext* session_context = connection_context->session_context;
if (new_stream != NULL) { if (new_stream != NULL) {
new_stream->stream_type = STREAM_TYPE_SECIO; new_stream->stream_type = STREAM_TYPE_SECIO;
struct SecioContext* ctx = (struct SecioContext*) malloc(sizeof(struct SecioContext)); struct SecioContext* ctx = (struct SecioContext*) malloc(sizeof(struct SecioContext));
@ -1375,18 +1422,19 @@ struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp
ctx->buffered_message_pos = -1; ctx->buffered_message_pos = -1;
new_stream->stream_context = ctx; new_stream->stream_context = ctx;
ctx->stream = new_stream; ctx->stream = new_stream;
ctx->session_context = remote_peer->sessionContext; ctx->session_context = session_context;
ctx->peer_store = peerstore; ctx->peer_store = peerstore;
ctx->private_key = rsa_private_key; ctx->private_key = rsa_private_key;
ctx->status = secio_status_initialized;
new_stream->parent_stream = parent_stream; new_stream->parent_stream = parent_stream;
new_stream->close = libp2p_secio_close; new_stream->close = libp2p_secio_close;
new_stream->peek = libp2p_secio_peek; new_stream->peek = libp2p_secio_peek;
new_stream->read = libp2p_secio_encrypted_read; new_stream->read = libp2p_secio_encrypted_read;
new_stream->read_raw = libp2p_secio_read_raw; new_stream->read_raw = libp2p_secio_read_raw;
new_stream->write = libp2p_secio_encrypted_write; new_stream->write = libp2p_secio_encrypted_write;
if (!libp2p_secio_send_protocol(parent_stream) new_stream->socket_mutex = parent_stream->socket_mutex;
|| !libp2p_secio_receive_protocol(parent_stream) parent_stream->handle_upgrade(parent_stream, new_stream);
|| !libp2p_secio_handshake(ctx)) { if (!libp2p_secio_send_protocol(parent_stream)) {
libp2p_stream_free(new_stream); libp2p_stream_free(new_stream);
new_stream = NULL; new_stream = NULL;
} }
@ -1394,3 +1442,33 @@ struct Stream* libp2p_secio_stream_new(struct Stream* parent_stream, struct Libp
return new_stream; return new_stream;
} }
/***
* Wait for secio stream to become ready
* @param session_context the session context to check
* @param timeout_secs the number of seconds to wait for things to become ready
* @returns true(1) if it becomes ready, false(0) otherwise
*/
int libp2p_secio_ready(struct SessionContext* session_context, int timeout_secs) {
int counter = 0;
while (session_context != NULL
&& session_context->default_stream != NULL
&& session_context->default_stream->stream_type != STREAM_TYPE_SECIO
&& counter <= timeout_secs) {
counter++;
sleep(1);
}
if (session_context != NULL
&& session_context->default_stream != NULL
&& session_context->default_stream->stream_type == STREAM_TYPE_SECIO
&& counter < 5) {
struct SecioContext* ctx = (struct SecioContext*)session_context->default_stream->stream_context;
while (ctx->status != secio_status_ack && counter <= timeout_secs) {
counter++;
sleep(1);
}
if (ctx->status == secio_status_ack)
return 1;
}
return 0;
}

18
swarm/Makefile Normal file
View file

@ -0,0 +1,18 @@
CC = gcc
CFLAGS = -O0 -Wall -I../include -I../../c-protobuf -I../../c-multiaddr/include -std=c99
ifdef DEBUG
CFLAGS += -g3
endif
LFLAGS =
DEPS =
OBJS = swarm.o
%.o: %.c $(DEPS)
$(CC) -c -o $@ $< $(CFLAGS)
all: $(OBJS)
clean:
rm -f *.o

View file

@ -1 +1,167 @@
/***
* This listens for requests from the connected peers
*/
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include "libp2p/net/protocol.h"
#include "libp2p/net/connectionstream.h"
#include "libp2p/swarm/swarm.h"
#include "libp2p/utils/logger.h"
/**
* Helps pass information to a new thread
*/
struct SwarmSession {
struct SessionContext* session_context;
struct SwarmContext* swarm_context;
};
int DEFAULT_NETWORK_TIMEOUT = 5;
/***
* Listens on a particular stream, and marshals the request
* @param stream the stream to listen to
* @param protocol_handlers a vector of protocol handlers
* @returns <0 on error, 0 if we shouldn't handle this anymore, or 1 on success
*/
int libp2p_swarm_listen_and_handle(struct Stream* stream, struct Libp2pVector* protocol_handlers) {
struct StreamMessage* results = NULL;
int retVal = 0;
// Read from the network
libp2p_logger_debug("swarm", "Attempting to get read lock.\n");
pthread_mutex_lock(stream->socket_mutex);
libp2p_logger_debug("swarm", "Got read lock.\n");
if (!stream->read(stream->stream_context, &results, 1)) {
libp2p_logger_debug("swarm", "Releasing read lock\n");
pthread_mutex_unlock(stream->socket_mutex);
libp2p_logger_error("swarm", "Unable to read from network. Exiting.\n");
return retVal;
}
libp2p_logger_debug("swarm", "Releasing read lock.\n");
pthread_mutex_unlock(stream->socket_mutex);
if (results != NULL) {
libp2p_logger_debug("swarm", "Attempting to marshal %d bytes from network.\n", results->data_size);
retVal = libp2p_protocol_marshal(results, stream, protocol_handlers);
libp2p_logger_debug("swarm", "The return value from the attempt to marshal %d bytes was %d.\n", results->data_size, retVal);
libp2p_stream_message_free(results);
} else {
libp2p_logger_debug("swarm", "Attempted read, but results were null. This is normal.\n");
}
return retVal;
}
/***
* This is on its own thread, and listens for incoming data from a particular client
* @param session the SessionContext
*/
void libp2p_swarm_listen(void* ctx) {
struct SwarmSession* swarm_session = (struct SwarmSession*) ctx;
struct SessionContext* session_context = swarm_session->session_context;
int retVal = 0;
for(;;) {
// Read from the network
retVal = libp2p_swarm_listen_and_handle(session_context->default_stream, swarm_session->swarm_context->protocol_handlers);
if (retVal < 0) {
// exit the loop on error
libp2p_logger_debug("swarm", "listen: Exiting loop due to retVal being %d.\n", retVal);
break;
}
} // end of loop
// clean up memory
if (session_context->host != NULL)
free(session_context->host);
free(swarm_session);
}
/***
* Add a connected peer to the swarm
* NOTE: We should already have a connection to the peer
* @param context the SwarmContext
* @param peer the connected peer
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_swarm_add_peer(struct SwarmContext* context, struct Libp2pPeer* peer) {
// spin off a thread for this peer
struct SwarmSession* swarm_session = (struct SwarmSession*) malloc(sizeof(struct SwarmSession));
swarm_session->session_context = peer->sessionContext;
swarm_session->swarm_context = context;
if (thpool_add_work(context->thread_pool, libp2p_swarm_listen, swarm_session) < 0) {
libp2p_logger_error("swarm", "Unable to fire up thread for peer %s\n", libp2p_peer_id_to_string(peer));
return 0;
}
libp2p_logger_info("swarm", "add_connection: added connection for peer %s.\n", libp2p_peer_id_to_string(peer));
return 1;
}
/**
* add an incoming connection
* @param context the SwarmContext
* @param file_descriptor the incoming file descriptor of the connection
* @param ip the incoming ip (ipv4 format)
* @param port the incoming port
* @return true(1) on success, false(0) otherwise
*/
int libp2p_swarm_add_connection(struct SwarmContext* context, int file_descriptor, int ip, int port ) {
// build a session context
struct SessionContext* session = libp2p_session_context_new();
if (session == NULL) {
libp2p_logger_error("swarm", "Unable to allocate SessionContext. Out of memory?\n");
return 0;
}
session->datastore = context->datastore;
session->filestore = context->filestore;
// convert IP address to text
session->host = malloc(INET_ADDRSTRLEN);
if (session->host == NULL) {
// we are out of memory
free(session->host);
return 0;
}
if (inet_ntop(AF_INET, &ip, session->host, INET_ADDRSTRLEN) == NULL) {
free(session->host);
session->host = NULL;
session->port = 0;
return 0;
}
session->port = port;
session->insecure_stream = libp2p_net_connection_new(file_descriptor, session->host, session->port, session);
session->default_stream = session->insecure_stream;
struct SwarmSession* swarm_session = (struct SwarmSession*) malloc(sizeof(struct SwarmSession));
swarm_session->session_context = session;
swarm_session->swarm_context = context;
if (thpool_add_work(context->thread_pool, libp2p_swarm_listen, swarm_session) < 0) {
libp2p_logger_error("swarm", "Unable to fire up thread for connection %d\n", file_descriptor);
return 0;
}
libp2p_logger_info("swarm", "add_connection: added connection %d.\n", file_descriptor);
return 1;
}
/**
* Fire up the swarm engine, and return its context
* @param protocol_handlers the protocol handlers
* @param datastore the datastore
* @param filestore the file store
* @returns the SwarmContext
*/
struct SwarmContext* libp2p_swarm_new(struct Libp2pVector* protocol_handlers, struct Datastore* datastore, struct Filestore* filestore) {
struct SwarmContext* context = (struct SwarmContext*) malloc(sizeof(struct SwarmContext));
if (context != NULL) {
context->thread_pool = thpool_init(25);
context->protocol_handlers = protocol_handlers;
context->datastore = datastore;
context->filestore = filestore;
}
return context;
}

View file

@ -12,7 +12,7 @@ int test_dialer_new() {
peer->id = malloc(strlen(peer_id)+1); peer->id = malloc(strlen(peer_id)+1);
strcpy(peer->id, peer_id); strcpy(peer->id, peer_id);
peer->id_size = strlen(peer_id); peer->id_size = strlen(peer_id);
struct Dialer* dialer = libp2p_conn_dialer_new(peer, NULL, private_key); struct Dialer* dialer = libp2p_conn_dialer_new(peer, NULL, private_key, NULL);
if (dialer == NULL) if (dialer == NULL)
goto exit; goto exit;
retVal = 1; retVal = 1;
@ -38,7 +38,7 @@ int test_dialer_dial() {
goto exit; goto exit;
peer->id_size = strlen((char*)peer->id); peer->id_size = strlen((char*)peer->id);
dialer = libp2p_conn_dialer_new(peer, NULL, NULL); dialer = libp2p_conn_dialer_new(peer, NULL, NULL, NULL);
if (dialer == NULL) if (dialer == NULL)
goto exit; goto exit;
@ -109,7 +109,7 @@ int test_dialer_join_swarm() {
peerstore = libp2p_peerstore_new(local_peer); peerstore = libp2p_peerstore_new(local_peer);
// 3) make the dialer // 3) make the dialer
dialer = libp2p_conn_dialer_new(local_peer, peerstore, rsa_private_key); dialer = libp2p_conn_dialer_new(local_peer, peerstore, rsa_private_key, NULL);
// 4) make the remote peer // 4) make the remote peer
remote_ma = multiaddress_new_from_string(remote_swarm); remote_ma = multiaddress_new_from_string(remote_swarm);
@ -155,7 +155,7 @@ int test_dialer_dial_multistream() {
peer->id_size = strlen((char*)peer->id); peer->id_size = strlen((char*)peer->id);
dialer = libp2p_conn_dialer_new(peer, NULL, NULL); dialer = libp2p_conn_dialer_new(peer, NULL, NULL, NULL);
if (dialer == NULL) if (dialer == NULL)
goto exit; goto exit;

View file

@ -343,7 +343,7 @@ int test_yamux_client_server_connect() {
libp2p_net_server_start("127.0.0.1", 1234, protocol_handlers); libp2p_net_server_start("127.0.0.1", 1234, protocol_handlers);
sleep(1); sleep(1);
// set up client (easiest to use transport dialers) // set up client (easiest to use transport dialers)
struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL); struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL, NULL);
struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234"); struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234");
struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma); struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma);
if (stream == NULL) { if (stream == NULL) {
@ -391,7 +391,7 @@ int test_yamux_client_server_multistream() {
libp2p_net_server_start("127.0.0.1", 1234, protocol_handlers); libp2p_net_server_start("127.0.0.1", 1234, protocol_handlers);
sleep(1); sleep(1);
// set up client (easiest to use transport dialers) // set up client (easiest to use transport dialers)
struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL); struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL, NULL);
struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234"); struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234");
struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma); struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma);
if (stream == NULL) { if (stream == NULL) {
@ -468,7 +468,7 @@ int test_yamux_multistream_client() {
handler = libp2p_net_multistream_build_protocol_handler(protocol_handlers); handler = libp2p_net_multistream_build_protocol_handler(protocol_handlers);
libp2p_utils_vector_add(protocol_handlers, handler); libp2p_utils_vector_add(protocol_handlers, handler);
// set up client (easiest to use transport dialers) // set up client (easiest to use transport dialers)
struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL); struct Dialer* dialer = libp2p_conn_dialer_new(NULL, NULL, NULL, NULL);
struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234"); struct MultiAddress* server_ma = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/1234");
struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma); struct Stream* stream = libp2p_conn_dialer_get_connection(dialer, server_ma);
if (stream == NULL) { if (stream == NULL) {

View file

@ -152,6 +152,36 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po
return outgoing.data_size; return outgoing.data_size;
} }
/***
* Get message, removing the frame prefix
* @param incoming the incoming bytes
* @param incoming_size the size of incoming
* @param return_message where to put the results
* @returns true(1) on success, false(0) otherwise
*/
int yamux_pull_message_from_frame(const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message) {
if (incoming_size <= 12) {
return 0;
}
int sz = sizeof(struct yamux_frame);
*return_message = libp2p_stream_message_new();
struct StreamMessage* msg = *return_message;
if (msg == NULL) {
libp2p_logger_debug("yamux", "pull_message_from_frame: Unable to allocate memory for message.\n");
return 0;
}
msg->data_size = incoming_size - sz;
msg->data = (uint8_t*) malloc(msg->data_size);
if (msg->data == NULL) {
libp2p_logger_debug("yamux", "pull_message_from_frame: Unable to allocate memory for data portion of message.\n");
libp2p_stream_message_free(msg);
*return_message = NULL;
return 0;
}
memcpy(msg->data, &incoming[sz], msg->data_size);
return 1;
}
/** /**
* Decode an incoming message * Decode an incoming message
* @param context the YamuxContext or YamuxChannelContext * @param context the YamuxContext or YamuxChannelContext
@ -181,6 +211,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
decode_frame(&f); decode_frame(&f);
// check yamux version // check yamux version
if (f.version != YAMUX_VERSION) { if (f.version != YAMUX_VERSION) {
libp2p_logger_error("yamux", "Incorrect Yamux version. Expected %d but received %d.\n", YAMUX_VERSION, f.version); libp2p_logger_error("yamux", "Incorrect Yamux version. Expected %d but received %d.\n", YAMUX_VERSION, f.version);
@ -287,6 +318,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
libp2p_logger_debug("yamux", "Processing frame of %d bytes.\n"); libp2p_logger_debug("yamux", "Processing frame of %d bytes.\n");
ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size);
yamux_pull_message_from_frame(incoming, incoming_size, return_message);
return (re < 0) ? re : (re + incoming_size); return (re < 0) ? re : (re + incoming_size);
} }
} }
@ -296,21 +328,12 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
if (f.flags & yamux_frame_syn) if (f.flags & yamux_frame_syn)
{ {
libp2p_logger_debug("yamux", "Looks like we have a new stream coming in. Stream %d.\n", f.streamid); libp2p_logger_debug("yamux", "Looks like we have a new stream coming in. Stream %d.\n", f.streamid);
struct StreamMessage* msg = libp2p_stream_message_new(); yamux_pull_message_from_frame(incoming, incoming_size, return_message);
if (incoming_size > sizeof(struct yamux_frame)) {
msg->data_size = incoming_size - sizeof(struct yamux_frame);
libp2p_logger_debug("yamux", "Stream %d has data after the frame, with a length of %d.\n", f.streamid, msg->data_size);
msg->data = malloc(msg->data_size);
memcpy(msg->data, &incoming[sizeof(struct yamux_frame)], msg->data_size);
} else {
libp2p_logger_debug("yamux", "Stream %d has no extra data after the frame.\n", f.streamid);
}
// if we didn't initiate it, add this new channel (odd stream id is from client, even is from server) // if we didn't initiate it, add this new channel (odd stream id is from client, even is from server)
if ( (f.streamid % 2 == 0 && !yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) { if ( (f.streamid % 2 == 0 && !yamuxContext->am_server) || (f.streamid % 2 == 1 && yamuxContext->am_server) ) {
libp2p_logger_debug("yamux", "This is a new channel. Creating it...\n"); libp2p_logger_debug("yamux", "This is a new channel. Creating it...\n");
struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, msg); struct Stream* yamuxChannelStream = yamux_channel_new(yamuxContext, f.streamid, *return_message);
if (yamuxChannelStream == NULL) { if (yamuxChannelStream == NULL) {
libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux channel for stream id %d.\n", f.streamid); libp2p_logger_error("yamux", "session->yamux_decode: Unable to create new yamux channel for stream id %d.\n", f.streamid);
return -EPROTO; return -EPROTO;
@ -319,7 +342,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
if (yamux_session->new_stream_fn) { if (yamux_session->new_stream_fn) {
libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn.\n"); libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn.\n");
yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, msg); yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, *return_message);
} }
// handle window update (if there is one) // handle window update (if there is one)
struct yamux_session_stream ss = yamux_session->streams[f.streamid]; struct yamux_session_stream ss = yamux_session->streams[f.streamid];
@ -332,19 +355,22 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s
yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size); yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size);
channelContext->state = yamux_stream_syn_recv; channelContext->state = yamux_stream_syn_recv;
if (f.type == yamux_frame_window_update) { if (f.type == yamux_frame_window_update) {
libp2p_logger_debug("yamux", "Received window update for stream %d. Sending one back.\n", f.streamid);
// send it back // send it back
yamux_stream_window_update(channelContext, ss.stream->window_size); yamux_stream_window_update(channelContext, ss.stream->window_size);
} }
// TODO: Start negotiations of multistream // TODO: Start negotiations of multistream
struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0);
if (multistream != NULL) { if (multistream != NULL) {
libp2p_logger_debug("yamux", "Successfully negotiated multistream on stream %d.\n", f.streamid);
channelContext->child_stream = multistream; channelContext->child_stream = multistream;
} else {
libp2p_logger_error("yamux", "Unable to negotiate multistream on stream %d.\n", f.streamid);
} }
} else { } else {
libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)")); libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)"));
} }
*return_message = msg;
} }
else { else {
libp2p_logger_error("yamux", "We had a (probably) new frame, but the flags didn't seem right."); libp2p_logger_error("yamux", "We had a (probably) new frame, but the flags didn't seem right.");