More implementation of bitswap

This commit is contained in:
John Jones 2017-07-27 12:05:41 -05:00
parent e1135fef3b
commit 73d7d5daed
27 changed files with 193 additions and 87 deletions

View file

@ -7,7 +7,7 @@ endif
LFLAGS =
DEPS =
OBJS = bitswap.o message.o network.o peer_request_queue.o want_manager.o wantlist_queue.o
OBJS = bitswap.o message.o network.o peer_request_queue.o want_manager.o wantlist_queue.o engine.o
%.o: %.c $(DEPS)
$(CC) -c -o $@ $< $(CFLAGS)

View file

@ -14,7 +14,7 @@
* @param sessionContext the context
* @returns an allocated Exchange structure
*/
struct Exchange* ipfs_bitswap_exchange_start(struct IpfsNode* ipfs_node) {
struct Exchange* ipfs_bitswap_new(struct IpfsNode* ipfs_node) {
struct Exchange* exchange = (struct Exchange*) malloc(sizeof(struct Exchange));
if (exchange != NULL) {
struct BitswapContext* bitswapContext = (struct BitswapContext*) malloc(sizeof(struct BitswapContext));
@ -22,15 +22,26 @@ struct Exchange* ipfs_bitswap_exchange_start(struct IpfsNode* ipfs_node) {
free(exchange);
return NULL;
}
exchange->exchangeContext = (void*) bitswapContext;
bitswapContext->bitswap_engine = ipfs_bitswap_engine_new();
if (bitswapContext->bitswap_engine == NULL) {
free(bitswapContext);
free(exchange);
return NULL;
}
bitswapContext->localWantlist = NULL;
bitswapContext->peerRequestQueue = NULL;
bitswapContext->ipfsNode = ipfs_node;
exchange->exchangeContext = (void*) bitswapContext;
exchange->IsOnline = ipfs_bitswap_is_online;
exchange->Close = ipfs_bitswap_close;
exchange->HasBlock = ipfs_bitswap_has_block;
exchange->GetBlock = ipfs_bitswap_get_block;
exchange->GetBlocks = ipfs_bitswap_get_blocks;
// Start the threads for the network
ipfs_bitswap_engine_start(bitswapContext);
}
//TODO: Start the threads for the network
return exchange;
}

View file

@ -1,35 +1,63 @@
#include <unistd.h>
#include "ipfs/exchange/bitswap/engine.h"
#include "ipfs/exchange/bitswap/wantlist_queue.h"
#include "ipfs/exchange/bitswap/peer_request_queue.h"
/***
* A separate thread that processes the queue
* @param context the context
* Implementation of the bitswap engine
*/
void ipfs_bitswap_engine_wantlist_processor_start(void* ctx) {
struct BitswapContext* context = (struct BitswapContext*)ctx;
// the loop
while (!context->bitswap_engine->shutting_down) {
struct WantListQueueEntry* item = ipfs_bitswap_wantlist_queue_pop(context->localWantlist);
if (item != NULL) {
// if there is something on the queue process it.
ipfs_bitswap__wantlist_process_entry(context, item);
} else {
// if there is nothing on the queue, wait...
sleep(2);
}
/***
* Allocate resources for a BitswapEngine
* @returns a new struct BitswapEngine
*/
struct BitswapEngine* ipfs_bitswap_engine_new() {
struct BitswapEngine* engine = (struct BitswapEngine*) malloc(sizeof(struct BitswapEngine));
if (engine != NULL) {
engine->shutting_down = 0;
}
return engine;
}
/***
* Deallocate resources from struct BitswapEngine
* @param engine the engine to free
* @returns true(1)
*/
int ipfs_bitswap_engine_free(struct BitswapEngine* engine) {
free(engine);
return 1;
}
/***
* A separate thread that processes the queue
* @param context the context
*/
void ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
void* ipfs_bitswap_engine_wantlist_processor_start(void* ctx) {
struct BitswapContext* context = (struct BitswapContext*)ctx;
// the loop
while (!context->bitswap_engine->shutting_down) {
struct BitswapPeerQueueEntry* item = ipfs_bitswap_peer_request_queue_pop(context->peerRequestQueue);
struct WantListQueueEntry* item = ipfs_bitswap_wantlist_queue_pop(context->localWantlist);
if (item != NULL) {
// if there is something on the queue process it.
ipfs_bitswap_wantlist_process_entry(context, item);
} else {
// if there is nothing on the queue, wait...
sleep(2);
}
}
return NULL;
}
/***
* A separate thread that processes the queue
* @param context the context
*/
void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
struct BitswapContext* context = (struct BitswapContext*)ctx;
// the loop
while (!context->bitswap_engine->shutting_down) {
struct PeerRequest* item = ipfs_bitswap_peer_request_queue_pop(context->peerRequestQueue);
if (item != NULL) {
// if there is something on the queue process it.
ipfs_bitswap_peer_request_process_entry(context, item);
@ -38,6 +66,7 @@ void ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
sleep(2);
}
}
return NULL;
}
/**
@ -69,8 +98,8 @@ int ipfs_bitswap_engine_start(const struct BitswapContext* context) {
int ipfs_bitswap_engine_stop(const struct BitswapContext* context) {
context->bitswap_engine->shutting_down = 1;
int error1 = pthread_join(context->bitswap_engine->wantlist_processor_thread);
int error2 = pthread_join(context->bitswap_engine->peer_request_processor_thread);
int error1 = pthread_join(context->bitswap_engine->wantlist_processor_thread, NULL);
int error2 = pthread_join(context->bitswap_engine->peer_request_processor_thread, NULL);
return !error1 && !error2;
}

View file

@ -584,7 +584,7 @@ int ipfs_bitswap_message_protobuf_decode(unsigned char* buffer, size_t buffer_le
switch(field_no) {
case (1): {
// a Blocks entry that is just an array of bytes
struct Block* temp = ipfs_blocks_block_new();
struct Block* temp = ipfs_block_new();
if (!protobuf_decode_length_delimited(&buffer[pos], buffer_length - pos, (char**)&temp->data, &temp->data_length, &bytes_read)) {
return 0;
}

View file

@ -132,7 +132,7 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR
* @param queue the queue
* @returns the PeerRequest that should be handled next, or NULL if the queue is empty
*/
struct PeerRequest* ipfs_bitswap_peer_request_pop(struct PeerRequestQueue* queue) {
struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue* queue) {
struct PeerRequest* retVal = NULL;
if (queue != NULL) {
pthread_mutex_lock(&queue->queue_mutex);
@ -185,3 +185,15 @@ int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct
// add to the block array
return 0;
}
/****
* Handle a PeerRequest
* @param context the BitswapContext
* @param request the request to process
* @returns true(1) on succes, otherwise false(0)
*/
int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request) {
//TODO: Implement this method
return 0;
}

View file

@ -120,6 +120,26 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue
return NULL;
}
/***
* Pops the top one off the queue
*
* @param wantlist the list
* @returns the WantListQueueEntry
*/
struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_pop(struct WantListQueue* wantlist) {
struct WantListQueueEntry* entry = NULL;
if (wantlist->queue->total == 0)
return entry;
//TODO: This should be a linked list, not an array
pthread_mutex_lock(&wantlist->wantlist_mutex);
entry = (struct WantListQueueEntry*)libp2p_utils_vector_get(wantlist->queue, 0);
libp2p_utils_vector_delete(wantlist->queue, 0);
pthread_mutex_unlock(&wantlist->wantlist_mutex);
return entry;
}
/***
* Initialize a WantListQueueEntry
* @returns a new WantListQueueEntry