Debugging journalio process

This commit is contained in:
jmjatlanta 2017-08-30 11:09:28 -05:00
parent ecb9f984ba
commit f0d82129ab
14 changed files with 169 additions and 47 deletions

View file

@ -40,6 +40,15 @@ int libp2p_net_multistream_write(void* stream_context, const unsigned char* data
*/ */
struct Stream* libp2p_net_multistream_connect(const char* hostname, int port); struct Stream* libp2p_net_multistream_connect(const char* hostname, int port);
/**
* Connect to a multistream host, and this includes the multistream handshaking.
* @param hostname the host
* @param port the port
* @param timeout_secs number of secs before timeout
* @returns the socket file descriptor of the connection, or -1 on error
*/
struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, int port, int timeout_secs);
/** /**
* Negotiate the multistream protocol by sending and receiving the protocol id. This is a server side function. * Negotiate the multistream protocol by sending and receiving the protocol id. This is a server side function.
* Servers should send the protocol ID, and then expect it back. * Servers should send the protocol ID, and then expect it back.
@ -56,7 +65,7 @@ int libp2p_net_multistream_negotiate(struct SessionContext* session);
* @param fd the socket file descriptor * @param fd the socket file descriptor
* @returns true(1) on success, false(0) if not * @returns true(1) on success, false(0) if not
*/ */
struct Libp2pMessage* libp2p_net_multistream_get_message(struct Stream* stream); struct KademliaMessage* libp2p_net_multistream_get_message(struct Stream* stream);
struct Stream* libp2p_net_multistream_stream_new(int socket_fd, const char* ip, int port); struct Stream* libp2p_net_multistream_stream_new(int socket_fd, const char* ip, int port);

View file

@ -10,6 +10,7 @@ int socket_read_select4(int socket_fd, int num_seconds);
int socket_accept4(int s, uint32_t *ip, uint16_t *port); int socket_accept4(int s, uint32_t *ip, uint16_t *port);
int socket_local4(int s, uint32_t *ip, uint16_t *port); int socket_local4(int s, uint32_t *ip, uint16_t *port);
int socket_connect4(int s, uint32_t ip, uint16_t port); int socket_connect4(int s, uint32_t ip, uint16_t port);
int socket_connect4_with_timeout(int s, uint32_t ip, uint16_t port, int timeout_secs);
int socket_listen(int s, uint32_t *localip, uint16_t *localport); int socket_listen(int s, uint32_t *localip, uint16_t *localport);
/*** /***

View file

@ -22,7 +22,7 @@ struct Stream {
/** /**
* Writes to a stream * Writes to a stream
* @param stream the stream context * @param stream the stream context (usually a SessionContext pointer)
* @param buffer what to write * @param buffer what to write
* @param how much to write * @param how much to write
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise

View file

@ -83,10 +83,18 @@ int libp2p_peer_compare(const struct Libp2pPeer* a, const struct Libp2pPeer* b);
/*** /***
* Determine if the passed in peer and id match * Determine if the passed in peer and id match
* @param in the peer to check * @param in the peer to check
* @param peer_id peer id, zero terminated string * @param peer_id peer id
* @param peer_size length of peer_id
* @returns true if peer matches * @returns true if peer matches
*/ */
int libp2p_peer_matches_id(struct Libp2pPeer* in, const unsigned char* peer_id); int libp2p_peer_matches_id(struct Libp2pPeer* in, const unsigned char* peer_id, int peer_size);
/***
* Convert peer id to null terminated string
* @param in the peer object
* @returns the peer id as a null terminated string
*/
char* libp2p_peer_id_to_string(struct Libp2pPeer* in);
/*** /***
* Determine if we are currently connected to this peer * Determine if we are currently connected to this peer

View file

@ -17,7 +17,7 @@ enum MessageType {
MESSAGE_TYPE_PING = 5 MESSAGE_TYPE_PING = 5
}; };
struct Libp2pMessage { struct KademliaMessage {
enum MessageType message_type; // protobuf field 1 (a varint) enum MessageType message_type; // protobuf field 1 (a varint)
char* key; // protobuf field 2 char* key; // protobuf field 2
size_t key_size; size_t key_size;
@ -31,20 +31,20 @@ struct Libp2pMessage {
* create a new Libp2pMessage struct * create a new Libp2pMessage struct
* @returns a new Libp2pMessage with default settings * @returns a new Libp2pMessage with default settings
*/ */
struct Libp2pMessage* libp2p_message_new(); struct KademliaMessage* libp2p_message_new();
/** /**
* Deallocate memory from a Message struct * Deallocate memory from a Message struct
* @param in the struct * @param in the struct
*/ */
void libp2p_message_free(struct Libp2pMessage* in); void libp2p_message_free(struct KademliaMessage* in);
/** /**
* determine the size necessary for a message struct to be protobuf'd * determine the size necessary for a message struct to be protobuf'd
* @param in the struct to be protobuf'd * @param in the struct to be protobuf'd
* @returns the size required * @returns the size required
*/ */
size_t libp2p_message_protobuf_encode_size(const struct Libp2pMessage* in); size_t libp2p_message_protobuf_encode_size(const struct KademliaMessage* in);
/** /**
* Encode a Message into a protobuf * Encode a Message into a protobuf
@ -54,7 +54,7 @@ size_t libp2p_message_protobuf_encode_size(const struct Libp2pMessage* in);
* @param bytes_written will hold the number of bytes written to buffer * @param bytes_written will hold the number of bytes written to buffer
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_message_protobuf_encode(const struct Libp2pMessage* in, unsigned char* buffer, size_t max_buffer_size, size_t* bytes_written); int libp2p_message_protobuf_encode(const struct KademliaMessage* in, unsigned char* buffer, size_t max_buffer_size, size_t* bytes_written);
/** /**
* Convert a Libp2pMessage into protobuf format, * Convert a Libp2pMessage into protobuf format,
@ -64,7 +64,7 @@ int libp2p_message_protobuf_encode(const struct Libp2pMessage* in, unsigned char
* @param buffer_size the size written into buffer * @param buffer_size the size written into buffer
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_message_protobuf_allocate_and_encode(const struct Libp2pMessage* in, unsigned char **buffer, size_t* buffer_size); int libp2p_message_protobuf_allocate_and_encode(const struct KademliaMessage* in, unsigned char **buffer, size_t* buffer_size);
/** /**
* turn a protobuf back into a message * turn a protobuf back into a message
@ -73,5 +73,5 @@ int libp2p_message_protobuf_allocate_and_encode(const struct Libp2pMessage* in,
* @param out the message * @param out the message
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_message_protobuf_decode(unsigned char* buffer, size_t buffer_size, struct Libp2pMessage** out); int libp2p_message_protobuf_decode(unsigned char* buffer, size_t buffer_size, struct KademliaMessage** out);

View file

@ -226,6 +226,17 @@ int libp2p_net_multistream_read(void* stream_context, unsigned char** results, s
* @returns the socket file descriptor of the connection, or -1 on error * @returns the socket file descriptor of the connection, or -1 on error
*/ */
struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) { struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) {
return libp2p_net_multistream_connect_with_timeout(hostname, port, multistream_default_timeout);
}
/**
* Connect to a multistream host, and this includes the multistream handshaking.
* @param hostname the host
* @param port the port
* @param timeout_secs number of secs before timeout
* @returns the socket file descriptor of the connection, or -1 on error
*/
struct Stream* libp2p_net_multistream_connect_with_timeout(const char* hostname, int port, int timeout_secs) {
int retVal = -1, return_result = -1, socket = -1; int retVal = -1, return_result = -1, socket = -1;
unsigned char* results = NULL; unsigned char* results = NULL;
size_t results_size; size_t results_size;
@ -236,7 +247,7 @@ struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) {
socket = socket_open4(); socket = socket_open4();
// connect // connect
if (socket_connect4(socket, ip, port) != 0) if (socket_connect4_with_timeout(socket, ip, port, timeout_secs) != 0)
goto exit; goto exit;
// send the multistream handshake // send the multistream handshake
@ -252,7 +263,7 @@ struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) {
session.default_stream = stream; session.default_stream = stream;
// try to receive the protocol id // try to receive the protocol id
return_result = libp2p_net_multistream_read(&session, &results, &results_size, multistream_default_timeout); return_result = libp2p_net_multistream_read(&session, &results, &results_size, timeout_secs);
if (return_result == 0 || results_size < 1) if (return_result == 0 || results_size < 1)
goto exit; goto exit;

View file

@ -1,4 +1,5 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h>
#include "libp2p/utils/logger.h" #include "libp2p/utils/logger.h"
#include "libp2p/net/protocol.h" #include "libp2p/net/protocol.h"
@ -30,8 +31,12 @@ const struct Libp2pProtocolHandler* protocol_compare(const unsigned char* incomi
int libp2p_protocol_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct Libp2pVector* handlers) { int libp2p_protocol_marshal(const unsigned char* incoming, size_t incoming_size, struct SessionContext* session, struct Libp2pVector* handlers) {
const struct Libp2pProtocolHandler* handler = protocol_compare(incoming, incoming_size, handlers); const struct Libp2pProtocolHandler* handler = protocol_compare(incoming, incoming_size, handlers);
if (handler == NULL) { if (handler == NULL) {
libp2p_logger_error("protocol", "Unable to find handler.\n"); char str[incoming_size + 1];
memcpy(str, incoming, incoming_size);
str[incoming_size] = 0;
libp2p_logger_error("protocol", "Unable to find handler for %s.\n", str);
return -1; return -1;
} }
//TODO: strip off the protocol?
return handler->HandleMessage(incoming, incoming_size, session, handler->context); return handler->HandleMessage(incoming, incoming_size, session, handler->context);
} }

View file

@ -8,7 +8,9 @@
#include <netdb.h> #include <netdb.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <errno.h> #include <errno.h>
#include <fcntl.h>
#include "libp2p/utils/logger.h"
#include "libp2p/net/p2pnet.h" #include "libp2p/net/p2pnet.h"
/** /**
@ -103,16 +105,80 @@ int socket_local4(int s, uint32_t *ip, uint16_t *port)
* @param port the port number * @param port the port number
* @return 0 on success, otherwise -1 * @return 0 on success, otherwise -1
*/ */
int socket_connect4(int s, uint32_t ip, uint16_t port) int socket_connect4(int s, uint32_t ip, uint16_t port) {
return socket_connect4_with_timeout(s, ip, port, 10);
}
/***
* start a client connection.
* @param s the socket number
* @param ip the ip address
* @param port the port number
* @param timeout_secs the number of seconds before timeout
* @return 0 on success, otherwise -1
*/
int socket_connect4_with_timeout(int s, uint32_t ip, uint16_t port, int timeout_secs)
{ {
struct sockaddr_in sa; struct sockaddr_in sa;
//long args; // fctl args with O_NONBLOCK
//long orig_args; // fctl args;
memset(&sa, 0, sizeof sa); memset(&sa, 0, sizeof sa);
sa.sin_family = AF_INET; sa.sin_family = AF_INET;
sa.sin_port = htons(port); sa.sin_port = htons(port);
sa.sin_addr.s_addr = ip; sa.sin_addr.s_addr = ip;
return connect(s, (struct sockaddr *) &sa, sizeof sa); /*
// set to non blocking
orig_args = fcntl(s, F_GETFL, NULL);
if (orig_args < 0) {
// unable to get flags
libp2p_logger_error("socket", "Unable to get socket flags on connect.\n");
return -1;
}
args = orig_args;
args |= O_NONBLOCK;
if (args != orig_args) {
libp2p_logger_debug("socket", "Setting socket to non-blocking on connect.\n");
if (fcntl(s, F_SETFL, args) < 0) {
// unable to set flags
return -1;
}
} else {
libp2p_logger_debug("socket", "Socket already non-blocking during connect.\n");
}
*/
// connect
int retVal = connect(s, (struct sockaddr *) &sa, sizeof sa);
if (retVal == -1 && errno == EINPROGRESS) {
libp2p_logger_debug("socket", "Socket connect unsuccessful. Waiting to try again.\n");
// wait for timeout
sleep(timeout_secs);
retVal = connect(s, (struct sockaddr *) &sa, sizeof sa);
if (retVal == -1 && errno == EALREADY) {
libp2p_logger_debug("socket", "Socket connect completed.\n");
retVal = 0;
} else {
libp2p_logger_debug("socket", "Socket connect worked on second try.\n");
}
} else {
if (retVal == -1) {
libp2p_logger_debug("socket", "Socket connect failed with error %d.\n", errno);
}
}
/*
if ( retVal == 0 && args != orig_args) {
// set back to blocking
libp2p_logger_debug("socket", "Setting socket back to blocking.\n");
args = fcntl(s, F_GETFL, NULL);
args &= (~O_NONBLOCK);
fcntl(s, F_SETFL, args);
}
*/
return retVal;
} }
/** /**

View file

@ -35,9 +35,9 @@ struct Libp2pPeer* libp2p_peer_new_from_multiaddress(const struct MultiAddress*
struct Libp2pPeer* out = libp2p_peer_new(); struct Libp2pPeer* out = libp2p_peer_new();
char* id = multiaddress_get_peer_id(in); char* id = multiaddress_get_peer_id(in);
if (id != NULL) { if (id != NULL) {
out->id_size = strlen(id) + 1; out->id_size = strlen(id);
out->id = malloc(out->id_size); out->id = malloc(out->id_size);
strcpy(out->id, id); memcpy(out->id, id, out->id_size);
free(id); free(id);
} }
out->addr_head = libp2p_utils_linked_list_new(); out->addr_head = libp2p_utils_linked_list_new();
@ -52,9 +52,9 @@ struct Libp2pPeer* libp2p_peer_new_from_multiaddress(const struct MultiAddress*
void libp2p_peer_free(struct Libp2pPeer* in) { void libp2p_peer_free(struct Libp2pPeer* in) {
if (in != NULL) { if (in != NULL) {
if (in->addr_head != NULL && in->addr_head->item != NULL) { if (in->addr_head != NULL && in->addr_head->item != NULL) {
libp2p_logger_debug("peer", "Freeing peer %s\n", ((struct MultiAddress*)in->addr_head->item)->string); //libp2p_logger_debug("peer", "Freeing peer %s\n", ((struct MultiAddress*)in->addr_head->item)->string);
} else { } else {
libp2p_logger_debug("peer", "Freeing peer with no multiaddress.\n"); //libp2p_logger_debug("peer", "Freeing peer with no multiaddress.\n");
} }
if (in->id != NULL) if (in->id != NULL)
free(in->id); free(in->id);
@ -97,6 +97,7 @@ int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer) {
* @returns true(1) on success, false(0) if we could not connect * @returns true(1) on success, false(0) if we could not connect
*/ */
int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, int timeout) { int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, int timeout) {
libp2p_logger_debug("peer", "Attemping to connect to %s.\n", libp2p_peer_id_to_string(peer));
time_t now, prev = time(NULL); time_t now, prev = time(NULL);
// find an appropriate address // find an appropriate address
struct Libp2pLinkedList* current_address = peer->addr_head; struct Libp2pLinkedList* current_address = peer->addr_head;
@ -108,8 +109,9 @@ int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* pee
continue; continue;
int port = multiaddress_get_ip_port(ma); int port = multiaddress_get_ip_port(ma);
peer->sessionContext = libp2p_session_context_new(); peer->sessionContext = libp2p_session_context_new();
peer->sessionContext->insecure_stream = libp2p_net_multistream_connect(ip, port); peer->sessionContext->insecure_stream = libp2p_net_multistream_connect_with_timeout(ip, port, timeout);
if (peer->sessionContext->insecure_stream == NULL) { if (peer->sessionContext->insecure_stream == NULL) {
libp2p_logger_debug("peer", "Unable to connect to IP %s and port %d for peer %s.\n", ip, port, libp2p_peer_id_to_string(peer));
free(ip); free(ip);
return 0; return 0;
} }
@ -118,6 +120,7 @@ int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* pee
peer->connection_type = CONNECTION_TYPE_CONNECTED; peer->connection_type = CONNECTION_TYPE_CONNECTED;
} }
if (libp2p_secio_initiate_handshake(peer->sessionContext, privateKey, peerstore) <= 0) { if (libp2p_secio_initiate_handshake(peer->sessionContext, privateKey, peerstore) <= 0) {
libp2p_logger_error("peer", "Attempted secio handshake, but failed for peer %s.\n", libp2p_peer_id_to_string(peer));
free(ip); free(ip);
return 0; return 0;
} }
@ -127,7 +130,11 @@ int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* pee
if (now >= (prev + timeout)) if (now >= (prev + timeout))
break; break;
} // trying to connect } // trying to connect
return peer->connection_type == CONNECTION_TYPE_CONNECTED; int retVal = peer->connection_type == CONNECTION_TYPE_CONNECTED;
if (!retVal) {
libp2p_logger_debug("peer", "Attempted connect to %s but failed.\n", libp2p_peer_id_to_string(peer));
}
return retVal;
} }
/** /**
@ -176,14 +183,26 @@ struct Libp2pPeer* libp2p_peer_copy(const struct Libp2pPeer* in) {
* @param peer_id peer id, zero terminated string * @param peer_id peer id, zero terminated string
* @returns true if peer matches * @returns true if peer matches
*/ */
int libp2p_peer_matches_id(struct Libp2pPeer* in, const unsigned char* peer_id) { int libp2p_peer_matches_id(struct Libp2pPeer* in, const unsigned char* peer_id, int peer_size) {
if (strlen((char*)peer_id) == in->id_size) { if (peer_size == in->id_size) {
if (strncmp(in->id, (char*)peer_id, in->id_size) == 0) if (strncmp(in->id, (char*)peer_id, in->id_size) == 0)
return 1; return 1;
} }
return 0; return 0;
} }
static char string_retval[100];
/***
* Convert peer id to null terminated string
* @param in the peer object
* @returns the peer id as a null terminated string
*/
char* libp2p_peer_id_to_string(struct Libp2pPeer* in) {
memcpy(string_retval, in->id, in->id_size);
string_retval[in->id_size] = 0;
return string_retval;
}
/*** /***
* Determine if we are currently connected to this peer * Determine if we are currently connected to this peer
* @param in the peer to check * @param in the peer to check

View file

@ -16,8 +16,8 @@
* Allocate memory for a message * Allocate memory for a message
* @returns a new, allocated Libp2pMessage struct * @returns a new, allocated Libp2pMessage struct
*/ */
struct Libp2pMessage* libp2p_message_new() { struct KademliaMessage* libp2p_message_new() {
struct Libp2pMessage* out = (struct Libp2pMessage*)malloc(sizeof(struct Libp2pMessage)); struct KademliaMessage* out = (struct KademliaMessage*)malloc(sizeof(struct KademliaMessage));
if (out != NULL) { if (out != NULL) {
out->closer_peer_head = NULL; out->closer_peer_head = NULL;
out->cluster_level_raw = 0; out->cluster_level_raw = 0;
@ -34,7 +34,7 @@ struct Libp2pMessage* libp2p_message_new() {
* Frees all resources related to a Libp2pMessage * Frees all resources related to a Libp2pMessage
* @param in the incoming message * @param in the incoming message
*/ */
void libp2p_message_free(struct Libp2pMessage* in) { void libp2p_message_free(struct KademliaMessage* in) {
if (in != NULL) { if (in != NULL) {
// a linked list of peer structs // a linked list of peer structs
struct Libp2pLinkedList* current = in->closer_peer_head; struct Libp2pLinkedList* current = in->closer_peer_head;
@ -63,7 +63,7 @@ void libp2p_message_free(struct Libp2pMessage* in) {
} }
} }
size_t libp2p_message_protobuf_encode_size(const struct Libp2pMessage* in) { size_t libp2p_message_protobuf_encode_size(const struct KademliaMessage* in) {
// message type // message type
size_t retVal = 11; size_t retVal = 11;
// clusterlevelraw // clusterlevelraw
@ -95,7 +95,7 @@ size_t libp2p_message_protobuf_encode_size(const struct Libp2pMessage* in) {
* @param buffer_size the size of the buffer * @param buffer_size the size of the buffer
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_message_protobuf_allocate_and_encode(const struct Libp2pMessage* in, unsigned char **buffer, size_t *buffer_size) { int libp2p_message_protobuf_allocate_and_encode(const struct KademliaMessage* in, unsigned char **buffer, size_t *buffer_size) {
*buffer_size = libp2p_message_protobuf_encode_size(in); *buffer_size = libp2p_message_protobuf_encode_size(in);
*buffer = malloc(*buffer_size); *buffer = malloc(*buffer_size);
if (*buffer == NULL) { if (*buffer == NULL) {
@ -111,7 +111,7 @@ int libp2p_message_protobuf_allocate_and_encode(const struct Libp2pMessage* in,
return retVal; return retVal;
} }
int libp2p_message_protobuf_encode(const struct Libp2pMessage* in, unsigned char* buffer, size_t max_buffer_size, size_t* bytes_written) { int libp2p_message_protobuf_encode(const struct KademliaMessage* in, unsigned char* buffer, size_t max_buffer_size, size_t* bytes_written) {
// data & data_size // data & data_size
size_t bytes_used = 0; size_t bytes_used = 0;
*bytes_written = 0; *bytes_written = 0;
@ -190,7 +190,7 @@ int libp2p_message_protobuf_encode(const struct Libp2pMessage* in, unsigned char
return 1; return 1;
} }
int libp2p_message_protobuf_decode(unsigned char* in, size_t in_size, struct Libp2pMessage** out) { int libp2p_message_protobuf_decode(unsigned char* in, size_t in_size, struct KademliaMessage** out) {
size_t pos = 0; size_t pos = 0;
int retVal = 0; int retVal = 0;
size_t buffer_size = 0; size_t buffer_size = 0;
@ -201,7 +201,7 @@ int libp2p_message_protobuf_decode(unsigned char* in, size_t in_size, struct Lib
struct Libp2pLinkedList* current_item = NULL; struct Libp2pLinkedList* current_item = NULL;
struct Libp2pLinkedList* last_closer = NULL; struct Libp2pLinkedList* last_closer = NULL;
struct Libp2pLinkedList* last_provider = NULL; struct Libp2pLinkedList* last_provider = NULL;
struct Libp2pMessage* ptr = NULL; struct KademliaMessage* ptr = NULL;
if ( (*out = libp2p_message_new()) == NULL) if ( (*out = libp2p_message_new()) == NULL)
goto exit; goto exit;

View file

@ -6,11 +6,11 @@
*/ */
int libp2p_record_handler_ping(struct Libp2pPeer* peer, struct Libp2pMessage* message) { int libp2p_record_handler_ping(struct Libp2pPeer* peer, struct KademliaMessage* message) {
return 0; return 0;
} }
int libp2p_record_message_handle(struct Libp2pPeer* peer, struct Libp2pMessage* message) { int libp2p_record_message_handle(struct Libp2pPeer* peer, struct KademliaMessage* message) {
switch (message->message_type) { switch (message->message_type) {
case (MESSAGE_TYPE_PING): case (MESSAGE_TYPE_PING):
return libp2p_record_handler_ping(peer, message); return libp2p_record_handler_ping(peer, message);

View file

@ -61,7 +61,7 @@ struct Libp2pProtocolHandler* libp2p_routing_dht_build_protocol_handler(struct P
* @param buffer_size the size of the results * @param buffer_size the size of the results
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_routing_dht_protobuf_message(struct Libp2pMessage* message, unsigned char** buffer, size_t *buffer_size) { int libp2p_routing_dht_protobuf_message(struct KademliaMessage* message, unsigned char** buffer, size_t *buffer_size) {
*buffer_size = libp2p_message_protobuf_encode_size(message); *buffer_size = libp2p_message_protobuf_encode_size(message);
*buffer = malloc(*buffer_size); *buffer = malloc(*buffer_size);
if (!libp2p_message_protobuf_encode(message, *buffer, *buffer_size, buffer_size)) { if (!libp2p_message_protobuf_encode(message, *buffer, *buffer_size, buffer_size)) {
@ -124,7 +124,7 @@ int libp2p_routing_dht_handshake(struct SessionContext* context) {
* @param buffer_size the length of the results * @param buffer_size the length of the results
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_routing_dht_handle_ping(struct Libp2pMessage* message, unsigned char** buffer, size_t *buffer_size) { int libp2p_routing_dht_handle_ping(struct KademliaMessage* message, unsigned char** buffer, size_t *buffer_size) {
// just turn message back into a protobuf and send it back... // just turn message back into a protobuf and send it back...
return libp2p_routing_dht_protobuf_message(message, buffer, buffer_size); return libp2p_routing_dht_protobuf_message(message, buffer, buffer_size);
} }
@ -137,7 +137,7 @@ int libp2p_routing_dht_handle_ping(struct Libp2pMessage* message, unsigned char*
* @param providerstore the list of peers that can provide things * @param providerstore the list of peers that can provide things
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, struct Libp2pMessage* message, struct Peerstore* peerstore, int libp2p_routing_dht_handle_get_providers(struct SessionContext* session, struct KademliaMessage* message, struct Peerstore* peerstore,
struct ProviderStore* providerstore, unsigned char** results, size_t* results_size) { struct ProviderStore* providerstore, unsigned char** results, size_t* results_size) {
unsigned char* peer_id = NULL; unsigned char* peer_id = NULL;
int peer_id_size = 0; int peer_id_size = 0;
@ -228,7 +228,7 @@ struct MultiAddress* libp2p_routing_dht_find_peer_ip_multiaddress(struct Libp2pL
* @param result_buffer_size the size of the result buffer * @param result_buffer_size the size of the result buffer
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struct Libp2pMessage* message, int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struct KademliaMessage* message,
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t* result_buffer_size) { struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t* result_buffer_size) {
int retVal = 0; int retVal = 0;
struct Libp2pPeer *peer = NULL; struct Libp2pPeer *peer = NULL;
@ -315,7 +315,7 @@ int libp2p_routing_dht_handle_add_provider(struct SessionContext* session, struc
* @param result_buffer_size the size of the results * @param result_buffer_size the size of the results
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct Libp2pMessage* message, int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct KademliaMessage* message,
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) {
struct Datastore* datastore = session->datastore; struct Datastore* datastore = session->datastore;
@ -359,7 +359,7 @@ int libp2p_routing_dht_handle_get_value(struct SessionContext* session, struct L
* @param result_buffer_size the size of the results * @param result_buffer_size the size of the results
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct Libp2pMessage* message, int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct KademliaMessage* message,
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) {
//TODO: implement this //TODO: implement this
return 0; return 0;
@ -375,7 +375,7 @@ int libp2p_routing_dht_handle_put_value(struct SessionContext* session, struct L
* @param result_buffer_size the size of the results * @param result_buffer_size the size of the results
* @returns true(1) on success, otherwise false(0) * @returns true(1) on success, otherwise false(0)
*/ */
int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct Libp2pMessage* message, int libp2p_routing_dht_handle_find_node(struct SessionContext* session, struct KademliaMessage* message,
struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) { struct Peerstore* peerstore, struct ProviderStore* providerstore, unsigned char** result_buffer, size_t *result_buffer_size) {
// look through peer store // look through peer store
struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, (unsigned char*)message->key, message->key_size); struct Libp2pPeer* peer = libp2p_peerstore_get_peer(peerstore, (unsigned char*)message->key, message->key_size);
@ -402,7 +402,7 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee
unsigned char* buffer = NULL, *result_buffer = NULL; unsigned char* buffer = NULL, *result_buffer = NULL;
size_t buffer_size = 0, result_buffer_size = 0; size_t buffer_size = 0, result_buffer_size = 0;
int retVal = 0; int retVal = 0;
struct Libp2pMessage* message = NULL; struct KademliaMessage* message = NULL;
// read from stream // read from stream
if (!session->default_stream->read(session, &buffer, &buffer_size, 5)) if (!session->default_stream->read(session, &buffer, &buffer_size, 5))

View file

@ -580,6 +580,7 @@ int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char*
read_this_time = 0; read_this_time = 0;
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
// TODO: use epoll or select to wait for socket to be writable // TODO: use epoll or select to wait for socket to be writable
libp2p_logger_debug("secio", "Attempted read, but got EAGAIN or EWOULDBLOCK. Code %d.\n", errno);
return 0; return 0;
} else { } else {
libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: %s\n", strerror(errno)); libp2p_logger_error("secio", "Error in libp2p_secio_unencrypted_read: %s\n", strerror(errno));
@ -587,16 +588,17 @@ int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char*
} }
} }
if (read == 0 && size[0] == 10) { if (read == 0 && size[0] == 10) {
// a spurious \n libp2p_logger_error("secio", "Spurrious newline found.\n");
// write over this value by not adding it
} else { } else {
left = left - read_this_time; left = left - read_this_time;
read += read_this_time; read += read_this_time;
} }
} while (left > 0); } while (left > 0);
buffer_size = ntohl(buffer_size); buffer_size = ntohl(buffer_size);
if (buffer_size == 0) if (buffer_size == 0) {
libp2p_logger_error("secio", "unencrypted read buffer size is 0.\n");
return 0; return 0;
}
// now read the number of bytes we've found, minus the 4 that we just read // now read the number of bytes we've found, minus the 4 that we just read
left = buffer_size; left = buffer_size;
@ -611,6 +613,7 @@ int libp2p_secio_unencrypted_read(struct SessionContext* session, unsigned char*
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) { if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
// TODO: use epoll or select to wait for socket to be writable // TODO: use epoll or select to wait for socket to be writable
} else { } else {
libp2p_logger_error("secio", "read from socket returned %d.\n", errno);
return 0; return 0;
} }
} }

View file

@ -208,8 +208,8 @@ int test_record_peer_protobuf() {
int test_record_message_protobuf() { int test_record_message_protobuf() {
int retVal = 0; int retVal = 0;
struct Libp2pPeer* closer_peer = NULL; struct Libp2pPeer* closer_peer = NULL;
struct Libp2pMessage* message = NULL; struct KademliaMessage* message = NULL;
struct Libp2pMessage* result = NULL; struct KademliaMessage* result = NULL;
struct MultiAddress *ma_result = NULL; struct MultiAddress *ma_result = NULL;
char* buffer = NULL; char* buffer = NULL;
size_t buffer_len = 0; size_t buffer_len = 0;