From a810f94757885fff7bffdabddeeb8d9d731a6d97 Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Mon, 18 Sep 2017 06:32:37 -0500 Subject: [PATCH] implementation of dht_send_message_nearest_x --- include/libp2p/peer/peer.h | 2 +- include/libp2p/routing/dht_protocol.h | 29 ++++++-- include/libp2p/secio/secio.h | 4 +- peer/peer.c | 2 +- routing/dht_protocol.c | 95 +++++++++++++++++++++++++-- secio/secio.c | 4 +- 6 files changed, 118 insertions(+), 18 deletions(-) diff --git a/include/libp2p/peer/peer.h b/include/libp2p/peer/peer.h index bc92c60..2871e42 100644 --- a/include/libp2p/peer/peer.h +++ b/include/libp2p/peer/peer.h @@ -56,7 +56,7 @@ void libp2p_peer_free(struct Libp2pPeer* in); * @param timeout number of seconds before giving up * @returns true(1) on success, false(0) if we could not connect */ -int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout); +int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout); /*** * Clean up a bad connection diff --git a/include/libp2p/routing/dht_protocol.h b/include/libp2p/routing/dht_protocol.h index 3f54f85..c5868c0 100644 --- a/include/libp2p/routing/dht_protocol.h +++ b/include/libp2p/routing/dht_protocol.h @@ -36,13 +36,32 @@ int libp2p_routing_dht_handshake(struct SessionContext* context); */ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore); +/*** + * Send a kademlia message + * NOTE: this call upgrades the stream to /ipfs/kad/1.0.0 + * @param context the context + * @param message the message + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struct KademliaMessage* message); + +/** + * Attempt to receive a kademlia message + * NOTE: This call assumes that a send_message was sent + * @param sessionContext the context + * @param result where to put the results + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_routing_dht_receive_message(struct SessionContext* sessionContext, struct KademliaMessage** result); + /** * Used to send a message to the nearest x peers * - * @param local_peer the local peer - * @param providerstore the collection of providers + * @param private_key the private key of the local peer + * @param peerstore the collection of peers + * @param datastore a connection to the datastore * @param msg the message to send - * @returns true(1) on success, false(0) otherwise + * @returns true(1) if we sent to at least 1, false(0) otherwise */ -int libp2p_routing_dht_send_message(struct Libp2pPeer* local_peer, struct ProviderStore* providerstore, struct KademliaMessage* msg); - +int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* private_key, struct Peerstore* peerstore, + struct Datastore* datastore, struct KademliaMessage* msg, int numToSend); diff --git a/include/libp2p/secio/secio.h b/include/libp2p/secio/secio.h index 8e9a0dd..c61b8b8 100644 --- a/include/libp2p/secio/secio.h +++ b/include/libp2p/secio/secio.h @@ -21,7 +21,7 @@ struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPriv * @param remote_requested the other side is who asked for the upgrade * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_handshake(struct SessionContext* session, struct RsaPrivateKey* private_key, struct Peerstore* peerstore); +int libp2p_secio_handshake(struct SessionContext* session, const struct RsaPrivateKey* private_key, struct Peerstore* peerstore); /*** * Initiates a secio handshake. Use this method when you want to initiate a secio @@ -31,7 +31,7 @@ int libp2p_secio_handshake(struct SessionContext* session, struct RsaPrivateKey* * @param peer_store the peer store * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_initiate_handshake(struct SessionContext* session_context, struct RsaPrivateKey* private_key, struct Peerstore* peer_store); +int libp2p_secio_initiate_handshake(struct SessionContext* session_context, const struct RsaPrivateKey* private_key, struct Peerstore* peer_store); /*** * Send the protocol string to the remote stream diff --git a/peer/peer.c b/peer/peer.c index 6332248..1536f67 100644 --- a/peer/peer.c +++ b/peer/peer.c @@ -96,7 +96,7 @@ int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer) { * @param peerstore if connection is successfull, will add peer to peerstore * @returns true(1) on success, false(0) if we could not connect */ -int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) { +int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) { libp2p_logger_debug("peer", "Attemping to connect to %s.\n", libp2p_peer_id_to_string(peer)); time_t now, prev = time(NULL); // find an appropriate address diff --git a/routing/dht_protocol.c b/routing/dht_protocol.c index f4a4a9f..0766805 100644 --- a/routing/dht_protocol.c +++ b/routing/dht_protocol.c @@ -485,13 +485,94 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee } /** - * Used to send a message to the nearest x peers - * - * @param local_peer the local peer - * @param providerstore the collection of providers - * @param msg the message to send + * Attempt to receive a kademlia message + * NOTE: This call assumes that a send_message was sent + * @param sessionContext the context + * @param result where to put the results * @returns true(1) on success, false(0) otherwise */ -int libp2p_routing_dht_send_message(struct Libp2pPeer* local_peer, struct ProviderStore* providerstore, struct KademliaMessage* msg) { - return 0; +int libp2p_routing_dht_receive_message(struct SessionContext* sessionContext, struct KademliaMessage** result) { + uint8_t* results = NULL; + size_t results_size = 0; + + if (!sessionContext->default_stream->read(sessionContext, &results, &results_size, 5)) { + libp2p_logger_error("online", "Attempted to read from Kademlia stream, but could not.\n"); + goto exit; + } + + // see if we can unprotobuf + if (!libp2p_message_protobuf_decode(results, results_size, result)) { + libp2p_logger_error("online", "Received kademlia response, but cannot decode it.\n"); + goto exit; + } + exit: + return result != NULL; +} + +/*** + * Send a kademlia message + * NOTE: this call upgrades the stream to /ipfs/kad/1.0.0 + * @param context the context + * @param message the message + * @returns true(1) on success, false(0) otherwise + */ +int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struct KademliaMessage* message) { + size_t protobuf_size = 0, retVal = 0; + unsigned char* protobuf = NULL; + + protobuf_size = libp2p_message_protobuf_encode_size(message); + protobuf = (unsigned char*)malloc(protobuf_size); + libp2p_message_protobuf_encode(message, &protobuf[0], protobuf_size, &protobuf_size); + + + // upgrade to kademlia protocol + if (!libp2p_routing_dht_upgrade_stream(sessionContext)) { + libp2p_logger_error("dht_protocol", "send_message: Unable to upgrade to kademlia stream.\n"); + goto exit; + } + + // send the message + if (!sessionContext->default_stream->write(sessionContext, protobuf, protobuf_size)) { + libp2p_logger_error("dht_protocol", "send_message: Attempted to write to Kademlia stream, but could not.\n"); + goto exit; + } + retVal = 1; + exit: + if (protobuf != NULL) + free(protobuf); + return retVal; +} + +/** + * Used to send a message to the nearest x peers + * + * @param private_key the private key of the local peer + * @param peerstore the collection of peers + * @param datastore a connection to the datastore + * @param msg the message to send + * @returns true(1) if we sent to at least 1, false(0) otherwise + */ +int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* private_key, struct Peerstore* peerstore, + struct Datastore* datastore, struct KademliaMessage* msg, int numToSend) { + // TODO: Calculate "Nearest" + // but for now, grab x peers, and send to them + int numSent = 0; + struct Libp2pLinkedList* llpeer_entry = peerstore->head_entry; + while (llpeer_entry != NULL) { + struct PeerEntry* entry = llpeer_entry->item; + if (entry == NULL) + break; + struct Libp2pPeer* remote_peer = entry->peer; + // connect (if not connected) + if (libp2p_peer_connect(private_key, remote_peer, peerstore, datastore, 5)) { + // send message + if (libp2p_routing_dht_send_message(remote_peer->sessionContext, msg)) + numSent++; + } + if (numSent >= numToSend) + break; + // grab next entry + llpeer_entry = llpeer_entry->next; + } + return numSent > 0; } diff --git a/secio/secio.c b/secio/secio.c index 46e1eda..d5a4631 100644 --- a/secio/secio.c +++ b/secio/secio.c @@ -83,7 +83,7 @@ int libp2p_secio_shutdown(void* context) { * @param peer_store the peer store * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_initiate_handshake(struct SessionContext* session_context, struct RsaPrivateKey* private_key, struct Peerstore* peer_store) { +int libp2p_secio_initiate_handshake(struct SessionContext* session_context, const struct RsaPrivateKey* private_key, struct Peerstore* peer_store) { if (libp2p_secio_send_protocol(session_context) && libp2p_secio_receive_protocol(session_context)) { return libp2p_secio_handshake(session_context, private_key, peer_store); } @@ -855,7 +855,7 @@ int libp2p_secio_encrypted_read(void* stream_context, unsigned char** bytes, siz * @param peerstore the collection of peers * @returns true(1) on success, false(0) otherwise */ -int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPrivateKey* private_key, struct Peerstore* peerstore) { +int libp2p_secio_handshake(struct SessionContext* local_session, const struct RsaPrivateKey* private_key, struct Peerstore* peerstore) { int retVal = 0; size_t results_size = 0, bytes_written = 0; unsigned char* propose_in_bytes = NULL; // the remote protobuf