Beginnings of the multithreaded engine

This engine has 2 threads. One to process the request queue, the other
to gather up and build peer messages and send them.
This commit is contained in:
John Jones 2017-07-27 08:38:57 -05:00
parent 10aa932e08
commit e1135fef3b
12 changed files with 258 additions and 13 deletions

76
exchange/bitswap/engine.c Normal file
View file

@ -0,0 +1,76 @@
#include "ipfs/exchange/bitswap/engine.h"
#include "ipfs/exchange/bitswap/wantlist_queue.h"
#include "ipfs/exchange/bitswap/peer_request_queue.h"
/***
* A separate thread that processes the queue
* @param context the context
*/
void ipfs_bitswap_engine_wantlist_processor_start(void* ctx) {
struct BitswapContext* context = (struct BitswapContext*)ctx;
// the loop
while (!context->bitswap_engine->shutting_down) {
struct WantListQueueEntry* item = ipfs_bitswap_wantlist_queue_pop(context->localWantlist);
if (item != NULL) {
// if there is something on the queue process it.
ipfs_bitswap__wantlist_process_entry(context, item);
} else {
// if there is nothing on the queue, wait...
sleep(2);
}
}
}
/***
* A separate thread that processes the queue
* @param context the context
*/
void ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
struct BitswapContext* context = (struct BitswapContext*)ctx;
// the loop
while (!context->bitswap_engine->shutting_down) {
struct BitswapPeerQueueEntry* item = ipfs_bitswap_peer_request_queue_pop(context->peerRequestQueue);
if (item != NULL) {
// if there is something on the queue process it.
ipfs_bitswap_peer_request_process_entry(context, item);
} else {
// if there is nothing on the queue, wait...
sleep(2);
}
}
}
/**
* Starts the bitswap engine that processes queue items. There
* should only be one of these per ipfs instance.
*
* @param context the context
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_engine_start(const struct BitswapContext* context) {
context->bitswap_engine->shutting_down = 0;
// fire off the threads
if (pthread_create(&context->bitswap_engine->wantlist_processor_thread, NULL, ipfs_bitswap_engine_wantlist_processor_start, (void*)context)) {
return 0;
}
if (pthread_create(&context->bitswap_engine->peer_request_processor_thread, NULL, ipfs_bitswap_engine_peer_request_processor_start, (void*)context)) {
ipfs_bitswap_engine_stop(context);
return 0;
}
return 1;
}
/***
* Shut down the engine
* @param context the context
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_engine_stop(const struct BitswapContext* context) {
context->bitswap_engine->shutting_down = 1;
int error1 = pthread_join(context->bitswap_engine->wantlist_processor_thread);
int error2 = pthread_join(context->bitswap_engine->peer_request_processor_thread);
return !error1 && !error2;
}

View file

@ -14,8 +14,12 @@
/**
* We received a BitswapMessage from the network
*/
/*
ipfs_bitswap_network_receive_message(struct BitswapContext* context) {
}
*/
/**
* We want to pop something off the queue to send to a peer.
* This can be a wantlist or blocks
* We want to pop something off the queue
*/

View file

@ -4,6 +4,7 @@
*/
#include <stdlib.h>
#include "libp2p/conn/session.h"
#include "ipfs/cid/cid.h"
#include "ipfs/exchange/bitswap/peer_request_queue.h"
@ -14,8 +15,9 @@
struct PeerRequest* ipfs_bitswap_peer_request_new() {
struct PeerRequest* request = (struct PeerRequest*) malloc(sizeof(struct PeerRequest));
if (request != NULL) {
request->peer_id = 0;
request->cid = NULL;
request->cids = NULL;
request->blocks = NULL;
request->context = NULL;
}
return request;
}
@ -116,9 +118,9 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR
if (request != NULL) {
struct PeerRequestEntry* current = queue->first;
while (current != NULL) {
if (current->current->peer_id == request->peer_id && ipfs_cid_compare(request->cid, current->current->cid) == 0) {
if (libp2p_session_context_compare(current->current->context, request->context) == 0)
return current;
}
current = current->next;
}
}
return NULL;
@ -171,3 +173,15 @@ int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry) {
return 1;
}
/***
* Add a block to the appropriate peer's queue
* @param queue the queue
* @param who the session context that identifies the peer
* @param block the block
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct SessionContext* who, struct Block* block) {
// find the right entry
// add to the block array
return 0;
}

View file

@ -2,6 +2,11 @@
#include "libp2p/conn/session.h"
#include "libp2p/utils/vector.h"
#include "ipfs/exchange/bitswap/wantlist_queue.h"
#include "ipfs/exchange/bitswap/peer_request_queue.h"
/**
* Implementation of the WantlistQueue
*/
/**
* remove this session from the lists of sessions that are looking for this WantListQueueEntry
@ -126,6 +131,7 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_entry_new() {
entry->cid = NULL;
entry->priority = 0;
entry->sessionsRequesting = NULL;
entry->attempts = 0;
}
return entry;
}
@ -173,3 +179,86 @@ int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const
}
}
/**
* determine if any of the sessions are referring to the local node
* @param sessions a vector of WantlistSession
* @returns true(1) if any of the sessions are local, false otherwise
*/
int ipfs_bitswap_wantlist_local_request(struct Libp2pVector* sessions)
{
for(int i = 0; i < sessions->total; i++) {
struct WantListSession* curr = (struct WantListSession*) libp2p_utils_vector_get(sessions, i);
if (curr != NULL && curr->type == WANTLIST_SESSION_TYPE_LOCAL)
return 1;
}
return 0;
}
/***
* Attempt to retrieve a block from the local blockstore
*
* @param context the BitswapContext
* @param cid the id to look for
* @param block where to put the results
* @returns true(1) if found, false(0) otherwise
*/
int ipfs_bitswap_wantlist_get_block_locally(struct BitswapContext* context, struct Cid* cid, struct Block** block) {
return context->ipfsNode->blockstore->Get(context->ipfsNode->blockstore->blockstoreContext, cid, block);
}
/***
* Retrieve a block. The only information we have is the cid
*
* This will ask the network for who has the file, using the router.
* It will then ask the specific nodes for the file. This method
* does not queue anything. It actually does the work.
*
* @param context the BitswapContext
* @param cid the id of the file
* @param block where to put the results
* @returns true(1) if found, false(0) otherwise
*/
int ipfs_bitswap_wantlist_get_block_remote(struct BitswapContext* context, struct Cid* cid, struct Block** block) {
//TODO: Implement this workhorse of a method
return 0;
}
/**
* Called by the Bitswap engine, this processes an item on the WantListQueue
* @param context the context
* @param entry the WantListQueueEntry
* @returns true(1) on success, false(0) if not.
*/
int ipfs_bitswap_wantlist_process_entry(struct BitswapContext* context, struct WantListQueueEntry* entry) {
int local_request = ipfs_bitswap_wantlist_local_request(entry->sessionsRequesting);
int have_local = ipfs_bitswap_wantlist_get_block_locally(context, entry->cid, &entry->block);
// should we go get it?
if (!local_request && !have_local) {
return 0;
}
if (local_request && !have_local) {
if (!ipfs_bitswap_wantlist_get_block_remote(context, entry->cid, &entry->block)) {
// if we were unsuccessful in retrieving it, put it back in the queue?
// I don't think so. But I'm keeping this counter here until we have
// a final decision. Maybe lower the priority?
entry->attempts++;
return 0;
}
}
if (entry->block != NULL) {
// okay we have the block.
// fulfill the requests
for(size_t i = 0; i < entry->sessionsRequesting->total; i++) {
// TODO: Review this code.
struct WantListSession* session = (struct WantListSession*) libp2p_utils_vector_get(entry->sessionsRequesting, i);
if (session->type == WANTLIST_SESSION_TYPE_LOCAL) {
context->ipfsNode->exchange->HasBlock(context->ipfsNode->exchange, entry->block);
} else {
struct SessionContext* sessionContext = (struct SessionContext*) session->context;
ipfs_bitswap_peer_request_queue_fill(context->peerRequestQueue, sessionContext, entry->block);
}
}
}
return 0;
}

View file

@ -16,6 +16,9 @@ struct Blockstore {
struct BlockstoreContext* blockstoreContext;
int (*Delete)(const struct BlockstoreContext* context, struct Cid* cid);
int (*Has)(const struct BlockstoreContext* context, struct Cid* cid);
/**
* Retrieve a block from the blockstore
*/
int (*Get)(const struct BlockstoreContext* context, struct Cid* cid, struct Block** block);
int (*Put)(const struct BlockstoreContext* context, struct Block* block);
};

View file

@ -1,11 +1,12 @@
#pragma once
#include "libp2p/peer/peerstore.h"
#include "libp2p/peer/providerstore.h"
#include "ipfs/blocks/blockstore.h"
#include "ipfs/exchange/exchange.h"
#include "ipfs/repo/config/identity.h"
#include "ipfs/repo/fsrepo/fs_repo.h"
#include "ipfs/routing/routing.h"
#include "libp2p/peer/peerstore.h"
#include "libp2p/peer/providerstore.h"
enum NodeMode { MODE_OFFLINE, MODE_ONLINE };
@ -17,6 +18,7 @@ struct IpfsNode {
struct ProviderStore* providerstore;
struct IpfsRouting* routing;
struct Blockstore* blockstore;
struct Exchange* exchange;
//struct Pinner pinning; // an interface
//struct Mount** mounts;
// TODO: Add more here

View file

@ -7,10 +7,13 @@
#include "ipfs/core/ipfs_node.h"
#include "ipfs/exchange/exchange.h"
#include "ipfs/exchange/bitswap/engine.h"
struct BitswapContext {
struct IpfsNode* ipfsNode;
struct WantListQueue* localWantlist;
struct PeerRequestQueue* peerRequestQueue;
struct BitswapEngine* bitswap_engine;
};
/**

View file

@ -0,0 +1,29 @@
#pragma once
#include <pthread.h>
//#include "ipfs/exchange/bitswap/bitswap.h" we must forward declare here, as BitswapContext has a reference to BitswapEngine
struct BitswapContext;
struct BitswapEngine {
int shutting_down;
pthread_t wantlist_processor_thread;
pthread_t peer_request_processor_thread;
};
/**
* Starts the bitswap engine that processes queue items. There
* should only be one of these per ipfs instance.
*
* @param context the context
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_engine_start(const struct BitswapContext* context);
/***
* Shut down the engine
*
* @param context the context
* @returns true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_engine_stop(const struct BitswapContext* context);

View file

@ -1,13 +1,16 @@
/***
* A queue for requests from remote peers
* NOTE: This should handle multiple threads
* A queue for requests to/from remote peers
* NOTE: This must handle multiple threads
*/
#include <pthread.h>
#include "ipfs/blocks/block.h"
struct PeerRequest {
int peer_id;
struct Cid* cid;
pthread_mutex_t request_mutex;
struct SessionContext* context;
struct Libp2pVector* cids;
struct Libp2pVector* blocks;
};
struct PeerRequestEntry {
@ -37,6 +40,7 @@ int ipfs_bitswap_peer_request_free(struct PeerRequest* request);
/**
* Allocate resources for a new queue
* @returns a new PeerRequestQueue
*/
struct PeerRequestQueue* ipfs_bitswap_peer_request_queue_new();
@ -78,6 +82,15 @@ struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue*
*/
struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerRequestQueue* queue, struct PeerRequest* request);
/***
* Add a block to the appropriate peer's queue
* @param queue the queue
* @param who the session context that identifies the peer
* @param block the block
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct SessionContext* who, struct Block* block);
/***
* Allocate resources for a PeerRequestEntry struct
* @returns the allocated struct or NULL if there was a problem

View file

@ -22,7 +22,7 @@ struct WantListQueueEntry* ipfs_bitswap_want_manager_add(const struct BitswapCon
int ipfs_bitswap_want_manager_received(const struct BitswapContext* context, const struct Cid* cid);
/***
* retrieve a block from the WantManager. NOTE: a call to want_manager_received should be done first
* retrieve a block from the WantManager.
* @param context the context
* @param cid the Cid to get
* @param block a pointer to the block that will be allocated

View file

@ -8,6 +8,7 @@
#include <pthread.h>
#include "ipfs/cid/cid.h"
#include "ipfs/blocks/block.h"
#include "ipfs/exchange/bitswap/bitswap.h"
enum WantListSessionType { WANTLIST_SESSION_TYPE_LOCAL, WANTLIST_SESSION_TYPE_REMOTE };
@ -22,6 +23,7 @@ struct WantListQueueEntry {
// a vector of WantListSessions
struct Libp2pVector* sessionsRequesting;
struct Block* block;
int attempts;
};
struct WantListQueue {
@ -88,3 +90,10 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue
*/
int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const struct WantListSession* b);
/**
* Called by the Bitswap engine, this processes an item on the WantListQueue
* @param context the context
* @param entry the WantListQueueEntry
* @returns true(1) on success, false(0) if not.
*/
int ipfs_bitswap_wantlist_process_entry(struct BitswapContext* context, struct WantListQueueEntry* entry);

View file

@ -28,6 +28,8 @@ int test_bitswap_peer_request_queue_new() {
}
int test_bitswap_peer_request_queue_find() {
return 0;
/*
int retVal = 0;
struct PeerRequestQueue* queue = NULL;
struct PeerRequest* request1 = NULL;
@ -72,4 +74,5 @@ int test_bitswap_peer_request_queue_find() {
// clean up
ipfs_bitswap_peer_request_queue_free(queue);
return retVal;
*/
}