diff --git a/include/libp2p/net/stream.h b/include/libp2p/net/stream.h index 93c92a9..60da8c8 100644 --- a/include/libp2p/net/stream.h +++ b/include/libp2p/net/stream.h @@ -37,4 +37,12 @@ struct Stream { * @returns true(1) on success, otherwise false(0) */ int (*close)(void* stream_context); + + /*** + * Checks to see if something is waiting on the stream + * + * @param stream the stream context + * @returns true(1) if something is waiting, false(0) otherwise + */ + int (*peek)(void* stream_context); }; diff --git a/include/libp2p/peer/peer.h b/include/libp2p/peer/peer.h index 310e0f5..fd39993 100644 --- a/include/libp2p/peer/peer.h +++ b/include/libp2p/peer/peer.h @@ -57,6 +57,13 @@ void libp2p_peer_free(struct Libp2pPeer* in); */ int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, int timeout); +/*** + * Clean up a bad connection + * @param peer the peer to clean up + * @returns true(1) + */ +int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer); + /** * Make a copy of a peer * @param in what is to be copied diff --git a/net/multistream.c b/net/multistream.c index 6dd6c49..b061e60 100644 --- a/net/multistream.c +++ b/net/multistream.c @@ -5,6 +5,7 @@ #include #include #include +#include #include "libp2p/net/p2pnet.h" #include "libp2p/record/message.h" #include "libp2p/secio/secio.h" @@ -38,6 +39,32 @@ int libp2p_net_multistream_close(void* stream_context) { return 1; } +/*** + * Check the stream to see if there is something to read + * @param stream_context a SessionContext + * @returns number of bytes to be read, or -1 if there was an error + */ +int libp2p_net_multistream_peek(void* stream_context) { + if (stream_context == NULL) + return -1; + + struct SessionContext* session_context = (struct SessionContext*)stream_context; + struct Stream* stream = session_context->default_stream; + if (stream == NULL) + return -1; + + int socket_fd = *((int*)stream->socket_descriptor); + if (socket_fd < 0) + return -1; + + int bytes = 0; + if (ioctl(socket_fd, FIONREAD, &bytes) < 0) { + // Ooff, we're having problems. Don't use this socket again. + return -1; + } + return bytes; +} + /** * Write to an open multistream host * @param stream_context the session context @@ -334,6 +361,7 @@ struct Stream* libp2p_net_multistream_stream_new(int socket_fd, const char* ip, out->close = libp2p_net_multistream_close; out->read = libp2p_net_multistream_read; out->write = libp2p_net_multistream_write; + out->peek = libp2p_net_multistream_peek; char str[strlen(ip) + 50]; sprintf(str, "/ip4/%s/tcp/%d", ip, port); out->address = multiaddress_new_from_string(str); diff --git a/peer/peer.c b/peer/peer.c index 4e41228..aa87d49 100644 --- a/peer/peer.c +++ b/peer/peer.c @@ -75,6 +75,18 @@ void libp2p_peer_free(struct Libp2pPeer* in) { } } +/*** + * Clean up a bad connection + * @param peer the peer to clean up + * @returns true(1) + */ +int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer) { + peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED; + libp2p_session_context_free(peer->sessionContext); + peer->sessionContext = NULL; + return 1; +} + /** * Attempt to connect to the peer, setting connection_type correctly * NOTE: If successful, this will set peer->connection to the stream diff --git a/peer/peerstore.c b/peer/peerstore.c index 8161c4e..66a58af 100644 --- a/peer/peerstore.c +++ b/peer/peerstore.c @@ -132,11 +132,14 @@ int libp2p_peerstore_add_peer(struct Peerstore* peerstore, const struct Libp2pPe } struct PeerEntry* peer_entry = libp2p_peer_entry_new(); if (peer_entry == NULL) { + libp2p_logger_error("peerstore", "Unable to allocate memory for new PeerEntry.\n"); return 0; } peer_entry->peer = libp2p_peer_copy(peer); - if (peer_entry->peer == NULL) + if (peer_entry->peer == NULL) { + libp2p_logger_error("peerstore", "Could not copy peer for PeerEntry.\n"); return 0; + } retVal = libp2p_peerstore_add_peer_entry(peerstore, peer_entry); libp2p_logger_debug("peerstore", "Adding peer %s to peerstore was a success\n", peer->id); } diff --git a/secio/secio.c b/secio/secio.c index 75e6818..de69993 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -782,10 +782,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva struct StretchedKey* k1 = NULL, *k2 = NULL; struct PrivateKey* priv = NULL; struct PublicKey pub_key = {0}; - struct Libp2pPeer* remote_peer = libp2p_peer_new(); - - remote_peer->sessionContext = local_session; - remote_peer->connection_type = CONNECTION_TYPE_CONNECTED; + struct Libp2pPeer* remote_peer = NULL; //TODO: make sure we're not talking to ourself @@ -884,14 +881,32 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva // generate their peer id libp2p_crypto_public_key_to_peer_id(public_key, &local_session->remote_peer_id); - // put peer information in Libp2pPeer struct - remote_peer->id_size = strlen(local_session->remote_peer_id); - if (remote_peer->id_size > 0) { - remote_peer->id = malloc(remote_peer->id_size + 1); - if (remote_peer->id != NULL) { - memcpy(remote_peer->id, local_session->remote_peer_id, remote_peer->id_size); - remote_peer->id[remote_peer->id_size] = 0; + // see if we already have this peer + int new_peer = 0; + remote_peer = libp2p_peerstore_get_peer(peerstore, (unsigned char*)local_session->remote_peer_id, strlen(local_session->remote_peer_id)); + if (remote_peer == NULL) { + remote_peer = libp2p_peer_new(); + new_peer = 1; + // put peer information in Libp2pPeer struct + remote_peer->id_size = strlen(local_session->remote_peer_id); + if (remote_peer->id_size > 0) { + remote_peer->id = malloc(remote_peer->id_size + 1); + if (remote_peer->id != NULL) { + memcpy(remote_peer->id, local_session->remote_peer_id, remote_peer->id_size); + remote_peer->id[remote_peer->id_size] = 0; + } } + } else { + libp2p_logger_debug("secio", "Same remote connected. Replacing SessionContext.\n"); + // clean up old session context + libp2p_session_context_free(remote_peer->sessionContext); + } + remote_peer->sessionContext = local_session; + remote_peer->connection_type = CONNECTION_TYPE_CONNECTED; + + if (new_peer) { + libp2p_logger_debug("secio", "New connection. Adding Peer to Peerstore.\n"); + libp2p_peerstore_add_peer(peerstore, remote_peer); } // negotiate encryption parameters NOTE: SelectBest must match, otherwise this won't work @@ -1086,9 +1101,6 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva if (retVal == 1) { libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake success!\n"); - // add this to the peerstore - if (peerstore != NULL) - libp2p_peerstore_add_peer(peerstore, remote_peer); } else { libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake returning false\n"); libp2p_peer_free(remote_peer); diff --git a/utils/logger.c b/utils/logger.c index 69295a8..5eaa9fd 100644 --- a/utils/logger.c +++ b/utils/logger.c @@ -75,9 +75,12 @@ void libp2p_logger_log(const char* area, int log_level, const char* format, ...) libp2p_logger_init(); if (log_level <= CURRENT_LOGLEVEL) { if (libp2p_logger_watching_class(area)) { + int new_format_size = strlen(format) + strlen(area) + 10; + char new_format[new_format_size]; + sprintf(&new_format[0], "[%s] %s", area, format); va_list argptr; va_start(argptr, format); - vfprintf(stderr, format, argptr); + vfprintf(stderr, new_format, argptr); va_end(argptr); } }