code cleanup

yamux
jmjatlanta 2017-08-02 08:53:34 -05:00
parent 0638996684
commit 986d054c6c
4 changed files with 55 additions and 35 deletions

View File

@ -5,6 +5,7 @@
* For a somewhat accurate diagram of how this may work, @see https://github.com/ipfs/js-ipfs-bitswap
*/
#include "libp2p/utils/logger.h"
#include "ipfs/exchange/bitswap/network.h"
#include "ipfs/exchange/bitswap/peer_request_queue.h"
@ -104,23 +105,16 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc
ipfs_bitswap_message_free(message);
return 0;
}
struct Libp2pPeer* temp_peer = libp2p_peer_new();
temp_peer->id_size = strlen(sessionContext->remote_peer_id);
temp_peer->id = malloc(temp_peer->id_size);
if (temp_peer->id == NULL) {
libp2p_peer_free(temp_peer);
ipfs_bitswap_message_free(message);
return 0;
}
memcpy(temp_peer->id, sessionContext->remote_peer_id, strlen(sessionContext->remote_peer_id));
struct Libp2pPeer* peer = libp2p_peerstore_get_or_add_peer(node->peerstore, temp_peer);
libp2p_peer_free(temp_peer);
struct Libp2pPeer* peer = libp2p_peerstore_get_or_add_peer_by_id(node->peerstore, (unsigned char*)sessionContext->remote_peer_id, strlen(sessionContext->remote_peer_id));
if (peer == NULL) {
libp2p_logger_error("bitswap_network", "Unable to find or add peer %s of length %d to peerstore.\n", sessionContext->remote_peer_id, strlen(sessionContext->remote_peer_id));
ipfs_bitswap_message_free(message);
return 0;
}
// find the queue
struct PeerRequestEntry* queueEntry = ipfs_bitswap_peer_request_queue_find_entry(bitswapContext->peerRequestQueue, peer);
if (queueEntry == NULL) {
// add the queue
struct PeerRequest* peerRequest =ipfs_bitswap_peer_request_new();
peerRequest->peer = peer;
ipfs_bitswap_peer_request_queue_add(bitswapContext->peerRequestQueue, peerRequest);
@ -134,17 +128,18 @@ int ipfs_bitswap_network_handle_message(const struct IpfsNode* node, const struc
struct WantlistEntry* entry = (struct WantlistEntry*) libp2p_utils_vector_get(message->wantlist->entries, i);
// turn the "block" back into a cid
struct Cid* cid = NULL;
if (!ipfs_cid_protobuf_decode(entry->block, entry->block_size, &cid)) {
if (!ipfs_cid_protobuf_decode(entry->block, entry->block_size, &cid) || cid->hash_length == 0) {
libp2p_logger_error("bitswap_network", "Message had invalid CID\n");
ipfs_cid_free(cid);
return 0;
}
if (entry->cancel)
ipfs_bitswap_network_remove_cid_from_queue(queueEntry->current->cids_they_want, cid);
else {
struct CidEntry* entry = ipfs_bitswap_peer_request_cid_entry_new();
entry->cid = cid;
entry->cancel = 0;
libp2p_utils_vector_add(queueEntry->current->cids_they_want, entry);
struct CidEntry* cidEntry = ipfs_bitswap_peer_request_cid_entry_new();
cidEntry->cid = cid;
cidEntry->cancel = 0;
libp2p_utils_vector_add(queueEntry->current->cids_they_want, cidEntry);
}
}
}

View File

@ -160,6 +160,39 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR
return NULL;
}
/***
* Determine if any of the cids in the list are waiting to be filled
* @param cidEntries a Vector of CidEntry objects
* @returns true(1) if we have some waiting, false(0) otherwise
*/
int ipfs_bitswap_peer_request_cids_waiting(struct Libp2pVector* cidEntries) {
if (cidEntries == NULL)
return 0;
for(int i = 0; i < cidEntries->total; i++) {
const struct CidEntry* entry = (const struct CidEntry*)libp2p_utils_vector_get(cidEntries, i);
if (entry != NULL && !entry->cancel)
return 1;
}
return 0;
}
/***
* Determine if there is something to process in this request
* @param entry the entry to look at
* @returns true(1) if there is something to do
*/
int ipfs_bitswap_peer_request_something_to_do(struct PeerRequestEntry* entry) {
if (entry != NULL) {
struct PeerRequest* request = entry->current;
if (request->blocks_we_want_to_send->total > 0)
return 1;
if (request->cids_we_want->total > 0)
return 1;
if (ipfs_bitswap_peer_request_cids_waiting(request->cids_they_want))
return 1;
}
return 0;
}
/**
* Pull a PeerRequest off the queue
@ -171,13 +204,17 @@ struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue*
if (queue != NULL) {
pthread_mutex_lock(&queue->queue_mutex);
struct PeerRequestEntry* entry = queue->first;
if (entry != NULL) {
if (entry != NULL && ipfs_bitswap_peer_request_something_to_do(entry)) {
retVal = entry->current;
queue->first = queue->first->next;
}
pthread_mutex_unlock(&queue->queue_mutex);
// disable temporarily
// JMJ Debugging
/*
if (entry != NULL)
ipfs_bitswap_peer_request_entry_free(entry);
*/
}
return retVal;
}
@ -246,22 +283,6 @@ int ipfs_bitswap_peer_request_get_blocks_they_want(const struct BitswapContext*
return 0;
}
/***
* Determine if any of the cids in the list are waiting to be filled
* @param cidEntries a Vector of CidEntry objects
* @returns true(1) if we have some waiting, false(0) otherwise
*/
int ipfs_peer_request_cids_waiting(struct Libp2pVector* cidEntries) {
if (cidEntries == NULL)
return 0;
for(int i = 0; i < cidEntries->total; i++) {
const struct CidEntry* entry = (const struct CidEntry*)libp2p_utils_vector_get(cidEntries, i);
if (entry != NULL && !entry->cancel)
return 1;
}
return 0;
}
/****
* Handle a PeerRequest
* @param context the BitswapContext
@ -281,7 +302,7 @@ int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context
}
// determine if we're connected
int connected = request->peer->is_local || request->peer->connection_type == CONNECTION_TYPE_CONNECTED;
int need_to_connect = request->cids_we_want->total != 0 || ipfs_peer_request_cids_waiting(request->cids_they_want) || request->blocks_we_want_to_send->total != 0;
int need_to_connect = request->cids_we_want->total != 0 || ipfs_bitswap_peer_request_cids_waiting(request->cids_they_want) || request->blocks_we_want_to_send->total != 0;
// determine if we need to connect
if (need_to_connect) {

View File

@ -604,7 +604,7 @@ int test_routing_retrieve_large_file() {
stat(temp_file_name, &buf);
if (buf.st_size != 1000000) {
fprintf(stderr, "File size should be 1000000, but is %lu\n", buf.st_size);
fprintf(stderr, "File size should be 1000000, but is %lu\n", (unsigned long)buf.st_size);
goto exit;
}

View File

@ -306,6 +306,10 @@ static void thread_hold(int sig_id) {
}
#if defined(__APPLE__) && defined(__MACH__)
int pthread_setname_np(const char* name);
#endif
/* What each thread is doing
*
* In principle this is an endless loop. The only time this loop gets interuppted is once