diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index 6cec932..5920abd 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -53,11 +53,10 @@ int libp2p_net_multistream_read(void* stream_context, struct StreamMessage** dat /** * Write to an open multistream host * @param socket_fd the socket file descriptor - * @param results where to put the results. NOTE: this memory is allocated - * @param results_size the size of the results in bytes + * @param msg the message to write * @returns true(1) on success, otherwise false(0) */ -int libp2p_net_multistream_write(void* stream_context, const unsigned char* data, size_t data_size); +int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* msg); /** * Connect to a multistream host, and this includes the multistream handshaking. diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index dc201f9..ee1dcaa 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -49,10 +49,9 @@ struct Stream { * Writes to a stream * @param stream the stream context (usually a SessionContext pointer) * @param buffer what to write - * @param how much to write * @returns true(1) on success, false(0) otherwise */ - int (*write)(void* stream_context, const unsigned char* buffer, size_t buffer_size); + int (*write)(void* stream_context, struct StreamMessage* buffer); /** * Closes a stream diff --git a/net/multistream.c b/net/multistream.c index 8ce1121..1a94cb1 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -44,8 +44,11 @@ int libp2p_net_multistream_can_handle(const uint8_t *incoming, const size_t inco * @returns true(1) on success, false(0) otherwise */ int libp2p_net_multistream_send_protocol(struct SessionContext *context) { - char *protocol = "/multistream/1.0.0\n"; - if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) { + const char* protocol = "/multistream/1.0.0\n"; + struct StreamMessage msg; + msg.data = (uint8_t*) protocol; + msg.data_size = strlen(protocol); + if (!context->default_stream->write(context, &msg)) { libp2p_logger_error("multistream", "send_protocol: Unable to send multistream protocol header.\n"); return 0; } @@ -191,27 +194,26 @@ int libp2p_net_multistream_peek(void* stream_context) { /** * Write to an open multistream host * @param stream_context the session context - * @param data the data to send - * @param data_length the length of the data + * @param msg the data to send * @returns the number of bytes written */ -int libp2p_net_multistream_write(void* stream_context, const unsigned char* data, size_t data_length) { +int libp2p_net_multistream_write(void* stream_context, struct StreamMessage* msg) { struct SessionContext* session_context = (struct SessionContext*)stream_context; struct Stream* stream = session_context->default_stream; int num_bytes = 0; - if (data_length > 0) { // only do this is if there is something to send + if (msg->data_size > 0) { // only do this is if there is something to send // first send the size unsigned char varint[12]; size_t varint_size = 0; - varint_encode(data_length, &varint[0], 12, &varint_size); + varint_encode(msg->data_size, &varint[0], 12, &varint_size); // now put the size with the data - unsigned char* buffer = (unsigned char*)malloc(data_length + varint_size); + unsigned char* buffer = (unsigned char*)malloc(msg->data_size + varint_size); if (buffer == NULL) return 0; - memset(buffer, 0, data_length + varint_size); + memset(buffer, 0, msg->data_size + varint_size); memcpy(buffer, varint, varint_size); - memcpy(&buffer[varint_size], data, data_length); + memcpy(&buffer[varint_size], msg->data, msg->data_size); // determine if this should run through the secio protocol or not if (session_context->secure_stream == NULL) { int sd = *((int*)stream->socket_descriptor); @@ -222,11 +224,14 @@ int libp2p_net_multistream_write(void* stream_context, const unsigned char* data return 0; } // then send the actual data - num_bytes += socket_write(sd, (char*)data, data_length, 0); + num_bytes += socket_write(sd, (char*)msg->data, msg->data_size, 0); session_context->last_comm_epoch = os_utils_gmtime(); } else { // write using secio - num_bytes = stream->write(stream_context, buffer, data_length + varint_size); + struct StreamMessage outgoing; + outgoing.data = buffer; + outgoing.data_size = msg->data_size + varint_size; + num_bytes = stream->write(stream_context, &outgoing); } free(buffer); } @@ -428,10 +433,13 @@ struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, */ int libp2p_net_multistream_negotiate(struct SessionContext* session) { const char* protocolID = "/multistream/1.0.0\n"; + struct StreamMessage outgoing; struct StreamMessage* results = NULL; int retVal = 0; // send the protocol id - if (!libp2p_net_multistream_write(session, (unsigned char*)protocolID, strlen(protocolID))) + outgoing.data = (uint8_t*)protocolID; + outgoing.data_size = strlen(protocolID); + if (!libp2p_net_multistream_write(session, &outgoing)) goto exit; // expect the same back libp2p_net_multistream_read(session, &results, multistream_default_timeout); diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c index b1e5310..e645cbe 100644 --- a/routing/dht_protocol.c +++ b/routing/dht_protocol.c @@ -82,8 +82,11 @@ int libp2p_routing_dht_protobuf_message(struct KademliaMessage* message, unsigne int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) { int retVal = 0; char* protocol = "/ipfs/kad/1.0.0\n"; + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)protocol; + outgoing.data_size = strlen(protocol); struct StreamMessage* results = NULL; - if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) { + if (!context->default_stream->write(context, &outgoing)) { libp2p_logger_error("dht_protocol", "Unable to write to stream during upgrade attempt.\n"); goto exit; } @@ -113,7 +116,10 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) { */ int libp2p_routing_dht_handshake(struct SessionContext* context) { char* protocol = "/ipfs/kad/1.0.0\n"; - return context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol)); + struct StreamMessage outgoing; + outgoing.data = (uint8_t*) protocol; + outgoing.data_size = strlen(protocol); + return context->default_stream->write(context, &outgoing); } /** @@ -477,7 +483,10 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee // if we have something to send, send it. if (result_buffer != NULL) { libp2p_logger_debug("dht_protocol", "Sending message back to caller. Message type: %d\n", message->message_type); - if (!session->default_stream->write(session, result_buffer, result_buffer_size)) + struct StreamMessage outgoing; + outgoing.data = result_buffer; + outgoing.data_size = result_buffer_size; + if (!session->default_stream->write(session, &outgoing)) goto exit; } else { libp2p_logger_debug("dht_protocol", "DhtHandleMessage: Nothing to send back. Kademlia call has been handled. Message type: %d\n", message->message_type); @@ -527,6 +536,7 @@ int libp2p_routing_dht_receive_message(struct SessionContext* sessionContext, st int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struct KademliaMessage* message) { size_t protobuf_size = 0, retVal = 0; unsigned char* protobuf = NULL; + struct StreamMessage outgoing; protobuf_size = libp2p_message_protobuf_encode_size(message); protobuf = (unsigned char*)malloc(protobuf_size); @@ -540,7 +550,9 @@ int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struc } // send the message - if (!sessionContext->default_stream->write(sessionContext, protobuf, protobuf_size)) { + outgoing.data = protobuf; + outgoing.data_size = protobuf_size; + if (!sessionContext->default_stream->write(sessionContext, &outgoing)) { libp2p_logger_error("dht_protocol", "send_message: Attempted to write to Kademlia stream, but could not.\n"); goto exit; } diff --git a/secio/secio.c b/secio/secio.c index d79ada7..430e9f9 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -647,8 +647,10 @@ int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char* */ int libp2p_secio_send_protocol(struct SessionContext* session) { char* protocol = "/secio/1.0.0\n"; - int protocol_len = strlen(protocol); - return session->default_stream->write(session, (unsigned char *)protocol, protocol_len); + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)protocol; + outgoing.data_size = strlen(protocol); + return session->default_stream->write(session, &outgoing); } /*** @@ -750,15 +752,14 @@ int libp2p_secio_encrypt(struct SessionContext* session, const unsigned char* in * Write to an encrypted stream * @param session the session parameters * @param bytes the bytes to write - * @param num_bytes the number of bytes to write * @returns the number of bytes written */ -int libp2p_secio_encrypted_write(void* stream_context, const unsigned char* bytes, size_t num_bytes) { +int libp2p_secio_encrypted_write(void* stream_context, struct StreamMessage* bytes) { struct SessionContext* session = (struct SessionContext*) stream_context; // writer uses the local cipher and mac unsigned char* buffer = NULL; size_t buffer_size = 0; - if (!libp2p_secio_encrypt(session, bytes, num_bytes, &buffer, &buffer_size)) { + if (!libp2p_secio_encrypt(session, bytes->data, bytes->data_size, &buffer, &buffer_size)) { libp2p_logger_error("secio", "secio_encrypt returned false.\n"); return 0; } @@ -1170,8 +1171,10 @@ int libp2p_secio_handshake(struct SessionContext* local_session, const struct Rs libp2p_secio_initialize_crypto(local_session); // send their nonce to verify encryption works - //libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Sending their nonce\n"); - if (libp2p_secio_encrypted_write(local_session, (unsigned char*)local_session->remote_nonce, 16) <= 0) { + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)local_session->remote_nonce; + outgoing.data_size = 16; + if (libp2p_secio_encrypted_write(local_session, &outgoing) <= 0) { libp2p_logger_error("secio", "Encrytped write returned 0 or less.\n"); goto exit; } diff --git a/test/test_multistream.h b/test/test_multistream.h index d839fe6..062f638 100644 --- a/test/test_multistream.h +++ b/test/test_multistream.h @@ -40,9 +40,10 @@ int test_multistream_get_list() { goto exit; // try to respond something, ls command - const unsigned char* out = (unsigned char*)"ls\n"; - - if (libp2p_net_multistream_write(&session, out, strlen((char*)out)) <= 0) + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)"ls\n"; + outgoing.data_size = 3; + if (libp2p_net_multistream_write(&session, &outgoing) <= 0) goto exit; // retrieve response diff --git a/test/test_secio.h b/test/test_secio.h index f90639f..caf631f 100644 --- a/test/test_secio.h +++ b/test/test_secio.h @@ -153,7 +153,10 @@ int test_secio_handshake() { } // now attempt an "ls" - if (libp2p_net_multistream_write(secure_session, (unsigned char*)"ls\n", 3) == 0) { + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)"ls\n"; + outgoing.data_size = 3; + if (libp2p_net_multistream_write(secure_session, &outgoing) == 0) { fprintf(stdout, "Unable to send ls to multistream\n"); goto exit; } @@ -172,7 +175,9 @@ int test_secio_handshake() { results = NULL; // try to yamux char* yamux_string = "/yamux/1.0.0\n"; - if (!libp2p_net_multistream_write(secure_session, (uint8_t*)yamux_string, strlen(yamux_string))) { + outgoing.data = (uint8_t*)yamux_string; + outgoing.data_size = strlen(yamux_string); + if (!libp2p_net_multistream_write(secure_session, &outgoing)) { libp2p_logger_error("test_secio", "Unable to send yamux protocol request\n"); goto exit; } @@ -489,7 +494,10 @@ int test_secio_handshake_go() { } // now attempt an "ls" - if (libp2p_net_multistream_write(secure_session, (unsigned char*)"ls\n", 3) == 0) { + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)"ls\n"; + outgoing.data_size = 3; + if (libp2p_net_multistream_write(secure_session, &outgoing) == 0) { fprintf(stdout, "Unable to send ls to multistream\n"); goto exit; } diff --git a/yamux/session.c b/yamux/session.c index a6e2c81..61aaa31 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -104,10 +104,13 @@ ssize_t yamux_session_close(struct yamux_session* session, enum yamux_error err) session->closed = 1; - int sz = sizeof(struct yamux_frame); - if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz)) + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)&f; + outgoing.data_size = sizeof(struct yamux_frame); + + if (!session->session_context->default_stream->write(session->session_context, &outgoing)) return 0; - return sz; + return outgoing.data_size; } /*** @@ -133,10 +136,12 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po if (!timespec_get(&session->since_ping, TIME_UTC)) return -EACCES; - int sz = sizeof(struct yamux_frame); - if (!session->session_context->default_stream->write(session->session_context, (uint8_t*)&f, sz)) + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)&f; + outgoing.data_size = sizeof(struct yamux_frame); + if (!session->session_context->default_stream->write(session->session_context, &outgoing)) return 0; - return sz; + return outgoing.data_size; } /** diff --git a/yamux/stream.c b/yamux/stream.c index 3b62cec..5ceb321 100644 --- a/yamux/stream.c +++ b/yamux/stream.c @@ -72,6 +72,21 @@ FOUND:; return st; } +/** + * Write a frame to the network + * @param yamux_stream the stream context + * @param f the frame + */ +int yamux_write_frame(struct yamux_stream* yamux_stream, struct yamux_frame* f) { + encode_frame(f); + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)f; + outgoing.data_size = sizeof(struct yamux_frame); + if (!yamux_stream->session->session_context->default_stream->write(yamux_stream->session->session_context, &outgoing)) + return 0; + return outgoing.data_size; +} + /*** * Initialize a stream between 2 peers * @param stream the stream to initialize @@ -93,11 +108,7 @@ ssize_t yamux_stream_init(struct yamux_stream* stream) stream->state = yamux_stream_syn_sent; - encode_frame(&f); - int sz = sizeof(struct yamux_frame); - if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz)) - return 0; - return sz; + return yamux_write_frame(stream, &f); } /*** @@ -120,11 +131,7 @@ ssize_t yamux_stream_close(struct yamux_stream* stream) stream->state = yamux_stream_closing; - encode_frame(&f); - int sz = sizeof(struct yamux_frame); - if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz)) - return 0; - return sz; + return yamux_write_frame(stream, &f); } /** @@ -147,11 +154,7 @@ ssize_t yamux_stream_reset(struct yamux_stream* stream) stream->state = yamux_stream_closed; - encode_frame(&f); - int sz = sizeof(struct yamux_frame); - if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz)) - return 0; - return sz; + return yamux_write_frame(stream, &f); } static enum yamux_frame_flags get_flags(struct yamux_stream* stream) @@ -188,12 +191,8 @@ ssize_t yamux_stream_window_update(struct yamux_stream* stream, int32_t delta) .streamid = stream->id, .length = (uint32_t)delta }; - encode_frame(&f); - int sz = sizeof(struct yamux_frame); - if (!stream->session->session_context->default_stream->write(stream->session->session_context, (uint8_t*)&f, sz)) - return 0; - return sz; + return yamux_write_frame(stream, &f); } /*** @@ -236,8 +235,10 @@ ssize_t yamux_stream_write(struct yamux_stream* stream, uint32_t data_length, vo memcpy(sendd, &f, sizeof(struct yamux_frame)); memcpy(sendd + sizeof(struct yamux_frame), data, (size_t)adv); - int sz = adv + sizeof(struct yamux_frame); - if (!s->session_context->default_stream->write(s->session_context, (uint8_t*)sendd, sz)) + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)sendd; + outgoing.data_size = adv + sizeof(struct yamux_frame); + if (!s->session_context->default_stream->write(s->session_context, &outgoing)) return adv; data += adv; diff --git a/yamux/yamux.c b/yamux/yamux.c index 189a39c..1ee34f0 100644 --- a/yamux/yamux.c +++ b/yamux/yamux.c @@ -58,7 +58,10 @@ void yamux_read_stream(struct yamux_stream* stream, ssize_t incoming_size, uint8 */ int yamux_send_protocol(struct SessionContext* context) { char* protocol = "/yamux/1.0.0\n"; - if (!context->default_stream->write(context, (uint8_t*)protocol, strlen(protocol))) + struct StreamMessage outgoing; + outgoing.data = (uint8_t*)protocol; + outgoing.data_size = strlen(protocol); + if (!context->default_stream->write(context, &outgoing)) return 0; return 1; }