implementation of dht_send_message_nearest_x

This commit is contained in:
jmjatlanta 2017-09-18 06:32:37 -05:00
parent 96ed7bc511
commit a810f94757
6 changed files with 118 additions and 18 deletions

View file

@ -56,7 +56,7 @@ void libp2p_peer_free(struct Libp2pPeer* in);
* @param timeout number of seconds before giving up * @param timeout number of seconds before giving up
* @returns true(1) on success, false(0) if we could not connect * @returns true(1) on success, false(0) if we could not connect
*/ */
int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout); int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore* datastore, int timeout);
/*** /***
* Clean up a bad connection * Clean up a bad connection

View file

@ -36,13 +36,32 @@ int libp2p_routing_dht_handshake(struct SessionContext* context);
*/ */
int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore); int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Peerstore* peerstore, struct ProviderStore* providerstore);
/***
* Send a kademlia message
* NOTE: this call upgrades the stream to /ipfs/kad/1.0.0
* @param context the context
* @param message the message
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struct KademliaMessage* message);
/**
* Attempt to receive a kademlia message
* NOTE: This call assumes that a send_message was sent
* @param sessionContext the context
* @param result where to put the results
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_routing_dht_receive_message(struct SessionContext* sessionContext, struct KademliaMessage** result);
/** /**
* Used to send a message to the nearest x peers * Used to send a message to the nearest x peers
* *
* @param local_peer the local peer * @param private_key the private key of the local peer
* @param providerstore the collection of providers * @param peerstore the collection of peers
* @param datastore a connection to the datastore
* @param msg the message to send * @param msg the message to send
* @returns true(1) on success, false(0) otherwise * @returns true(1) if we sent to at least 1, false(0) otherwise
*/ */
int libp2p_routing_dht_send_message(struct Libp2pPeer* local_peer, struct ProviderStore* providerstore, struct KademliaMessage* msg); int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* private_key, struct Peerstore* peerstore,
struct Datastore* datastore, struct KademliaMessage* msg, int numToSend);

View file

@ -21,7 +21,7 @@ struct Libp2pProtocolHandler* libp2p_secio_build_protocol_handler(struct RsaPriv
* @param remote_requested the other side is who asked for the upgrade * @param remote_requested the other side is who asked for the upgrade
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_handshake(struct SessionContext* session, struct RsaPrivateKey* private_key, struct Peerstore* peerstore); int libp2p_secio_handshake(struct SessionContext* session, const struct RsaPrivateKey* private_key, struct Peerstore* peerstore);
/*** /***
* Initiates a secio handshake. Use this method when you want to initiate a secio * Initiates a secio handshake. Use this method when you want to initiate a secio
@ -31,7 +31,7 @@ int libp2p_secio_handshake(struct SessionContext* session, struct RsaPrivateKey*
* @param peer_store the peer store * @param peer_store the peer store
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_initiate_handshake(struct SessionContext* session_context, struct RsaPrivateKey* private_key, struct Peerstore* peer_store); int libp2p_secio_initiate_handshake(struct SessionContext* session_context, const struct RsaPrivateKey* private_key, struct Peerstore* peer_store);
/*** /***
* Send the protocol string to the remote stream * Send the protocol string to the remote stream

View file

@ -96,7 +96,7 @@ int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer) {
* @param peerstore if connection is successfull, will add peer to peerstore * @param peerstore if connection is successfull, will add peer to peerstore
* @returns true(1) on success, false(0) if we could not connect * @returns true(1) on success, false(0) if we could not connect
*/ */
int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) { int libp2p_peer_connect(const struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, struct Datastore *datastore, int timeout) {
libp2p_logger_debug("peer", "Attemping to connect to %s.\n", libp2p_peer_id_to_string(peer)); libp2p_logger_debug("peer", "Attemping to connect to %s.\n", libp2p_peer_id_to_string(peer));
time_t now, prev = time(NULL); time_t now, prev = time(NULL);
// find an appropriate address // find an appropriate address

View file

@ -485,13 +485,94 @@ int libp2p_routing_dht_handle_message(struct SessionContext* session, struct Pee
} }
/** /**
* Used to send a message to the nearest x peers * Attempt to receive a kademlia message
* * NOTE: This call assumes that a send_message was sent
* @param local_peer the local peer * @param sessionContext the context
* @param providerstore the collection of providers * @param result where to put the results
* @param msg the message to send
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_routing_dht_send_message(struct Libp2pPeer* local_peer, struct ProviderStore* providerstore, struct KademliaMessage* msg) { int libp2p_routing_dht_receive_message(struct SessionContext* sessionContext, struct KademliaMessage** result) {
return 0; uint8_t* results = NULL;
size_t results_size = 0;
if (!sessionContext->default_stream->read(sessionContext, &results, &results_size, 5)) {
libp2p_logger_error("online", "Attempted to read from Kademlia stream, but could not.\n");
goto exit;
}
// see if we can unprotobuf
if (!libp2p_message_protobuf_decode(results, results_size, result)) {
libp2p_logger_error("online", "Received kademlia response, but cannot decode it.\n");
goto exit;
}
exit:
return result != NULL;
}
/***
* Send a kademlia message
* NOTE: this call upgrades the stream to /ipfs/kad/1.0.0
* @param context the context
* @param message the message
* @returns true(1) on success, false(0) otherwise
*/
int libp2p_routing_dht_send_message(struct SessionContext* sessionContext, struct KademliaMessage* message) {
size_t protobuf_size = 0, retVal = 0;
unsigned char* protobuf = NULL;
protobuf_size = libp2p_message_protobuf_encode_size(message);
protobuf = (unsigned char*)malloc(protobuf_size);
libp2p_message_protobuf_encode(message, &protobuf[0], protobuf_size, &protobuf_size);
// upgrade to kademlia protocol
if (!libp2p_routing_dht_upgrade_stream(sessionContext)) {
libp2p_logger_error("dht_protocol", "send_message: Unable to upgrade to kademlia stream.\n");
goto exit;
}
// send the message
if (!sessionContext->default_stream->write(sessionContext, protobuf, protobuf_size)) {
libp2p_logger_error("dht_protocol", "send_message: Attempted to write to Kademlia stream, but could not.\n");
goto exit;
}
retVal = 1;
exit:
if (protobuf != NULL)
free(protobuf);
return retVal;
}
/**
* Used to send a message to the nearest x peers
*
* @param private_key the private key of the local peer
* @param peerstore the collection of peers
* @param datastore a connection to the datastore
* @param msg the message to send
* @returns true(1) if we sent to at least 1, false(0) otherwise
*/
int libp2p_routing_dht_send_message_nearest_x(const struct RsaPrivateKey* private_key, struct Peerstore* peerstore,
struct Datastore* datastore, struct KademliaMessage* msg, int numToSend) {
// TODO: Calculate "Nearest"
// but for now, grab x peers, and send to them
int numSent = 0;
struct Libp2pLinkedList* llpeer_entry = peerstore->head_entry;
while (llpeer_entry != NULL) {
struct PeerEntry* entry = llpeer_entry->item;
if (entry == NULL)
break;
struct Libp2pPeer* remote_peer = entry->peer;
// connect (if not connected)
if (libp2p_peer_connect(private_key, remote_peer, peerstore, datastore, 5)) {
// send message
if (libp2p_routing_dht_send_message(remote_peer->sessionContext, msg))
numSent++;
}
if (numSent >= numToSend)
break;
// grab next entry
llpeer_entry = llpeer_entry->next;
}
return numSent > 0;
} }

View file

@ -83,7 +83,7 @@ int libp2p_secio_shutdown(void* context) {
* @param peer_store the peer store * @param peer_store the peer store
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_initiate_handshake(struct SessionContext* session_context, struct RsaPrivateKey* private_key, struct Peerstore* peer_store) { int libp2p_secio_initiate_handshake(struct SessionContext* session_context, const struct RsaPrivateKey* private_key, struct Peerstore* peer_store) {
if (libp2p_secio_send_protocol(session_context) && libp2p_secio_receive_protocol(session_context)) { if (libp2p_secio_send_protocol(session_context) && libp2p_secio_receive_protocol(session_context)) {
return libp2p_secio_handshake(session_context, private_key, peer_store); return libp2p_secio_handshake(session_context, private_key, peer_store);
} }
@ -855,7 +855,7 @@ int libp2p_secio_encrypted_read(void* stream_context, unsigned char** bytes, siz
* @param peerstore the collection of peers * @param peerstore the collection of peers
* @returns true(1) on success, false(0) otherwise * @returns true(1) on success, false(0) otherwise
*/ */
int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPrivateKey* private_key, struct Peerstore* peerstore) { int libp2p_secio_handshake(struct SessionContext* local_session, const struct RsaPrivateKey* private_key, struct Peerstore* peerstore) {
int retVal = 0; int retVal = 0;
size_t results_size = 0, bytes_written = 0; size_t results_size = 0, bytes_written = 0;
unsigned char* propose_in_bytes = NULL; // the remote protobuf unsigned char* propose_in_bytes = NULL; // the remote protobuf