Adding bitswap request queue
This commit is contained in:
parent
2bb70b01be
commit
a63910e0d7
8 changed files with 314 additions and 1 deletions
24
cid/cid.c
24
cid/cid.c
|
@ -248,3 +248,27 @@ int ipfs_cid_cast(const unsigned char* incoming, size_t incoming_size, struct Ci
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compare two cids
|
||||||
|
* @param a side A
|
||||||
|
* @param b side B
|
||||||
|
* @returns < 0 if side A is greater, > 0 if side B is greater, or 0 if equal
|
||||||
|
*/
|
||||||
|
int ipfs_cid_compare(struct Cid* a, struct Cid* b) {
|
||||||
|
if (a->version != b->version) {
|
||||||
|
return b->version - a->version;
|
||||||
|
}
|
||||||
|
if (a->codec != b->codec) {
|
||||||
|
return ((int)b->codec - (int)a->codec);
|
||||||
|
}
|
||||||
|
if (a->hash_length != b->hash_length) {
|
||||||
|
return b->hash_length - a->hash_length;
|
||||||
|
}
|
||||||
|
for(size_t i = 0; i < a->hash_length; i++) {
|
||||||
|
if (a->hash[i] != b->hash[i]) {
|
||||||
|
return ((int)b->hash[i] - (int)a->hash[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ endif
|
||||||
|
|
||||||
LFLAGS =
|
LFLAGS =
|
||||||
DEPS =
|
DEPS =
|
||||||
OBJS = bitswap.o message.o network.o
|
OBJS = bitswap.o message.o network.o peer_request_queue.o
|
||||||
|
|
||||||
%.o: %.c $(DEPS)
|
%.o: %.c $(DEPS)
|
||||||
$(CC) -c -o $@ $< $(CFLAGS)
|
$(CC) -c -o $@ $< $(CFLAGS)
|
||||||
|
|
|
@ -70,6 +70,10 @@ int ipfs_bitswap_close(void* exchangeContext) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements the Exchange->HasBlock method
|
* Implements the Exchange->HasBlock method
|
||||||
|
* Some notes from the GO version say that this is normally called by user
|
||||||
|
* interaction (i.e. user added a file).
|
||||||
|
* But this does not make sense right now, as the GO code looks like it
|
||||||
|
* adds the block to the blockstore. This still has to be sorted.
|
||||||
*/
|
*/
|
||||||
int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block) {
|
int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block) {
|
||||||
//TODO: Implement this method
|
//TODO: Implement this method
|
||||||
|
@ -80,6 +84,8 @@ int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements the Exchange->GetBlock method
|
* Implements the Exchange->GetBlock method
|
||||||
|
* We're asking for this method to get the block from peers. Perhaps this should be
|
||||||
|
* taking in a pointer to a callback, as this could take a while (or fail).
|
||||||
*/
|
*/
|
||||||
int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block** block) {
|
int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block** block) {
|
||||||
// TODO: Implement this method
|
// TODO: Implement this method
|
||||||
|
|
165
exchange/bitswap/peer_request_queue.c
Normal file
165
exchange/bitswap/peer_request_queue.c
Normal file
|
@ -0,0 +1,165 @@
|
||||||
|
/***
|
||||||
|
* A queue for requests from remote peers
|
||||||
|
* NOTE: This should handle multiple threads
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include "ipfs/cid/cid.h"
|
||||||
|
#include "ipfs/exchange/bitswap/peer_request_queue.h"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate resources for a new PeerRequest
|
||||||
|
* @returns a new PeerRequest struct or NULL if there was a problem
|
||||||
|
*/
|
||||||
|
struct PeerRequest* ipfs_bitswap_peer_request_new() {
|
||||||
|
struct PeerRequest* request = (struct PeerRequest*) malloc(sizeof(struct PeerRequest));
|
||||||
|
if (request != NULL) {
|
||||||
|
request->peer_id = 0;
|
||||||
|
request->cid = NULL;
|
||||||
|
}
|
||||||
|
return request;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Free resources from a PeerRequest
|
||||||
|
* @param request the request to free
|
||||||
|
* @returns true(1)
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_free(struct PeerRequest* request) {
|
||||||
|
free(request);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate resources for a new queue
|
||||||
|
*/
|
||||||
|
struct PeerRequestQueue* ipfs_bitswap_peer_request_queue_new() {
|
||||||
|
struct PeerRequestQueue* queue = malloc(sizeof(struct PeerRequestQueue));
|
||||||
|
queue->first = NULL;
|
||||||
|
queue->last = NULL;
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Free all resources related to the queue
|
||||||
|
* @param queue the queue
|
||||||
|
* @returns true(1)
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_queue_free(struct PeerRequestQueue* queue) {
|
||||||
|
pthread_mutex_lock(&queue->queue_mutex);
|
||||||
|
struct PeerRequestEntry* current = queue->last;
|
||||||
|
while (current != NULL) {
|
||||||
|
ipfs_bitswap_peer_request_entry_free(current);
|
||||||
|
current = current->prior;
|
||||||
|
}
|
||||||
|
pthread_mutex_unlock(&queue->queue_mutex);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a peer request to the end of the queue
|
||||||
|
* @param queue the queue
|
||||||
|
* @param request the request
|
||||||
|
* @returns true(1) on success, otherwise false
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_queue_add(struct PeerRequestQueue* queue, struct PeerRequest* request) {
|
||||||
|
if (request != NULL) {
|
||||||
|
struct PeerRequestEntry* entry = ipfs_bitswap_peer_request_entry_new();
|
||||||
|
entry->current = request;
|
||||||
|
pthread_mutex_lock(&queue->queue_mutex);
|
||||||
|
entry->prior = queue->last;
|
||||||
|
queue->last = entry;
|
||||||
|
pthread_mutex_unlock(&queue->queue_mutex);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a peer request from the queue, no mather where it is
|
||||||
|
* @param queue the queue
|
||||||
|
* @param request the request
|
||||||
|
* @returns true(1) on success, otherwise false(0)
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_queue_remove(struct PeerRequestQueue* queue, struct PeerRequest* request) {
|
||||||
|
if (request != NULL) {
|
||||||
|
struct PeerRequestEntry* entry = ipfs_bitswap_peer_request_queue_find_entry(queue, request);
|
||||||
|
if (entry != NULL) {
|
||||||
|
pthread_mutex_lock(&queue->queue_mutex);
|
||||||
|
// remove the entry's link, and hook prior and next together
|
||||||
|
entry->prior->next = entry->next;
|
||||||
|
entry->prior = NULL;
|
||||||
|
entry->next = NULL;
|
||||||
|
ipfs_bitswap_peer_request_entry_free(entry);
|
||||||
|
pthread_mutex_unlock(&queue->queue_mutex);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds a PeerRequestEntry that contains the specified PeerRequest
|
||||||
|
* @param queue the queue to look through
|
||||||
|
* @param request what we're looking for
|
||||||
|
* @returns the PeerRequestEntry or NULL if not found
|
||||||
|
*/
|
||||||
|
struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerRequestQueue* queue, struct PeerRequest* request) {
|
||||||
|
if (request != NULL) {
|
||||||
|
struct PeerRequestEntry* current = queue->first;
|
||||||
|
while (current != NULL) {
|
||||||
|
if (current->current->peer_id == request->peer_id && ipfs_cid_compare(request->cid, current->current->cid) == 0) {
|
||||||
|
return current;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pull a PeerRequest off the queue
|
||||||
|
* @param queue the queue
|
||||||
|
* @returns the PeerRequest that should be handled next, or NULL if the queue is empty
|
||||||
|
*/
|
||||||
|
struct PeerRequest* ipfs_bitswap_peer_request_pop(struct PeerRequestQueue* queue) {
|
||||||
|
struct PeerRequest* retVal = NULL;
|
||||||
|
if (queue != NULL) {
|
||||||
|
pthread_mutex_lock(&queue->queue_mutex);
|
||||||
|
struct PeerRequestEntry* entry = queue->first;
|
||||||
|
retVal = entry->current;
|
||||||
|
queue->first = queue->first->next;
|
||||||
|
pthread_mutex_unlock(&queue->queue_mutex);
|
||||||
|
ipfs_bitswap_peer_request_entry_free(entry);
|
||||||
|
}
|
||||||
|
return retVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Allocate resources for a PeerRequestEntry struct
|
||||||
|
* @returns the allocated struct or NULL if there was a problem
|
||||||
|
*/
|
||||||
|
struct PeerRequestEntry* ipfs_bitswap_peer_request_entry_new() {
|
||||||
|
struct PeerRequestEntry* entry = (struct PeerRequestEntry*) malloc(sizeof(struct PeerRequestEntry));
|
||||||
|
if (entry != NULL) {
|
||||||
|
entry->current = NULL;
|
||||||
|
entry->next = NULL;
|
||||||
|
entry->prior = NULL;
|
||||||
|
}
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Frees resources allocated
|
||||||
|
* NOTE: This does not free the embedded PeerRequest (should it?)
|
||||||
|
* @param entry the PeerRequestEntry to free
|
||||||
|
* @returns true(1)
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry) {
|
||||||
|
entry->next = NULL;
|
||||||
|
entry->prior = NULL;
|
||||||
|
entry->current = NULL;
|
||||||
|
free(entry);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
|
@ -116,4 +116,12 @@ int ipfs_cid_set_len (struct CidSet *set);
|
||||||
unsigned char **ipfs_cid_set_keys (struct CidSet *set);
|
unsigned char **ipfs_cid_set_keys (struct CidSet *set);
|
||||||
int ipfs_cid_set_foreach (struct CidSet *set, int (*func)(struct Cid *));
|
int ipfs_cid_set_foreach (struct CidSet *set, int (*func)(struct Cid *));
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Compare two cids
|
||||||
|
* @param a side A
|
||||||
|
* @param b side B
|
||||||
|
* @returns < 0 if side A is greater, > 0 if side B is greater, or 0 if equal
|
||||||
|
*/
|
||||||
|
int ipfs_cid_compare(struct Cid* a, struct Cid* b);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
92
include/ipfs/exchange/bitswap/peer_request_queue.h
Normal file
92
include/ipfs/exchange/bitswap/peer_request_queue.h
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
/***
|
||||||
|
* A queue for requests from remote peers
|
||||||
|
* NOTE: This should handle multiple threads
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
struct PeerRequest {
|
||||||
|
int peer_id;
|
||||||
|
struct Cid* cid;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct PeerRequestEntry {
|
||||||
|
struct PeerRequestEntry* prior;
|
||||||
|
struct PeerRequest* current;
|
||||||
|
struct PeerRequestEntry* next;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct PeerRequestQueue {
|
||||||
|
pthread_mutex_t queue_mutex;
|
||||||
|
struct PeerRequestEntry* first;
|
||||||
|
struct PeerRequestEntry* last;
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate resources for a new PeerRequest
|
||||||
|
* @returns a new PeerRequest struct or NULL if there was a problem
|
||||||
|
*/
|
||||||
|
struct PeerRequest* ipfs_bitswap_peer_request_new();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Free resources from a PeerRequest
|
||||||
|
* @param request the request to free
|
||||||
|
* @returns true(1)
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_free(struct PeerRequest* request);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Allocate resources for a new queue
|
||||||
|
*/
|
||||||
|
struct PeerRequestQueue* ipfs_bitswap_peer_request_queue_new();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Free all resources related to the queue
|
||||||
|
* @param queue the queue
|
||||||
|
* @returns true(1)
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_queue_free(struct PeerRequestQueue* queue);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a peer request to the end of the queue
|
||||||
|
* @param queue the queue
|
||||||
|
* @param request the request
|
||||||
|
* @returns true(1) on success, otherwise false
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_queue_add(struct PeerRequestQueue* queue, struct PeerRequest* request);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a peer request from the queue, no mather where it is
|
||||||
|
* @param queue the queue
|
||||||
|
* @param request the request
|
||||||
|
* @returns true(1) on success, otherwise false(0)
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_quque_remove(struct PeerRequestQueue* queue, struct PeerRequest* request);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pull a PeerRequest off the queue
|
||||||
|
* @param queue the queue
|
||||||
|
* @returns the PeerRequest that should be handled next.
|
||||||
|
*/
|
||||||
|
struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue* queue);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds a PeerRequestEntry that contains the specified PeerRequest
|
||||||
|
* @param queue the queue
|
||||||
|
* @param request what we're looking for
|
||||||
|
* @returns the PeerRequestEntry or NULL if not found
|
||||||
|
*/
|
||||||
|
struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerRequestQueue* queue, struct PeerRequest* request);
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Allocate resources for a PeerRequestEntry struct
|
||||||
|
* @returns the allocated struct or NULL if there was a problem
|
||||||
|
*/
|
||||||
|
struct PeerRequestEntry* ipfs_bitswap_peer_request_entry_new();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Frees resources allocated
|
||||||
|
* @param entry the PeerRequestEntry to free
|
||||||
|
* @returns true(1)
|
||||||
|
*/
|
||||||
|
int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry);
|
15
test/exchange/test_bitswap_request_queue.h
Normal file
15
test/exchange/test_bitswap_request_queue.h
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include "ipfs/exchange/bitswap/peer_request_queue.h"
|
||||||
|
|
||||||
|
/***
|
||||||
|
* Create a queue, do some work, free the queue, make sure valgrind likes it.
|
||||||
|
*/
|
||||||
|
int test_bitswap_peer_request_queue_new() {
|
||||||
|
// create a queue
|
||||||
|
struct PeerRequestQueue* queue = ipfs_bitswap_peer_request_queue_new();
|
||||||
|
struct PeerRequest* request = ipfs_bitswap_peer_request_new();
|
||||||
|
ipfs_bitswap_peer_request_queue_add(queue, request);
|
||||||
|
// clean up
|
||||||
|
ipfs_bitswap_peer_request_queue_free(queue);
|
||||||
|
return 1;
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
#include "cid/test_cid.h"
|
#include "cid/test_cid.h"
|
||||||
#include "cmd/ipfs/test_init.h"
|
#include "cmd/ipfs/test_init.h"
|
||||||
#include "exchange/test_bitswap.h"
|
#include "exchange/test_bitswap.h"
|
||||||
|
#include "exchange/test_bitswap_request_queue.h"
|
||||||
#include "flatfs/test_flatfs.h"
|
#include "flatfs/test_flatfs.h"
|
||||||
#include "merkledag/test_merkledag.h"
|
#include "merkledag/test_merkledag.h"
|
||||||
#include "node/test_node.h"
|
#include "node/test_node.h"
|
||||||
|
@ -34,6 +35,7 @@ int testit(const char* name, int (*func)(void)) {
|
||||||
|
|
||||||
const char* names[] = {
|
const char* names[] = {
|
||||||
"test_bitswap_new_free",
|
"test_bitswap_new_free",
|
||||||
|
"test_bitswap_peer_request_queue_new",
|
||||||
"test_cid_new_free",
|
"test_cid_new_free",
|
||||||
"test_cid_cast_multihash",
|
"test_cid_cast_multihash",
|
||||||
"test_cid_cast_non_multihash",
|
"test_cid_cast_non_multihash",
|
||||||
|
@ -85,6 +87,7 @@ const char* names[] = {
|
||||||
|
|
||||||
int (*funcs[])(void) = {
|
int (*funcs[])(void) = {
|
||||||
test_bitswap_new_free,
|
test_bitswap_new_free,
|
||||||
|
test_bitswap_peer_request_queue_new,
|
||||||
test_cid_new_free,
|
test_cid_new_free,
|
||||||
test_cid_cast_multihash,
|
test_cid_cast_multihash,
|
||||||
test_cid_cast_non_multihash,
|
test_cid_cast_non_multihash,
|
||||||
|
|
Loading…
Reference in a new issue