Handling details of bitswap

This commit is contained in:
jmjatlanta 2017-08-03 11:16:58 -05:00
parent e58909b875
commit d1d4d19fa8
8 changed files with 177 additions and 69 deletions

View file

@ -3,6 +3,7 @@
*/
#include <stdlib.h>
#include <unistd.h> // for sleep()
#include "libp2p/utils/logger.h"
#include "ipfs/core/ipfs_node.h"
#include "ipfs/exchange/exchange.h"
#include "ipfs/exchange/bitswap/bitswap.h"
@ -42,6 +43,7 @@ struct Exchange* ipfs_bitswap_new(struct IpfsNode* ipfs_node) {
// Start the threads for the network
ipfs_bitswap_engine_start(bitswapContext);
libp2p_logger_debug("bitswap", "Bitswap engine started\n");
}
return exchange;
}

View file

@ -1,4 +1,5 @@
#include <unistd.h>
#include "libp2p/utils/logger.h"
#include "ipfs/core/null.h"
#include "ipfs/exchange/bitswap/engine.h"
#include "ipfs/exchange/bitswap/wantlist_queue.h"
@ -57,22 +58,80 @@ void* ipfs_bitswap_engine_wantlist_processor_start(void* ctx) {
void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
struct BitswapContext* context = (struct BitswapContext*)ctx;
// the loop
while (!context->bitswap_engine->shutting_down) {
struct PeerRequest* item = ipfs_bitswap_peer_request_queue_pop(context->peerRequestQueue);
if (item != NULL) {
// did they send us something over the network?
unsigned char* buffer = NULL;
size_t buffer_len = 0;
if (item->peer->sessionContext->default_stream->read(item->peer->sessionContext, &buffer, &buffer_len, 1)) {
// handle it
ipfs_multistream_marshal(buffer, buffer_len, item->peer->sessionContext, context->ipfsNode);
}
struct Libp2pLinkedList* current = context->ipfsNode->peerstore->head_entry;
int did_some_processing = 0;
while (1) {
if (context->bitswap_engine->shutting_down) // system shutting down
break;
// if there is something on the queue process it.
ipfs_bitswap_peer_request_process_entry(context, item);
if (current == NULL) { // the PeerStore is empty
libp2p_logger_debug("bitswap_engine", "Peerstore is empty. Pausing.\n");
sleep(1);
continue;
}
if (current->item == NULL) {
// error
libp2p_logger_error("bitswap_engine", "Peerstore has a null entry.\n");
break;
}
// see if they want something
struct Libp2pPeer* current_peer_entry = ((struct PeerEntry*)current->item)->peer;
if (current_peer_entry == NULL) {
// error
libp2p_logger_error("bitswap_engine", "Peerstore has an item that is a null peer.\n");
break;
}
if (current_peer_entry->connection_type == CONNECTION_TYPE_CONNECTED) {
libp2p_logger_debug("bitswap_engine", "We're connected to this peer. Lets see if there is a message waiting for us.\n");
int retVal = current_peer_entry->sessionContext->default_stream->peek(current_peer_entry->sessionContext);
if (retVal < 0) {
libp2p_logger_debug("bitswap_engine", "We thought we were connected, but Peek reported an error.\n");
libp2p_peer_handle_connection_error(current_peer_entry);
} else if (retVal > 0) {
libp2p_logger_debug("bitswap_engine", "Something waiting on network for peer %s.\n", current_peer_entry->id);
unsigned char* buffer = NULL;
size_t buffer_len = 0;
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) {
// handle it
int retVal = ipfs_multistream_marshal(buffer, buffer_len, current_peer_entry->sessionContext, context->ipfsNode);
did_some_processing = 1;
if (retVal == -1) {
// there was a problem. Clean up
current_peer_entry->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
libp2p_session_context_free(current_peer_entry->sessionContext);
}
}
}
} else {
// if there is nothing on the queue, wait...
sleep(2);
if (current_peer_entry->is_local) {
//libp2p_logger_debug("bitswap_engine", "Local peer %s. Skipping.\n", current_peer_entry->id);
} else
libp2p_logger_debug("bitswap_engine", "We are not connected to this peer %s.\n", current_peer_entry->id);
}
// attempt to get queue and process
struct PeerRequestEntry* entry = ipfs_bitswap_peer_request_queue_find_entry(context->peerRequestQueue, current_peer_entry);
if (entry != NULL) {
libp2p_logger_debug("bitswap_engine", "Processing queue for peer %s.\n", current_peer_entry->id);
// we have a queue. Do some queue processing
struct PeerRequest* item = entry->current;
if (item != NULL) {
// if there is something on the queue process it.
if (ipfs_bitswap_peer_request_process_entry(context, item))
did_some_processing = 1;
}
}
// get next peer (or reset to head entry)
if (current->next == NULL) {
current = context->ipfsNode->peerstore->head_entry;
if (!did_some_processing) {
// we did nothing in this run through the peerstore. sleep for a sec
sleep(1);
}
did_some_processing = 0;
}
else {
libp2p_logger_debug("bitswap_engine", "Moving on to the next peer.\n");
current = current->next;
}
}
return NULL;

View file

@ -50,22 +50,30 @@ int ipfs_bitswap_network_send_message(const struct BitswapContext* context, stru
* @param cid the cid to remove
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_network_remove_cid_from_queue(struct Libp2pVector* collection, struct Cid* cid) {
int ipfs_bitswap_network_adjust_cid_queue(struct Libp2pVector* collection, struct Cid* cid, int cancel) {
if (collection == NULL || cid == NULL)
return 0;
for(int i = 0; i < collection->total; collection++) {
const struct CidEntry* current = (const struct CidEntry*)libp2p_utils_vector_get(collection, i);
if (ipfs_cid_compare(current->cid, cid) == 0) {
libp2p_utils_vector_delete(collection, i);
if (cancel)
libp2p_utils_vector_delete(collection, i);
return 1;
}
}
// not found. Add it if we're not cancelling
if (!cancel) {
struct CidEntry* cidEntry = ipfs_bitswap_peer_request_cid_entry_new();
cidEntry->cid = cid;
cidEntry->cancel = 0;
libp2p_utils_vector_add(collection, cidEntry);
}
return 0;
}
/***
* Handle a raw incoming bitswap message from the network
* @param node us
@ -133,14 +141,7 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc
ipfs_cid_free(cid);
return 0;
}
if (entry->cancel)
ipfs_bitswap_network_remove_cid_from_queue(queueEntry->current->cids_they_want, cid);
else {
struct CidEntry* cidEntry = ipfs_bitswap_peer_request_cid_entry_new();
cidEntry->cid = cid;
cidEntry->cancel = 0;
libp2p_utils_vector_add(queueEntry->current->cids_they_want, cidEntry);
}
ipfs_bitswap_network_adjust_cid_queue(queueEntry->current->cids_they_want, cid, entry->cancel);
}
}
return 1;

View file

@ -5,6 +5,7 @@
#include <stdlib.h>
#include "libp2p/conn/session.h"
#include "libp2p/utils/logger.h"
#include "ipfs/cid/cid.h"
#include "ipfs/exchange/bitswap/peer_request_queue.h"
#include "ipfs/exchange/bitswap/message.h"
@ -191,6 +192,19 @@ int ipfs_bitswap_peer_request_something_to_do(struct PeerRequestEntry* entry) {
return 1;
if (ipfs_bitswap_peer_request_cids_waiting(request->cids_they_want))
return 1;
// is there something waiting for us on the network?
if (request->peer->connection_type == CONNECTION_TYPE_CONNECTED) {
int retVal = request->peer->sessionContext->default_stream->peek(request->peer->sessionContext);
if (retVal < 0) {
libp2p_logger_debug("peer_request_queue", "Connection returned %d. Marking connection NOT CONNECTED.\n", retVal);
libp2p_peer_handle_connection_error(request->peer);
return 0;
}
if (retVal > 0) {
libp2p_logger_debug("peer_request_queue", "We have something to read. %d bytes.\n", retVal);
}
return retVal;
}
}
return 0;
}
@ -206,8 +220,8 @@ struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue*
pthread_mutex_lock(&queue->queue_mutex);
struct PeerRequestEntry* entry = queue->first;
if (entry != NULL) {
retVal = entry->current;
if (ipfs_bitswap_peer_request_something_to_do(entry)) {
retVal = entry->current;
// move to the end of the queue
if (queue->first->next != NULL) {
queue->first = queue->first->next;
@ -295,7 +309,7 @@ int ipfs_bitswap_peer_request_get_blocks_they_want(const struct BitswapContext*
* Handle a PeerRequest
* @param context the BitswapContext
* @param request the request to process
* @returns true(1) on succes, otherwise false(0)
* @returns true(1) if something was done, otherwise false(0)
*/
int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request) {
// determine if we have enough information to continue