diff --git a/conn/tcp_transport_dialer.c b/conn/tcp_transport_dialer.c index 30d4bbb..c728213 100644 --- a/conn/tcp_transport_dialer.c +++ b/conn/tcp_transport_dialer.c @@ -19,7 +19,7 @@ int libp2p_conn_tcp_can_handle(const struct MultiAddress* addr) { int libp2p_conn_tcp_read(const struct Connection* connection, char** out, size_t* num_bytes) { int buffer_size = 65535; *out = (char*)malloc(buffer_size); - ssize_t bytes = socket_read(connection->socket_handle, *out, buffer_size, 0); + ssize_t bytes = socket_read(connection->socket_handle, *out, buffer_size, 0, 5); *num_bytes = bytes; return bytes > 0; } diff --git a/include/libp2p/net/multistream.h b/include/libp2p/net/multistream.h index 7babfc4..70ebc4f 100644 --- a/include/libp2p/net/multistream.h +++ b/include/libp2p/net/multistream.h @@ -18,9 +18,10 @@ * @param socket_fd the socket file descriptor * @param data the data to send * @param data_length the length of the data + * @param timeout_secs number of seconds before read gives up. Will return 0 data length. * @returns the number of bytes written */ -int libp2p_net_multistream_read(void* stream_context, unsigned char** data, size_t* data_length); +int libp2p_net_multistream_read(void* stream_context, unsigned char** data, size_t* data_length, int timeout_secs); /** * Write to an open multistream host * @param socket_fd the socket file descriptor diff --git a/include/libp2p/net/p2pnet.h b/include/libp2p/net/p2pnet.h index 9c80b45..72a8b31 100644 --- a/include/libp2p/net/p2pnet.h +++ b/include/libp2p/net/p2pnet.h @@ -12,7 +12,7 @@ int socket_local4(int s, uint32_t *ip, uint16_t *port); int socket_connect4(int s, uint32_t ip, uint16_t port); int socket_listen(int s, uint32_t *localip, uint16_t *localport); - ssize_t socket_read(int s, char *buf, size_t len, int flags); + ssize_t socket_read(int s, char *buf, size_t len, int flags, int timeout_secs); ssize_t socket_write(int s, const char *buf, size_t len, int flags); /** * Used to send the size of the next transmission for "framed" transmissions. NOTE: This will send in big endian format diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index c0a99b7..b04cb8d 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -15,9 +15,10 @@ struct Stream { * @param stream the stream context (usually a SessionContext pointer) * @param buffer where to put the results * @param bytes_read how many bytes were read + * @param timeout_secs number of seconds before a timeout * @returns true(1) on success, false(0) otherwise */ - int (*read)(void* stream_context, unsigned char** buffer, size_t* bytes_read); + int (*read)(void* stream_context, unsigned char** buffer, size_t* bytes_read, int timeout_secs); /** * Writes to a stream diff --git a/include/libp2p/record/message.h b/include/libp2p/record/message.h index 8d3f6aa..11bfe76 100644 --- a/include/libp2p/record/message.h +++ b/include/libp2p/record/message.h @@ -22,8 +22,8 @@ struct Libp2pMessage { char* key; // protobuf field 2 size_t key_size; struct Libp2pRecord* record; // protobuf field 3 - struct Libp2pLinkedList* closer_peer_head; // protobuf field 8 - struct Libp2pLinkedList* provider_peer_head; // protobuf field 9 + struct Libp2pLinkedList* closer_peer_head; // protobuf field 8 linked list of Libp2pPeers + struct Libp2pLinkedList* provider_peer_head; // protobuf field 9 linked list of Libp2pPeers int32_t cluster_level_raw; // protobuf field 10 }; diff --git a/net/multistream.c b/net/multistream.c index 8569dd8..73375de 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -55,9 +55,10 @@ int libp2p_net_multistream_write(void* stream_context, const unsigned char* data * @param socket_fd the socket file descriptor * @param results where to put the results. NOTE: this memory is allocated * @param results_size the size of the results in bytes + * @param timeout_secs the seconds before a timeout * @returns number of bytes received */ -int libp2p_net_multistream_read(void* stream_context, unsigned char** results, size_t* results_size) { +int libp2p_net_multistream_read(void* stream_context, unsigned char** results, size_t* results_size, int timeout_secs) { struct SessionContext* session_context = (struct SessionContext*)stream_context; struct Stream* stream = session_context->insecure_stream; int bytes = 0; @@ -69,7 +70,10 @@ int libp2p_net_multistream_read(void* stream_context, unsigned char** results, s // first read the varint while(1) { unsigned char c; - bytes = socket_read(*((int*)stream->socket_descriptor), (char*)&c, 1, 0); + bytes = socket_read(*((int*)stream->socket_descriptor), (char*)&c, 1, 0, timeout_secs); + if (bytes == 0) { // timeout + return 0; + } pos[0] = c; if (c >> 7 == 0) { pos[1] = 0; @@ -83,10 +87,10 @@ int libp2p_net_multistream_read(void* stream_context, unsigned char** results, s left = num_bytes_requested; do { - bytes = socket_read(*((int*)stream->socket_descriptor), &buffer[already_read], left, 0); + bytes = socket_read(*((int*)stream->socket_descriptor), &buffer[already_read], left, 0, timeout_secs); if (bytes < 0) { bytes = 0; - if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { + if ( (errno == EAGAIN)) { // do something intelligent } else { return 0; @@ -141,7 +145,7 @@ struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) { session.default_stream = stream; // try to receive the protocol id - return_result = libp2p_net_multistream_read(&session, &results, &results_size); + return_result = libp2p_net_multistream_read(&session, &results, &results_size, 5); if (return_result == 0 || results_size < 1) goto exit; @@ -177,7 +181,7 @@ int libp2p_net_multistream_negotiate(struct Stream* stream) { if (!libp2p_net_multistream_write(&secure_session, (unsigned char*)protocolID, strlen(protocolID))) goto exit; // expect the same back - libp2p_net_multistream_read(&secure_session, &results, &results_length); + libp2p_net_multistream_read(&secure_session, &results, &results_length, 5); if (results_length == 0) goto exit; if (strncmp((char*)results, protocolID, strlen(protocolID)) != 0) diff --git a/net/socket.c b/net/socket.c index 8c99973..05ce15d 100644 --- a/net/socket.c +++ b/net/socket.c @@ -140,9 +140,14 @@ int socket_listen(int s, uint32_t *localip, uint16_t *localport) * to use something else before or after it can be done here instead of * outside the lib. */ -ssize_t socket_read(int s, char *buf, size_t len, int flags) +ssize_t socket_read(int s, char *buf, size_t len, int flags, int num_secs) { - return recv(s, buf, len, flags); + struct timeval tv; + tv.tv_sec = num_secs; + tv.tv_usec = 0; + setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(struct timeval)); + + return recv(s, buf, len, flags); } /* Same reason as socket_read, but to send data instead of receive. diff --git a/nodeio/nodeio.c b/nodeio/nodeio.c index a9d37e5..bea337d 100644 --- a/nodeio/nodeio.c +++ b/nodeio/nodeio.c @@ -11,7 +11,7 @@ int libp2p_nodeio_upgrade_stream(struct SessionContext* context) { size_t results_size = 0; if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) goto exit; - if (!context->default_stream->read(context, &results, &results_size)) + if (!context->default_stream->read(context, &results, &results_size, 5)) goto exit; if (results_size != strlen(protocol)) goto exit; @@ -38,7 +38,7 @@ int libp2p_nodeio_upgrade_stream(struct SessionContext* context) { int libp2p_nodeio_get(struct SessionContext* context, unsigned char* hash, int hash_length, unsigned char** results, size_t* results_size) { if (!context->default_stream->write(context, hash, hash_length)) return 0; - if (!context->default_stream->read(context, results, results_size)) + if (!context->default_stream->read(context, results, results_size, 5)) return 0; return 1; } diff --git a/peer/providerstore.c b/peer/providerstore.c index 06e2b4c..c58d74a 100644 --- a/peer/providerstore.c +++ b/peer/providerstore.c @@ -31,12 +31,30 @@ struct ProviderStore* libp2p_providerstore_new() { return out; } +void libp2p_providerstore_entry_free(struct ProviderEntry* in) { + if (in != NULL) { + if (in->hash != NULL) { + free(in->hash); + in->hash_size = 0; + } + if (in->peer_id != NULL) { + free(in->peer_id); + in->peer_id_size = 0; + } + free(in); + } +} + /*** * Clean resources used by a ProviderStore * @param in the ProviderStore to clean up */ void libp2p_providerstore_free(struct ProviderStore* in) { if (in != NULL) { + for(int i = 0; i < in->provider_entries->total; i++) { + struct ProviderEntry* entry = libp2p_utils_vector_get(in->provider_entries, i); + libp2p_providerstore_entry_free(entry); + } libp2p_utils_vector_free(in->provider_entries); free(in); in = NULL; diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c index f4f8311..1ca8ba2 100644 --- a/routing/dht_protocol.c +++ b/routing/dht_protocol.c @@ -42,7 +42,7 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) { size_t results_size = 0; if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) goto exit; - if (!context->default_stream->read(context, &results, &results_size)) + if (!context->default_stream->read(context, &results, &results_size, 5)) goto exit; if (results_size != strlen(protocol)) goto exit; @@ -221,8 +221,10 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc } libp2p_logger_error("dht_protocol", "add_provider returning false\n"); } + /* if (peer != NULL) libp2p_peer_free(peer); + */ return retVal; } @@ -321,7 +323,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee struct Libp2pMessage* message = NULL; // read from stream - if (!session->default_stream->read(session, &buffer, &buffer_size)) + if (!session->default_stream->read(session, &buffer, &buffer_size, 5)) goto exit; // unprotobuf if (!libp2p_message_protobuf_decode(buffer, buffer_size, &message)) @@ -362,9 +364,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee free(buffer); if (result_buffer != NULL) free(result_buffer); - /* JMJ Debugging if (message != NULL) libp2p_message_free(message); - */ return retVal; } diff --git a/secio/secio.c b/secio/secio.c index ea98558..ba9d084 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -463,7 +463,7 @@ int libp2p_secio_unencrypted_write(struct SessionContext* session, unsigned char * @param results_size the size of the results * @returns the number of bytes read */ -int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char** results, size_t* results_size) { +int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char** results, size_t* results_size, int timeout_secs) { uint32_t buffer_size; // first read the 4 byte integer @@ -472,7 +472,7 @@ int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char* int read = 0; int read_this_time = 0; do { - read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), &size[read], 1, 0); + read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), &size[read], 1, 0, timeout_secs); if (read_this_time < 0) { read_this_time = 0; if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { @@ -501,7 +501,7 @@ int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char* *results = malloc(left); unsigned char* ptr = *results; do { - read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), (char*)&ptr[read], left, 0); + read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), (char*)&ptr[read], left, 0, timeout_secs); if (read_this_time < 0) { read_this_time = 0; if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { @@ -628,13 +628,13 @@ int libp2p_secio_decrypt(const struct SessionContext* session, const unsigned ch * @param num_bytes the number of bytes read from the stream * @returns the number of bytes read */ -int libp2p_secio_encrypted_read(void* stream_context, unsigned char** bytes, size_t* num_bytes) { +int libp2p_secio_encrypted_read(void* stream_context, unsigned char** bytes, size_t* num_bytes, int timeout_secs) { struct SessionContext* session = (struct SessionContext*)stream_context; // reader uses the remote cipher and mac // read the data unsigned char* incoming = NULL; size_t incoming_size = 0; - if (libp2p_secio_unencrypted_read(session, &incoming, &incoming_size) <= 0) + if (libp2p_secio_unencrypted_read(session, &incoming, &incoming_size, timeout_secs) <= 0) return 0; return libp2p_secio_decrypt(session, incoming, incoming_size, bytes, num_bytes); } @@ -691,7 +691,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva if (!remote_requested) { // we should get back the secio confirmation libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading protocol response"); - bytes_written = libp2p_net_multistream_read(local_session, &results, &results_size); + bytes_written = libp2p_net_multistream_read(local_session, &results, &results_size, 20); if (bytes_written < 5 || strstr((char*)results, "secio") == NULL) goto exit; @@ -753,7 +753,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva // now receive the proposal from the new connection libp2p_logger_log("secio", LOGLEVEL_DEBUG, "receiving propose_in"); - bytes_written = libp2p_secio_unencrypted_read(local_session, &propose_in_bytes, &propose_in_size); + bytes_written = libp2p_secio_unencrypted_read(local_session, &propose_in_bytes, &propose_in_size, 10); if (bytes_written <= 0) goto exit; @@ -836,7 +836,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva // receive Exchange packet libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading exchagne packet"); - bytes_written = libp2p_secio_unencrypted_read(local_session, &results, &results_size); + bytes_written = libp2p_secio_unencrypted_read(local_session, &results, &results_size, 10); if (bytes_written == 0) goto exit; libp2p_secio_exchange_protobuf_decode(results, results_size, &exchange_in); @@ -904,7 +904,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva // receive our nonce to verify encryption works libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Receiving our nonce"); - int bytes_read = libp2p_secio_encrypted_read(local_session, &results, &results_size); + int bytes_read = libp2p_secio_encrypted_read(local_session, &results, &results_size, 10); if (bytes_read <= 0) { libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Encrypted read returned %d", bytes_read); goto exit; diff --git a/test/test_multistream.h b/test/test_multistream.h index d744d35..85c64a8 100644 --- a/test/test_multistream.h +++ b/test/test_multistream.h @@ -41,7 +41,7 @@ int test_multistream_get_list() { goto exit; // retrieve response - retVal = libp2p_net_multistream_read(&session, &response, &response_size); + retVal = libp2p_net_multistream_read(&session, &response, &response_size, 5); if (retVal <= 0) goto exit;