passing network timeouts to read methods

This commit is contained in:
John Jones 2017-04-17 14:03:27 -05:00
parent caf02463c6
commit c1e7131c7c
12 changed files with 59 additions and 30 deletions

View file

@ -19,7 +19,7 @@ int libp2p_conn_tcp_can_handle(const struct MultiAddress* addr) {
int libp2p_conn_tcp_read(const struct Connection* connection, char** out, size_t* num_bytes) { int libp2p_conn_tcp_read(const struct Connection* connection, char** out, size_t* num_bytes) {
int buffer_size = 65535; int buffer_size = 65535;
*out = (char*)malloc(buffer_size); *out = (char*)malloc(buffer_size);
ssize_t bytes = socket_read(connection->socket_handle, *out, buffer_size, 0); ssize_t bytes = socket_read(connection->socket_handle, *out, buffer_size, 0, 5);
*num_bytes = bytes; *num_bytes = bytes;
return bytes > 0; return bytes > 0;
} }

View file

@ -18,9 +18,10 @@
* @param socket_fd the socket file descriptor * @param socket_fd the socket file descriptor
* @param data the data to send * @param data the data to send
* @param data_length the length of the data * @param data_length the length of the data
* @param timeout_secs number of seconds before read gives up. Will return 0 data length.
* @returns the number of bytes written * @returns the number of bytes written
*/ */
int libp2p_net_multistream_read(void* stream_context, unsigned char** data, size_t* data_length); int libp2p_net_multistream_read(void* stream_context, unsigned char** data, size_t* data_length, int timeout_secs);
/** /**
* Write to an open multistream host * Write to an open multistream host
* @param socket_fd the socket file descriptor * @param socket_fd the socket file descriptor

View file

@ -12,7 +12,7 @@
int socket_local4(int s, uint32_t *ip, uint16_t *port); int socket_local4(int s, uint32_t *ip, uint16_t *port);
int socket_connect4(int s, uint32_t ip, uint16_t port); int socket_connect4(int s, uint32_t ip, uint16_t port);
int socket_listen(int s, uint32_t *localip, uint16_t *localport); int socket_listen(int s, uint32_t *localip, uint16_t *localport);
ssize_t socket_read(int s, char *buf, size_t len, int flags); ssize_t socket_read(int s, char *buf, size_t len, int flags, int timeout_secs);
ssize_t socket_write(int s, const char *buf, size_t len, int flags); ssize_t socket_write(int s, const char *buf, size_t len, int flags);
/** /**
* Used to send the size of the next transmission for "framed" transmissions. NOTE: This will send in big endian format * Used to send the size of the next transmission for "framed" transmissions. NOTE: This will send in big endian format

View file

@ -15,9 +15,10 @@ struct Stream {
* @param stream the stream context (usually a SessionContext pointer) * @param stream the stream context (usually a SessionContext pointer)
* @param buffer where to put the results * @param buffer where to put the results
* @param bytes_read how many bytes were read * @param bytes_read how many bytes were read
* @param timeout_secs number of seconds before a timeout
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int (*read)(void* stream_context, unsigned char** buffer, size_t* bytes_read); int (*read)(void* stream_context, unsigned char** buffer, size_t* bytes_read, int timeout_secs);
/** /**
* Writes to a stream * Writes to a stream

View file

@ -22,8 +22,8 @@ struct Libp2pMessage {
char* key; // protobuf field 2 char* key; // protobuf field 2
size_t key_size; size_t key_size;
struct Libp2pRecord* record; // protobuf field 3 struct Libp2pRecord* record; // protobuf field 3
struct Libp2pLinkedList* closer_peer_head; // protobuf field 8 struct Libp2pLinkedList* closer_peer_head; // protobuf field 8 linked list of Libp2pPeers
struct Libp2pLinkedList* provider_peer_head; // protobuf field 9 struct Libp2pLinkedList* provider_peer_head; // protobuf field 9 linked list of Libp2pPeers
int32_t cluster_level_raw; // protobuf field 10 int32_t cluster_level_raw; // protobuf field 10
}; };

View file

@ -55,9 +55,10 @@ int libp2p_net_multistream_write(void* stream_context, const unsigned char* data
* @param socket_fd the socket file descriptor * @param socket_fd the socket file descriptor
* @param results where to put the results. NOTE: this memory is allocated * @param results where to put the results. NOTE: this memory is allocated
* @param results_size the size of the results in bytes * @param results_size the size of the results in bytes
* @param timeout_secs the seconds before a timeout
* @returns number of bytes received * @returns number of bytes received
*/ */
int libp2p_net_multistream_read(void* stream_context, unsigned char** results, size_t* results_size) { int libp2p_net_multistream_read(void* stream_context, unsigned char** results, size_t* results_size, int timeout_secs) {
struct SessionContext* session_context = (struct SessionContext*)stream_context; struct SessionContext* session_context = (struct SessionContext*)stream_context;
struct Stream* stream = session_context->insecure_stream; struct Stream* stream = session_context->insecure_stream;
int bytes = 0; int bytes = 0;
@ -69,7 +70,10 @@ int libp2p_net_multistream_read(void* stream_context, unsigned char** results, s
// first read the varint // first read the varint
while(1) { while(1) {
unsigned char c; unsigned char c;
bytes = socket_read(*((int*)stream->socket_descriptor), (char*)&c, 1, 0); bytes = socket_read(*((int*)stream->socket_descriptor), (char*)&c, 1, 0, timeout_secs);
if (bytes == 0) { // timeout
return 0;
}
pos[0] = c; pos[0] = c;
if (c >> 7 == 0) { if (c >> 7 == 0) {
pos[1] = 0; pos[1] = 0;
@ -83,10 +87,10 @@ int libp2p_net_multistream_read(void* stream_context, unsigned char** results, s
left = num_bytes_requested; left = num_bytes_requested;
do { do {
bytes = socket_read(*((int*)stream->socket_descriptor), &buffer[already_read], left, 0); bytes = socket_read(*((int*)stream->socket_descriptor), &buffer[already_read], left, 0, timeout_secs);
if (bytes < 0) { if (bytes < 0) {
bytes = 0; bytes = 0;
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { if ( (errno == EAGAIN)) {
// do something intelligent // do something intelligent
} else { } else {
return 0; return 0;
@ -141,7 +145,7 @@ struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) {
session.default_stream = stream; session.default_stream = stream;
// try to receive the protocol id // try to receive the protocol id
return_result = libp2p_net_multistream_read(&session, &results, &results_size); return_result = libp2p_net_multistream_read(&session, &results, &results_size, 5);
if (return_result == 0 || results_size < 1) if (return_result == 0 || results_size < 1)
goto exit; goto exit;
@ -177,7 +181,7 @@ int libp2p_net_multistream_negotiate(struct Stream* stream) {
if (!libp2p_net_multistream_write(&secure_session, (unsigned char*)protocolID, strlen(protocolID))) if (!libp2p_net_multistream_write(&secure_session, (unsigned char*)protocolID, strlen(protocolID)))
goto exit; goto exit;
// expect the same back // expect the same back
libp2p_net_multistream_read(&secure_session, &results, &results_length); libp2p_net_multistream_read(&secure_session, &results, &results_length, 5);
if (results_length == 0) if (results_length == 0)
goto exit; goto exit;
if (strncmp((char*)results, protocolID, strlen(protocolID)) != 0) if (strncmp((char*)results, protocolID, strlen(protocolID)) != 0)

View file

@ -140,8 +140,13 @@ int socket_listen(int s, uint32_t *localip, uint16_t *localport)
* to use something else before or after it can be done here instead of * to use something else before or after it can be done here instead of
* outside the lib. * outside the lib.
*/ */
ssize_t socket_read(int s, char *buf, size_t len, int flags) ssize_t socket_read(int s, char *buf, size_t len, int flags, int num_secs)
{ {
struct timeval tv;
tv.tv_sec = num_secs;
tv.tv_usec = 0;
setsockopt(s, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(struct timeval));
return recv(s, buf, len, flags); return recv(s, buf, len, flags);
} }

View file

@ -11,7 +11,7 @@ int libp2p_nodeio_upgrade_stream(struct SessionContext* context) {
size_t results_size = 0; size_t results_size = 0;
if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol)))
goto exit; goto exit;
if (!context->default_stream->read(context, &results, &results_size)) if (!context->default_stream->read(context, &results, &results_size, 5))
goto exit; goto exit;
if (results_size != strlen(protocol)) if (results_size != strlen(protocol))
goto exit; goto exit;
@ -38,7 +38,7 @@ int libp2p_nodeio_upgrade_stream(struct SessionContext* context) {
int libp2p_nodeio_get(struct SessionContext* context, unsigned char* hash, int hash_length, unsigned char** results, size_t* results_size) { int libp2p_nodeio_get(struct SessionContext* context, unsigned char* hash, int hash_length, unsigned char** results, size_t* results_size) {
if (!context->default_stream->write(context, hash, hash_length)) if (!context->default_stream->write(context, hash, hash_length))
return 0; return 0;
if (!context->default_stream->read(context, results, results_size)) if (!context->default_stream->read(context, results, results_size, 5))
return 0; return 0;
return 1; return 1;
} }

View file

@ -31,12 +31,30 @@ struct ProviderStore* libp2p_providerstore_new() {
return out; return out;
} }
void libp2p_providerstore_entry_free(struct ProviderEntry* in) {
if (in != NULL) {
if (in->hash != NULL) {
free(in->hash);
in->hash_size = 0;
}
if (in->peer_id != NULL) {
free(in->peer_id);
in->peer_id_size = 0;
}
free(in);
}
}
/*** /***
* Clean resources used by a ProviderStore * Clean resources used by a ProviderStore
* @param in the ProviderStore to clean up * @param in the ProviderStore to clean up
*/ */
void libp2p_providerstore_free(struct ProviderStore* in) { void libp2p_providerstore_free(struct ProviderStore* in) {
if (in != NULL) { if (in != NULL) {
for(int i = 0; i < in->provider_entries->total; i++) {
struct ProviderEntry* entry = libp2p_utils_vector_get(in->provider_entries, i);
libp2p_providerstore_entry_free(entry);
}
libp2p_utils_vector_free(in->provider_entries); libp2p_utils_vector_free(in->provider_entries);
free(in); free(in);
in = NULL; in = NULL;

View file

@ -42,7 +42,7 @@ int libp2p_routing_dht_upgrade_stream(struct SessionContext* context) {
size_t results_size = 0; size_t results_size = 0;
if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol))) if (!context->default_stream->write(context, (unsigned char*)protocol, strlen(protocol)))
goto exit; goto exit;
if (!context->default_stream->read(context, &results, &results_size)) if (!context->default_stream->read(context, &results, &results_size, 5))
goto exit; goto exit;
if (results_size != strlen(protocol)) if (results_size != strlen(protocol))
goto exit; goto exit;
@ -221,8 +221,10 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc
} }
libp2p_logger_error("dht_protocol", "add_provider returning false\n"); libp2p_logger_error("dht_protocol", "add_provider returning false\n");
} }
/*
if (peer != NULL) if (peer != NULL)
libp2p_peer_free(peer); libp2p_peer_free(peer);
*/
return retVal; return retVal;
} }
@ -321,7 +323,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee
struct Libp2pMessage* message = NULL; struct Libp2pMessage* message = NULL;
// read from stream // read from stream
if (!session->default_stream->read(session, &buffer, &buffer_size)) if (!session->default_stream->read(session, &buffer, &buffer_size, 5))
goto exit; goto exit;
// unprotobuf // unprotobuf
if (!libp2p_message_protobuf_decode(buffer, buffer_size, &message)) if (!libp2p_message_protobuf_decode(buffer, buffer_size, &message))
@ -362,9 +364,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee
free(buffer); free(buffer);
if (result_buffer != NULL) if (result_buffer != NULL)
free(result_buffer); free(result_buffer);
/* JMJ Debugging
if (message != NULL) if (message != NULL)
libp2p_message_free(message); libp2p_message_free(message);
*/
return retVal; return retVal;
} }

View file

@ -463,7 +463,7 @@ int libp2p_secio_unencrypted_write(struct SessionContext* session, unsigned char
* @param results_size the size of the results * @param results_size the size of the results
* @returns the number of bytes read * @returns the number of bytes read
*/ */
int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char** results, size_t* results_size) { int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char** results, size_t* results_size, int timeout_secs) {
uint32_t buffer_size; uint32_t buffer_size;
// first read the 4 byte integer // first read the 4 byte integer
@ -472,7 +472,7 @@ int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char*
int read = 0; int read = 0;
int read_this_time = 0; int read_this_time = 0;
do { do {
read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), &size[read], 1, 0); read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), &size[read], 1, 0, timeout_secs);
if (read_this_time < 0) { if (read_this_time < 0) {
read_this_time = 0; read_this_time = 0;
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
@ -501,7 +501,7 @@ int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char*
*results = malloc(left); *results = malloc(left);
unsigned char* ptr = *results; unsigned char* ptr = *results;
do { do {
read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), (char*)&ptr[read], left, 0); read_this_time = socket_read(*((int*)session->insecure_stream->socket_descriptor), (char*)&ptr[read], left, 0, timeout_secs);
if (read_this_time < 0) { if (read_this_time < 0) {
read_this_time = 0; read_this_time = 0;
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
@ -628,13 +628,13 @@ int libp2p_secio_decrypt(const struct SessionContext* session, const unsigned ch
* @param num_bytes the number of bytes read from the stream * @param num_bytes the number of bytes read from the stream
* @returns the number of bytes read * @returns the number of bytes read
*/ */
int libp2p_secio_encrypted_read(void* stream_context, unsigned char** bytes, size_t* num_bytes) { int libp2p_secio_encrypted_read(void* stream_context, unsigned char** bytes, size_t* num_bytes, int timeout_secs) {
struct SessionContext* session = (struct SessionContext*)stream_context; struct SessionContext* session = (struct SessionContext*)stream_context;
// reader uses the remote cipher and mac // reader uses the remote cipher and mac
// read the data // read the data
unsigned char* incoming = NULL; unsigned char* incoming = NULL;
size_t incoming_size = 0; size_t incoming_size = 0;
if (libp2p_secio_unencrypted_read(session, &incoming, &incoming_size) <= 0) if (libp2p_secio_unencrypted_read(session, &incoming, &incoming_size, timeout_secs) <= 0)
return 0; return 0;
return libp2p_secio_decrypt(session, incoming, incoming_size, bytes, num_bytes); return libp2p_secio_decrypt(session, incoming, incoming_size, bytes, num_bytes);
} }
@ -691,7 +691,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
if (!remote_requested) { if (!remote_requested) {
// we should get back the secio confirmation // we should get back the secio confirmation
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading protocol response"); libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading protocol response");
bytes_written = libp2p_net_multistream_read(local_session, &results, &results_size); bytes_written = libp2p_net_multistream_read(local_session, &results, &results_size, 20);
if (bytes_written < 5 || strstr((char*)results, "secio") == NULL) if (bytes_written < 5 || strstr((char*)results, "secio") == NULL)
goto exit; goto exit;
@ -753,7 +753,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
// now receive the proposal from the new connection // now receive the proposal from the new connection
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "receiving propose_in"); libp2p_logger_log("secio", LOGLEVEL_DEBUG, "receiving propose_in");
bytes_written = libp2p_secio_unencrypted_read(local_session, &propose_in_bytes, &propose_in_size); bytes_written = libp2p_secio_unencrypted_read(local_session, &propose_in_bytes, &propose_in_size, 10);
if (bytes_written <= 0) if (bytes_written <= 0)
goto exit; goto exit;
@ -836,7 +836,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
// receive Exchange packet // receive Exchange packet
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading exchagne packet"); libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Reading exchagne packet");
bytes_written = libp2p_secio_unencrypted_read(local_session, &results, &results_size); bytes_written = libp2p_secio_unencrypted_read(local_session, &results, &results_size, 10);
if (bytes_written == 0) if (bytes_written == 0)
goto exit; goto exit;
libp2p_secio_exchange_protobuf_decode(results, results_size, &exchange_in); libp2p_secio_exchange_protobuf_decode(results, results_size, &exchange_in);
@ -904,7 +904,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
// receive our nonce to verify encryption works // receive our nonce to verify encryption works
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Receiving our nonce"); libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Receiving our nonce");
int bytes_read = libp2p_secio_encrypted_read(local_session, &results, &results_size); int bytes_read = libp2p_secio_encrypted_read(local_session, &results, &results_size, 10);
if (bytes_read <= 0) { if (bytes_read <= 0) {
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Encrypted read returned %d", bytes_read); libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Encrypted read returned %d", bytes_read);
goto exit; goto exit;

View file

@ -41,7 +41,7 @@ int test_multistream_get_list() {
goto exit; goto exit;
// retrieve response // retrieve response
retVal = libp2p_net_multistream_read(&session, &response, &response_size); retVal = libp2p_net_multistream_read(&session, &response, &response_size, 5);
if (retVal <= 0) if (retVal <= 0)
goto exit; goto exit;