Network write methods now expect StreamMessage struct

This commit is contained in:
jmjatlanta 2017-10-23 09:47:54 -05:00
parent 6147769f4b
commit 8480542b45
10 changed files with 103 additions and 64 deletions

View file

@ -53,11 +53,10 @@ int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** dat
/** /**
* Write to an open multistream host * Write to an open multistream host
* @param socket_fd the socket file descriptor * @param socket_fd the socket file descriptor
* @param results where to put the results. NOTE: this memory is allocated * @param msg the message to write
* @param results_size the size of the results in bytes
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_net_multistream_write(void* stream_context, const unsigned char* data, size_t data_size); int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* msg);
/** /**
* Connect to a multistream host, and this includes the multistream handshaking. * Connect to a multistream host, and this includes the multistream handshaking.

View file

@ -49,10 +49,9 @@ struct Stream {
* Writes to a stream * Writes to a stream
* @param stream the stream context (usually a SessionContext pointer) * @param stream the stream context (usually a SessionContext pointer)
* @param buffer what to write * @param buffer what to write
* @param how much to write
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int (*write)(void* stream_context, const unsigned char* buffer, size_t buffer_size); int (*write)(void* stream_context, struct StreamMessage* buffer);
/** /**
* Closes a stream * Closes a stream

View file

@ -44,8 +44,11 @@ int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t inco
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_net_multistream_send_protocol(struct SessionContext *context) { int libp2p_net_multistream_send_protocol(struct SessionContext *context) {
char *protocol = "/multistream/1.0.0\n"; const char* protocol = "/multistream/1.0.0\n";
if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) { struct StreamMessage msg;
msg.data = (uint8_t*) protocol;
msg.data_size = strlen(protocol);
if (!context->default_stream->write(context, &msg)) {
libp2p_logger_error("multistream", "send_protocol: Unable to send multistream protocol header.\n"); libp2p_logger_error("multistream", "send_protocol: Unable to send multistream protocol header.\n");
return 0; return 0;
} }
@ -191,27 +194,26 @@ int libp2p_net_multistream_peek(void* stream_context) {
/** /**
* Write to an open multistream host * Write to an open multistream host
* @param stream_context the session context * @param stream_context the session context
* @param data the data to send * @param msg the data to send
* @param data_length the length of the data
* @returns the number of bytes written * @returns the number of bytes written
*/ */
int libp2p_net_multistream_write(void* stream_context, const unsigned char* data, size_t data_length) { int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* msg) {
struct SessionContext* session_context = (struct SessionContext*)stream_context; struct SessionContext* session_context = (struct SessionContext*)stream_context;
struct Stream* stream = session_context->default_stream; struct Stream* stream = session_context->default_stream;
int num_bytes = 0; int num_bytes = 0;
if (data_length > 0) { // only do this is if there is something to send if (msg->data_size > 0) { // only do this is if there is something to send
// first send the size // first send the size
unsigned char varint[12]; unsigned char varint[12];
size_t varint_size = 0; size_t varint_size = 0;
varint_encode(data_length, &varint[0], 12, &varint_size); varint_encode(msg->data_size, &varint[0], 12, &varint_size);
// now put the size with the data // now put the size with the data
unsigned char* buffer = (unsigned char*)malloc(data_length + varint_size); unsigned char* buffer = (unsigned char*)malloc(msg->data_size + varint_size);
if (buffer == NULL) if (buffer == NULL)
return 0; return 0;
memset(buffer, 0, data_length + varint_size); memset(buffer, 0, msg->data_size + varint_size);
memcpy(buffer, varint, varint_size); memcpy(buffer, varint, varint_size);
memcpy(&buffer[varint_size], data, data_length); memcpy(&buffer[varint_size], msg->data, msg->data_size);
// determine if this should run through the secio protocol or not // determine if this should run through the secio protocol or not
if (session_context->secure_stream == NULL) { if (session_context->secure_stream == NULL) {
int sd = *((int*)stream->socket_descriptor); int sd = *((int*)stream->socket_descriptor);
@ -222,11 +224,14 @@ int libp2p_net_multistream_write(void* stream_context, const unsigned char* data
return 0; return 0;
} }
// then send the actual data // then send the actual data
num_bytes += socket_write(sd, (char*)data, data_length, 0); num_bytes += socket_write(sd, (char*)msg->data, msg->data_size, 0);
session_context->last_comm_epoch = os_utils_gmtime(); session_context->last_comm_epoch = os_utils_gmtime();
} else { } else {
// write using secio // write using secio
num_bytes = stream->write(stream_context, buffer, data_length + varint_size); struct StreamMessage outgoing;
outgoing.data = buffer;
outgoing.data_size = msg->data_size + varint_size;
num_bytes = stream->write(stream_context, &outgoing);
} }
free(buffer); free(buffer);
} }
@ -428,10 +433,13 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname,
*/ */
int libp2p_net_multistream_negotiate(struct SessionContext* session) { int libp2p_net_multistream_negotiate(struct SessionContext* session) {
const char* protocolID = "/multistream/1.0.0\n"; const char* protocolID = "/multistream/1.0.0\n";
struct StreamMessage outgoing;
struct StreamMessage* results = NULL; struct StreamMessage* results = NULL;
int retVal = 0; int retVal = 0;
// send the protocol id // send the protocol id
if (!libp2p_net_multistream_write(session, (unsigned char*)protocolID, strlen(protocolID))) outgoing.data = (uint8_t*)protocolID;
outgoing.data_size = strlen(protocolID);
if (!libp2p_net_multistream_write(session, &outgoing))
goto exit; goto exit;
// expect the same back // expect the same back
libp2p_net_multistream_read(session, &results, multistream_default_timeout); libp2p_net_multistream_read(session, &results, multistream_default_timeout);

View file

@ -82,8 +82,11 @@ int libp2p_routing_dht_protobuf_message(struct KademliaMessage* message, unsigne
int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) { int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) {
int retVal = 0; int retVal = 0;
char* protocol = "/ipfs/kad/1.0.0\n"; char* protocol = "/ipfs/kad/1.0.0\n";
struct StreamMessage outgoing;
outgoing.data = (uint8_t*)protocol;
outgoing.data_size = strlen(protocol);
struct StreamMessage* results = NULL; struct StreamMessage* results = NULL;
if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) { if (!context->default_stream->write(context, &outgoing)) {
libp2p_logger_error("dht_protocol", "Unable to write to stream during upgrade attempt.\n"); libp2p_logger_error("dht_protocol", "Unable to write to stream during upgrade attempt.\n");
goto exit; goto exit;
} }
@ -113,7 +116,10 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) {
*/ */
int libp2p_routing_dht_handshake(struct SessionContext* context) { int libp2p_routing_dht_handshake(struct SessionContext* context) {
char* protocol = "/ipfs/kad/1.0.0\n"; char* protocol = "/ipfs/kad/1.0.0\n";
return context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol)); struct StreamMessage outgoing;
outgoing.data = (uint8_t*) protocol;
outgoing.data_size = strlen(protocol);
return context->default_stream->write(context, &outgoing);
} }
/** /**
@ -477,7 +483,10 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee
// if we have something to send, send it. // if we have something to send, send it.
if (result_buffer != NULL) { if (result_buffer != NULL) {
libp2p_logger_debug("dht_protocol", "Sending message back to caller. Message type: %d\n", message->message_type); libp2p_logger_debug("dht_protocol", "Sending message back to caller. Message type: %d\n", message->message_type);
if (!session->default_stream->write(session, result_buffer, result_buffer_size)) struct StreamMessage outgoing;
outgoing.data = result_buffer;
outgoing.data_size = result_buffer_size;
if (!session->default_stream->write(session, &outgoing))
goto exit; goto exit;
} else { } else {
libp2p_logger_debug("dht_protocol", "DhtHandleMessage: Nothing to send back. Kademlia call has been handled. Message type: %d\n", message->message_type); libp2p_logger_debug("dht_protocol", "DhtHandleMessage: Nothing to send back. Kademlia call has been handled. Message type: %d\n", message->message_type);
@ -527,6 +536,7 @@ int libp2p_routing_dht_receive_message(struct SessionContext* sessionContext, st
int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struct KademliaMessage* message) { int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struct KademliaMessage* message) {
size_t protobuf_size = 0, retVal = 0; size_t protobuf_size = 0, retVal = 0;
unsigned char* protobuf = NULL; unsigned char* protobuf = NULL;
struct StreamMessage outgoing;
protobuf_size = libp2p_message_protobuf_encode_size(message); protobuf_size = libp2p_message_protobuf_encode_size(message);
protobuf = (unsigned char*)malloc(protobuf_size); protobuf = (unsigned char*)malloc(protobuf_size);
@ -540,7 +550,9 @@ int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struc
} }
// send the message // send the message
if (!sessionContext->default_stream->write(sessionContext, protobuf, protobuf_size)) { outgoing.data = protobuf;
outgoing.data_size = protobuf_size;
if (!sessionContext->default_stream->write(sessionContext, &outgoing)) {
libp2p_logger_error("dht_protocol", "send_message: Attempted to write to Kademlia stream, but could not.\n"); libp2p_logger_error("dht_protocol", "send_message: Attempted to write to Kademlia stream, but could not.\n");
goto exit; goto exit;
} }

View file

@ -647,8 +647,10 @@ int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char*
*/ */
int libp2p_secio_send_protocol(struct SessionContext* session) { int libp2p_secio_send_protocol(struct SessionContext* session) {
char* protocol = "/secio/1.0.0\n"; char* protocol = "/secio/1.0.0\n";
int protocol_len = strlen(protocol); struct StreamMessage outgoing;
return session->default_stream->write(session, (unsigned char *)protocol, protocol_len); outgoing.data = (uint8_t*)protocol;
outgoing.data_size = strlen(protocol);
return session->default_stream->write(session, &outgoing);
} }
/*** /***
@ -750,15 +752,14 @@ int libp2p_secio_encrypt(struct SessionContext* session, const unsigned char* in
* Write to an encrypted stream * Write to an encrypted stream
* @param session the session parameters * @param session the session parameters
* @param bytes the bytes to write * @param bytes the bytes to write
* @param num_bytes the number of bytes to write
* @returns the number of bytes written * @returns the number of bytes written
*/ */
int libp2p_secio_encrypted_write(void* stream_context, const unsigned char* bytes, size_t num_bytes) { int libp2p_secio_encrypted_write(void* stream_context, struct StreamMessage* bytes) {
struct SessionContext* session = (struct SessionContext*) stream_context; struct SessionContext* session = (struct SessionContext*) stream_context;
// writer uses the local cipher and mac // writer uses the local cipher and mac
unsigned char* buffer = NULL; unsigned char* buffer = NULL;
size_t buffer_size = 0; size_t buffer_size = 0;
if (!libp2p_secio_encrypt(session, bytes, num_bytes, &buffer, &buffer_size)) { if (!libp2p_secio_encrypt(session, bytes->data, bytes->data_size, &buffer, &buffer_size)) {
libp2p_logger_error("secio", "secio_encrypt returned false.\n"); libp2p_logger_error("secio", "secio_encrypt returned false.\n");
return 0; return 0;
} }
@ -1170,8 +1171,10 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs
libp2p_secio_initialize_crypto(local_session); libp2p_secio_initialize_crypto(local_session);
// send their nonce to verify encryption works // send their nonce to verify encryption works
//libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Sending their nonce\n"); struct StreamMessage outgoing;
if (libp2p_secio_encrypted_write(local_session, (unsigned char*)local_session->remote_nonce, 16) <= 0) { outgoing.data = (uint8_t*)local_session->remote_nonce;
outgoing.data_size = 16;
if (libp2p_secio_encrypted_write(local_session, &outgoing) <= 0) {
libp2p_logger_error("secio", "Encrytped write returned 0 or less.\n"); libp2p_logger_error("secio", "Encrytped write returned 0 or less.\n");
goto exit; goto exit;
} }

View file

@ -40,9 +40,10 @@ int test_multistream_get_list() {
goto exit; goto exit;
// try to respond something, ls command // try to respond something, ls command
const unsigned char* out = (unsigned char*)"ls\n"; struct StreamMessage outgoing;
outgoing.data = (uint8_t*)"ls\n";
if (libp2p_net_multistream_write(&session, out, strlen((char*)out)) <= 0) outgoing.data_size = 3;
if (libp2p_net_multistream_write(&session, &outgoing) <= 0)
goto exit; goto exit;
// retrieve response // retrieve response

View file

@ -153,7 +153,10 @@ int test_secio_handshake() {
} }
// now attempt an "ls" // now attempt an "ls"
if (libp2p_net_multistream_write(secure_session, (unsigned char*)"ls\n", 3) == 0) { struct StreamMessage outgoing;
outgoing.data = (uint8_t*)"ls\n";
outgoing.data_size = 3;
if (libp2p_net_multistream_write(secure_session, &outgoing) == 0) {
fprintf(stdout, "Unable to send ls to multistream\n"); fprintf(stdout, "Unable to send ls to multistream\n");
goto exit; goto exit;
} }
@ -172,7 +175,9 @@ int test_secio_handshake() {
results = NULL; results = NULL;
// try to yamux // try to yamux
char* yamux_string = "/yamux/1.0.0\n"; char* yamux_string = "/yamux/1.0.0\n";
if (!libp2p_net_multistream_write(secure_session, (uint8_t*)yamux_string, strlen(yamux_string))) { outgoing.data = (uint8_t*)yamux_string;
outgoing.data_size = strlen(yamux_string);
if (!libp2p_net_multistream_write(secure_session, &outgoing)) {
libp2p_logger_error("test_secio", "Unable to send yamux protocol request\n"); libp2p_logger_error("test_secio", "Unable to send yamux protocol request\n");
goto exit; goto exit;
} }
@ -489,7 +494,10 @@ int test_secio_handshake_go() {
} }
// now attempt an "ls" // now attempt an "ls"
if (libp2p_net_multistream_write(secure_session, (unsigned char*)"ls\n", 3) == 0) { struct StreamMessage outgoing;
outgoing.data = (uint8_t*)"ls\n";
outgoing.data_size = 3;
if (libp2p_net_multistream_write(secure_session, &outgoing) == 0) {
fprintf(stdout, "Unable to send ls to multistream\n"); fprintf(stdout, "Unable to send ls to multistream\n");
goto exit; goto exit;
} }

View file

@ -104,10 +104,13 @@ ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err)
session->closed = 1; session->closed = 1;
int sz = sizeof(struct yamux_frame); struct StreamMessage outgoing;
if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz)) outgoing.data = (uint8_t*)&f;
outgoing.data_size = sizeof(struct yamux_frame);
if (!session->session_context->default_stream->write(session->session_context, &outgoing))
return 0; return 0;
return sz; return outgoing.data_size;
} }
/*** /***
@ -133,10 +136,12 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po
if (!timespec_get(&session->since_ping, TIME_UTC)) if (!timespec_get(&session->since_ping, TIME_UTC))
return -EACCES; return -EACCES;
int sz = sizeof(struct yamux_frame); struct StreamMessage outgoing;
if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz)) outgoing.data = (uint8_t*)&f;
outgoing.data_size = sizeof(struct yamux_frame);
if (!session->session_context->default_stream->write(session->session_context, &outgoing))
return 0; return 0;
return sz; return outgoing.data_size;
} }
/** /**

View file

@ -72,6 +72,21 @@ FOUND:;
return st; return st;
} }
/**
* Write a frame to the network
* @param yamux_stream the stream context
* @param f the frame
*/
int yamux_write_frame(struct yamux_stream* yamux_stream, struct yamux_frame* f) {
encode_frame(f);
struct StreamMessage outgoing;
outgoing.data = (uint8_t*)f;
outgoing.data_size = sizeof(struct yamux_frame);
if (!yamux_stream->session->session_context->default_stream->write(yamux_stream->session->session_context, &outgoing))
return 0;
return outgoing.data_size;
}
/*** /***
* Initialize a stream between 2 peers * Initialize a stream between 2 peers
* @param stream the stream to initialize * @param stream the stream to initialize
@ -93,11 +108,7 @@ ssize_t yamux_stream_init(struct yamux_stream* stream)
stream->state = yamux_stream_syn_sent; stream->state = yamux_stream_syn_sent;
encode_frame(&f); return yamux_write_frame(stream, &f);
int sz = sizeof(struct yamux_frame);
if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
} }
/*** /***
@ -120,11 +131,7 @@ ssize_t yamux_stream_close(struct yamux_stream* stream)
stream->state = yamux_stream_closing; stream->state = yamux_stream_closing;
encode_frame(&f); return yamux_write_frame(stream, &f);
int sz = sizeof(struct yamux_frame);
if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
} }
/** /**
@ -147,11 +154,7 @@ ssize_t yamux_stream_reset(struct yamux_stream* stream)
stream->state = yamux_stream_closed; stream->state = yamux_stream_closed;
encode_frame(&f); return yamux_write_frame(stream, &f);
int sz = sizeof(struct yamux_frame);
if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
} }
static enum yamux_frame_flags get_flags(struct yamux_stream* stream) static enum yamux_frame_flags get_flags(struct yamux_stream* stream)
@ -188,12 +191,8 @@ ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta)
.streamid = stream->id, .streamid = stream->id,
.length = (uint32_t)delta .length = (uint32_t)delta
}; };
encode_frame(&f);
int sz = sizeof(struct yamux_frame); return yamux_write_frame(stream, &f);
if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz))
return 0;
return sz;
} }
/*** /***
@ -236,8 +235,10 @@ ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, vo
memcpy(sendd, &f, sizeof(struct yamux_frame)); memcpy(sendd, &f, sizeof(struct yamux_frame));
memcpy(sendd + sizeof(struct yamux_frame), data, (size_t)adv); memcpy(sendd + sizeof(struct yamux_frame), data, (size_t)adv);
int sz = adv + sizeof(struct yamux_frame); struct StreamMessage outgoing;
if (!s->session_context->default_stream->write(s->session_context, (uint8_t*)sendd, sz)) outgoing.data = (uint8_t*)sendd;
outgoing.data_size = adv + sizeof(struct yamux_frame);
if (!s->session_context->default_stream->write(s->session_context, &outgoing))
return adv; return adv;
data += adv; data += adv;

View file

@ -58,7 +58,10 @@ void yamux_read_stream(struct yamux_stream* stream, ssize_t incoming_size, uint8
*/ */
int yamux_send_protocol(struct SessionContext* context) { int yamux_send_protocol(struct SessionContext* context) {
char* protocol = "/yamux/1.0.0\n"; char* protocol = "/yamux/1.0.0\n";
if (!context->default_stream->write(context, (uint8_t*)protocol, strlen(protocol))) struct StreamMessage outgoing;
outgoing.data = (uint8_t*)protocol;
outgoing.data_size = strlen(protocol);
if (!context->default_stream->write(context, &outgoing))
return 0; return 0;
return 1; return 1;
} }