More bitswap implementation

This commit is contained in:
John Jones 2017-07-31 06:43:15 -05:00
parent b3bb857f3a
commit 059a3286c9
14 changed files with 194 additions and 24 deletions

View file

@ -638,5 +638,59 @@ int ipfs_bitswap_message_protobuf_decode(unsigned char* buffer, size_t buffer_le
return 1;
}
/****
* Add a vector of Cids to the bitswap message
* @param message the message
* @param cids a Libp2pVector of cids
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_bitswap_message_add_wantlist_items(struct BitswapMessage* message, struct Libp2pVector* cids) {
if (message->wantlist == NULL) {
message->wantlist = ipfs_bitswap_wantlist_new();
if (message->wantlist == NULL)
return 0;
}
if (message->wantlist->entries == NULL) {
message->wantlist->entries = libp2p_utils_vector_new(1);
if (message->wantlist->entries == NULL)
return 0;
}
for(int i = 0; i < cids->total; i++) {
const struct Cid* cid = (const struct Cid*)libp2p_utils_vector_get(cids, i);
struct WantlistEntry* entry = ipfs_bitswap_wantlist_entry_new();
if (!ipfs_cid_protobuf_encode(cid, entry->block, entry->block_size, &entry->block_size)) {
// TODO: we should do more than return a half-baked list
return 0;
}
entry->cancel = 0;
entry->priority = 1;
libp2p_utils_vector_add(message->wantlist->entries, entry);
}
return 1;
}
/***
* Add the blocks to the BitswapMessage
* @param message the message
* @param blocks the requested blocks
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_message_add_blocks(struct BitswapMessage* message, struct Libp2pVector* blocks) {
// bitswap 1.0 uses blocks, bitswap 1.1 uses payload
if (message == NULL)
return 0;
if (message->payload == NULL) {
message->payload = libp2p_utils_vector_new(blocks->total);
if (message->payload == NULL)
return 0;
}
for(int i = 0; i < blocks->total; i++) {
const struct Block* current = (const struct Block*) libp2p_utils_vector_get(blocks, i);
libp2p_utils_vector_add(message->payload, current);
}
return 1;
}

View file

@ -23,3 +23,14 @@ ipfs_bitswap_network_receive_message(struct BitswapContext* context) {
/**
* We want to pop something off the queue
*/
/****
* send a message to a particular peer
* @param context the BitswapContext
* @param peer the peer that is the recipient
* @param message the message to send
*/
int ipfs_bitswap_network_send_message(const struct BitswapContext* context, const struct Libp2pPeer* peer, const struct BitswapMessage* message) {
return 0;
}

View file

@ -7,6 +7,8 @@
#include "libp2p/conn/session.h"
#include "ipfs/cid/cid.h"
#include "ipfs/exchange/bitswap/peer_request_queue.h"
#include "ipfs/exchange/bitswap/message.h"
#include "ipfs/exchange/bitswap/network.h"
/**
* Allocate resources for a new PeerRequest
@ -17,7 +19,7 @@ struct PeerRequest* ipfs_bitswap_peer_request_new() {
if (request != NULL) {
request->cids = NULL;
request->blocks = NULL;
request->context = NULL;
request->peer = NULL;
}
return request;
}
@ -118,7 +120,7 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR
if (request != NULL) {
struct PeerRequestEntry* current = queue->first;
while (current != NULL) {
if (libp2p_session_context_compare(current->current->context, request->context) == 0)
if (libp2p_peer_compare(current->current->peer, request->peer) == 0)
return current;
current = current->next;
}
@ -193,7 +195,58 @@ int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct
* @returns true(1) on succes, otherwise false(0)
*/
int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request) {
//TODO: Implement this method
// determine if we're connected
int connected = request->peer == NULL || request->peer->connection_type == CONNECTION_TYPE_CONNECTED;
int need_to_connect = request->cids != NULL;
// determine if we need to connect
if (need_to_connect) {
if (!connected) {
// connect
connected = libp2p_peer_connect(&context->ipfsNode->identity->private_key, request->peer, 0);
}
if (connected) {
// build a message
struct BitswapMessage* msg = ipfs_bitswap_message_new();
// add requests
ipfs_bitswap_message_add_wantlist_items(msg, request->cids);
// add blocks
ipfs_bitswap_message_add_blocks(msg, request->blocks);
// send message
if (ipfs_bitswap_network_send_message(context, request->peer, msg))
return 1;
}
}
return 0;
}
/***
* Find a PeerRequest related to a peer. If one is not found, it is created.
*
* @param peer_request_queue the queue to look through
* @param peer the peer to look for
* @returns a PeerRequestEntry or NULL on error
*/
struct PeerRequest* ipfs_peer_request_queue_find_peer(struct PeerRequestQueue* queue, struct Libp2pPeer* peer) {
struct PeerRequestEntry* entry = queue->first;
while (entry != NULL) {
if (libp2p_peer_compare(entry->current->peer, peer) == 0) {
return entry->current;
}
}
entry = ipfs_bitswap_peer_request_entry_new();
entry->current->peer = peer;
entry->prior = queue->last;
queue->last = entry;
return entry->current;
}

View file

@ -231,15 +231,30 @@ int ipfs_bitswap_wantlist_get_block_locally(struct BitswapContext* context, stru
*
* This will ask the network for who has the file, using the router.
* It will then ask the specific nodes for the file. This method
* does not queue anything. It actually does the work.
* does not queue anything. It actually does the work. The remotes
* will queue the file, but we'll return before they respond.
*
* @param context the BitswapContext
* @param cid the id of the file
* @param block where to put the results
* @returns true(1) if found, false(0) otherwise
* @returns true(1) if we found some providers to ask, false(0) otherwise
*/
int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struct Cid* cid, struct Block** block) {
//TODO: Implement this workhorse of a method
int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struct Cid* cid) {
// find out who may have the file
struct Libp2pVector* providers = NULL;
if (context->ipfsNode->routing->FindProviders(context->ipfsNode->routing, cid->hash, cid->hash_length, &providers)) {
for(int i = 0; i < providers->total; i++) {
struct Libp2pPeer* current = (struct Libp2pPeer*) libp2p_utils_vector_get(providers, i);
// add this to their queue
struct PeerRequest* queueEntry = ipfs_peer_request_queue_find_peer(context->peerRequestQueue, current);
libp2p_utils_vector_add(queueEntry->cids, cid);
// process this queue
// NOTE: We need to ask the remotes via bitswap, and wait for a response before returning
// there will need to be some fancy stuff to know when we get it back so that this method
// can return with the block
ipfs_bitswap_peer_request_process_entry(context, queueEntry);
}
return 1;
}
return 0;
}
@ -257,7 +272,7 @@ int ipfs_bitswap_wantlist_process_entry(struct BitswapContext* context, struct W
return 0;
}
if (local_request && !have_local) {
if (!ipfs_bitswap_wantlist_get_block_remote(context, entry->cid, &entry->block)) {
if (!ipfs_bitswap_wantlist_get_block_remote(context, entry->cid)) {
// if we were unsuccessful in retrieving it, put it back in the queue?
// I don't think so. But I'm keeping this counter here until we have
// a final decision. Maybe lower the priority?