Connecting bitswap and blockstore

This commit is contained in:
John Jones 2017-07-24 14:56:30 -05:00
parent 4368e052e2
commit 9924d5dcf7
12 changed files with 192 additions and 33 deletions

View file

@ -4,16 +4,54 @@
#include "libp2p/crypto/encoding/base32.h" #include "libp2p/crypto/encoding/base32.h"
#include "ipfs/cid/cid.h" #include "ipfs/cid/cid.h"
#include "ipfs/blocks/block.h" #include "ipfs/blocks/block.h"
#include "ipfs/blocks/blockstore.h"
#include "ipfs/datastore/ds_helper.h" #include "ipfs/datastore/ds_helper.h"
#include "ipfs/repo/fsrepo/fs_repo.h" #include "ipfs/repo/fsrepo/fs_repo.h"
#include "libp2p/os/utils.h" #include "libp2p/os/utils.h"
/***
* Create a new Blockstore struct
* @param fs_repo the FSRepo to use
* @returns the new Blockstore struct, or NULL if there was a problem.
*/
struct Blockstore* ipfs_blockstore_new(const struct FSRepo* fs_repo) {
struct Blockstore* blockstore = (struct Blockstore*) malloc(sizeof(struct Blockstore));
if(blockstore != NULL) {
blockstore->blockstoreContext = (struct BlockstoreContext*) malloc(sizeof(struct BlockstoreContext));
if (blockstore->blockstoreContext == NULL) {
free(blockstore);
return NULL;
}
blockstore->blockstoreContext->fs_repo = fs_repo;
blockstore->Delete = ipfs_blockstore_delete;
blockstore->Get = ipfs_blockstore_get;
blockstore->Has = ipfs_blockstore_has;
blockstore->Put = ipfs_blockstore_put;
}
return blockstore;
}
/**
* Release resources of a Blockstore struct
* @param blockstore the struct to free
* @returns true(1)
*/
int ipfs_blockstore_free(struct Blockstore* blockstore) {
if (blockstore != NULL) {
if (blockstore->blockstoreContext != NULL)
free(blockstore->blockstoreContext);
free(blockstore);
}
return 1;
}
/** /**
* Delete a block based on its Cid * Delete a block based on its Cid
* @param cid the Cid to look for * @param cid the Cid to look for
* @param returns true(1) on success * @param returns true(1) on success
*/ */
int ipfs_blockstore_delete(struct Cid* cid, struct FSRepo* fs_repo) { int ipfs_blockstore_delete(const struct BlockstoreContext* context, struct Cid* cid) {
return 0; return 0;
} }
@ -22,7 +60,7 @@ int ipfs_blockstore_delete(struct Cid* cid, struct FSRepo* fs_repo) {
* @param cid the Cid to look for * @param cid the Cid to look for
* @returns true(1) if found * @returns true(1) if found
*/ */
int ipfs_blockstore_has(struct Cid* cid, struct FSRepo* fs_repo) { int ipfs_blockstore_has(const struct BlockstoreContext* context, struct Cid* cid) {
return 0; return 0;
} }
@ -74,11 +112,11 @@ char* ipfs_blockstore_path_get(const struct FSRepo* fs_repo, const char* filenam
* @param block where to put the data to be returned * @param block where to put the data to be returned
* @returns true(1) on success * @returns true(1) on success
*/ */
int ipfs_blockstore_get(const unsigned char* hash, size_t hash_size, struct Block** block, const struct FSRepo* fs_repo) { int ipfs_blockstore_get(const struct BlockstoreContext* context, struct Cid* cid, struct Block** block) {
// get datastore key, which is a base32 key of the multihash // get datastore key, which is a base32 key of the multihash
unsigned char* key = ipfs_blockstore_hash_to_base32(hash, hash_size); unsigned char* key = ipfs_blockstore_hash_to_base32(cid->hash, cid->hash_length);
char* filename = ipfs_blockstore_path_get(fs_repo, (char*)key); char* filename = ipfs_blockstore_path_get(context->fs_repo, (char*)key);
size_t file_size = os_utils_file_size(filename); size_t file_size = os_utils_file_size(filename);
unsigned char buffer[file_size]; unsigned char buffer[file_size];
@ -89,6 +127,8 @@ int ipfs_blockstore_get(const unsigned char* hash, size_t hash_size, struct Bloc
int retVal = ipfs_blocks_block_protobuf_decode(buffer, bytes_read, block); int retVal = ipfs_blocks_block_protobuf_decode(buffer, bytes_read, block);
(*block)->cid = cid;
free(key); free(key);
free(filename); free(filename);
@ -100,7 +140,7 @@ int ipfs_blockstore_get(const unsigned char* hash, size_t hash_size, struct Bloc
* @param block the block to store * @param block the block to store
* @returns true(1) on success * @returns true(1) on success
*/ */
int ipfs_blockstore_put(struct Block* block, struct FSRepo* fs_repo) { int ipfs_blockstore_put(const struct BlockstoreContext* context, struct Block* block) {
// from blockstore.go line 118 // from blockstore.go line 118
int retVal = 0; int retVal = 0;
@ -123,7 +163,7 @@ int ipfs_blockstore_put(struct Block* block, struct FSRepo* fs_repo) {
} }
// now write byte array to file // now write byte array to file
char* filename = ipfs_blockstore_path_get(fs_repo, (char*)key); char* filename = ipfs_blockstore_path_get(context->fs_repo, (char*)key);
if (filename == NULL) { if (filename == NULL) {
free(key); free(key);
return 0; return 0;

View file

@ -98,7 +98,7 @@ int ipfs_cid_protobuf_decode(unsigned char* buffer, size_t buffer_length, struct
* @param cid where to put the results * @param cid where to put the results
* @returns true(1) on success * @returns true(1) on success
*/ */
int ipfs_cid_new(int version, unsigned char* hash, size_t hash_length, const char codec, struct Cid** ptrToCid) { int ipfs_cid_new(int version, const unsigned char* hash, size_t hash_length, const char codec, struct Cid** ptrToCid) {
// allocate memory // allocate memory
*ptrToCid = (struct Cid*)malloc(sizeof(struct Cid)); *ptrToCid = (struct Cid*)malloc(sizeof(struct Cid));
struct Cid* cid = *ptrToCid; struct Cid* cid = *ptrToCid;

View file

@ -40,6 +40,7 @@ int ipfs_node_online_new(const char* repo_path, struct IpfsNode** node) {
local_node->identity = fs_repo->config->identity; local_node->identity = fs_repo->config->identity;
local_node->peerstore = libp2p_peerstore_new(local_node->identity->peer_id); local_node->peerstore = libp2p_peerstore_new(local_node->identity->peer_id);
local_node->providerstore = libp2p_providerstore_new(); local_node->providerstore = libp2p_providerstore_new();
local_node->blockstore = ipfs_blockstore_new(fs_repo);
local_node->mode = MODE_OFFLINE; local_node->mode = MODE_OFFLINE;
local_node->routing = ipfs_routing_new_online(local_node, &fs_repo->config->identity->private_key, NULL); local_node->routing = ipfs_routing_new_online(local_node, &fs_repo->config->identity->private_key, NULL);

View file

@ -2,6 +2,7 @@
* Methods for the Bitswap exchange * Methods for the Bitswap exchange
*/ */
#include <stdlib.h> #include <stdlib.h>
#include "ipfs/core/ipfs_node.h"
#include "ipfs/exchange/exchange.h" #include "ipfs/exchange/exchange.h"
#include "ipfs/exchange/bitswap/bitswap.h" #include "ipfs/exchange/bitswap/bitswap.h"
#include "ipfs/exchange/bitswap/message.h" #include "ipfs/exchange/bitswap/message.h"
@ -11,7 +12,7 @@
* @param sessionContext the context * @param sessionContext the context
* @returns an allocated Exchange structure * @returns an allocated Exchange structure
*/ */
struct Exchange* ipfs_bitswap_new(struct SessionContext* sessionContext) { struct Exchange* ipfs_bitswap_exchange_start(struct SessionContext* sessionContext, struct IpfsNode* ipfs_node) {
struct Exchange* exchange = (struct Exchange*) malloc(sizeof(struct Exchange)); struct Exchange* exchange = (struct Exchange*) malloc(sizeof(struct Exchange));
if (exchange != NULL) { if (exchange != NULL) {
struct BitswapContext* bitswapContext = (struct BitswapContext*) malloc(sizeof(struct BitswapContext)); struct BitswapContext* bitswapContext = (struct BitswapContext*) malloc(sizeof(struct BitswapContext));
@ -21,13 +22,14 @@ struct Exchange* ipfs_bitswap_new(struct SessionContext* sessionContext) {
} }
exchange->exchangeContext = (void*) bitswapContext; exchange->exchangeContext = (void*) bitswapContext;
bitswapContext->sessionContext = sessionContext; bitswapContext->sessionContext = sessionContext;
//TODO: fill in the exchangeContext bitswapContext->ipfsNode = ipfs_node;
exchange->IsOnline = ipfs_bitswap_is_online; exchange->IsOnline = ipfs_bitswap_is_online;
exchange->Close = ipfs_bitswap_close; exchange->Close = ipfs_bitswap_close;
exchange->HasBlock = ipfs_bitswap_has_block; exchange->HasBlock = ipfs_bitswap_has_block;
exchange->GetBlock = ipfs_bitswap_get_block; exchange->GetBlock = ipfs_bitswap_get_block;
exchange->GetBlocks = ipfs_bitswap_get_blocks; exchange->GetBlocks = ipfs_bitswap_get_blocks;
} }
//TODO: Start the threads for the network
return exchange; return exchange;
} }
@ -39,6 +41,7 @@ struct Exchange* ipfs_bitswap_new(struct SessionContext* sessionContext) {
int ipfs_bitswap_free(struct Exchange* exchange) { int ipfs_bitswap_free(struct Exchange* exchange) {
if (exchange != NULL) { if (exchange != NULL) {
if (exchange->exchangeContext != NULL) { if (exchange->exchangeContext != NULL) {
ipfs_bitswap_close(exchange->exchangeContext);
free(exchange->exchangeContext); free(exchange->exchangeContext);
} }
free(exchange); free(exchange);
@ -88,7 +91,13 @@ int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block) {
* taking in a pointer to a callback, as this could take a while (or fail). * taking in a pointer to a callback, as this could take a while (or fail).
*/ */
int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block** block) { int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block** block) {
// TODO: Implement this method struct BitswapContext* bitswapContext = (struct BitswapContext*)exchangeContext;
if (bitswapContext != NULL) {
// check locally first
if (bitswapContext->ipfsNode->blockstore->Get(bitswapContext->ipfsNode->blockstore->blockstoreContext, cid, block))
return 1;
// now ask the network
}
return 0; return 0;
} }
@ -99,16 +108,3 @@ int ipfs_bitswap_get_blocks(void* exchangeContext, struct Libp2pVector* Cids, st
// TODO: Implement this method // TODO: Implement this method
return 0; return 0;
} }
/***
* Receive a BitswapMessage from a peer.
* @param exchangeContext the context
* @param peer_id the origin
* @param peer_id_size the size of the peer_id
* @param message the message
* @returns true(1) on success, otherwise false(0)
*/
int ipfs_bitswap_receive_message(void* exchangeContext, unsigned char* peer_id, int peer_id_size, struct BitswapMessage* message) {
return 0;
}

View file

@ -1,8 +1,21 @@
/*** /***
* This implements the BitswapNetwork. Members of this network can fill requests and * This implements the BitswapNetwork. Members of this network can fill requests and
* smartly handle queues of local and remote requests. * smartly handle queues of local and remote requests.
*
* For a somewhat accurate diagram of how this may work, @see https://github.com/ipfs/js-ipfs-bitswap
*/ */
#include "ipfs/exchange/bitswap/network.h" #include "ipfs/exchange/bitswap/network.h"
/***
* The main loop
*/
/**
* We received a BitswapMessage from the network
*/
/**
* We want to pop something off the queue to send to a peer.
* This can be a wantlist or blocks
*/

View file

@ -8,34 +8,63 @@
#include "ipfs/cid/cid.h" #include "ipfs/cid/cid.h"
#include "ipfs/repo/fsrepo/fs_repo.h" #include "ipfs/repo/fsrepo/fs_repo.h"
struct BlockstoreContext {
const struct FSRepo* fs_repo;
};
struct Blockstore {
struct BlockstoreContext* blockstoreContext;
int (*Delete)(const struct BlockstoreContext* context, struct Cid* cid);
int (*Has)(const struct BlockstoreContext* context, struct Cid* cid);
int (*Get)(const struct BlockstoreContext* context, struct Cid* cid, struct Block** block);
int (*Put)(const struct BlockstoreContext* context, struct Block* block);
};
/***
* Create a new Blockstore struct
* @param fs_repo the FSRepo to use
* @returns the new Blockstore struct, or NULL if there was a problem.
*/
struct Blockstore* ipfs_blockstore_new(const struct FSRepo* fs_repo);
/**
* Release resources of a Blockstore struct
* @param blockstore the struct to free
* @returns true(1)
*/
int ipfs_blockstore_free(struct Blockstore* blockstore);
/** /**
* Delete a block based on its Cid * Delete a block based on its Cid
* @param context the context
* @param cid the Cid to look for * @param cid the Cid to look for
* @param returns true(1) on success * @param returns true(1) on success
*/ */
int ipfs_blockstore_delete(struct Cid* cid, struct FSRepo* fs_repo); int ipfs_blockstore_delete(const struct BlockstoreContext* context, struct Cid* cid);
/*** /***
* Determine if the Cid can be found * Determine if the Cid can be found
* @param context the context
* @param cid the Cid to look for * @param cid the Cid to look for
* @returns true(1) if found * @returns true(1) if found
*/ */
int ipfs_blockstore_has(struct Cid* cid, struct FSRepo* fs_repo); int ipfs_blockstore_has(const struct BlockstoreContext* context, struct Cid* cid);
/*** /***
* Find a block based on its Cid * Find a block based on its Cid
* @param context the context
* @param cid the Cid to look for * @param cid the Cid to look for
* @param block where to put the data to be returned * @param block where to put the data to be returned
* @returns true(1) on success * @returns true(1) on success
*/ */
int ipfs_blockstore_get(const unsigned char* hash, size_t hash_length, struct Block** block, const struct FSRepo* fs_repo); int ipfs_blockstore_get(const struct BlockstoreContext* context, struct Cid* cid, struct Block** block);
/*** /***
* Put a block in the blockstore * Put a block in the blockstore
* @param block the block to store * @param block the block to store
* @returns true(1) on success * @returns true(1) on success
*/ */
int ipfs_blockstore_put(struct Block* block, const struct FSRepo* fs_repo); int ipfs_blockstore_put(const struct BlockstoreContext* context, struct Block* block);
/*** /***
* Put a struct UnixFS in the blockstore * Put a struct UnixFS in the blockstore

View file

@ -72,7 +72,7 @@ size_t ipfs_cid_protobuf_encode_size(struct Cid* incoming);
* @param cid where to put the results * @param cid where to put the results
* @returns true(1) on success * @returns true(1) on success
*/ */
int ipfs_cid_new(int version, unsigned char* hash, size_t hash_length, const char codec, struct Cid** cid); int ipfs_cid_new(int version, const unsigned char* hash, size_t hash_length, const char codec, struct Cid** cid);
/*** /***
* Free the resources from a Cid * Free the resources from a Cid

View file

@ -1,5 +1,6 @@
#pragma once #pragma once
#include "ipfs/blocks/blockstore.h"
#include "ipfs/repo/config/identity.h" #include "ipfs/repo/config/identity.h"
#include "ipfs/repo/fsrepo/fs_repo.h" #include "ipfs/repo/fsrepo/fs_repo.h"
#include "ipfs/routing/routing.h" #include "ipfs/routing/routing.h"
@ -15,6 +16,7 @@ struct IpfsNode {
struct Peerstore* peerstore; struct Peerstore* peerstore;
struct ProviderStore* providerstore; struct ProviderStore* providerstore;
struct IpfsRouting* routing; struct IpfsRouting* routing;
struct Blockstore* blockstore;
//struct Pinner pinning; // an interface //struct Pinner pinning; // an interface
//struct Mount** mounts; //struct Mount** mounts;
// TODO: Add more here // TODO: Add more here

View file

@ -1,13 +1,68 @@
/***
* Bitswap implements the exchange "interface"
* @see ../exchange.h
*/
#include "ipfs/core/ipfs_node.h"
#include "ipfs/exchange/exchange.h"
struct BitswapContext { struct BitswapContext {
struct SessionContext* sessionContext; struct SessionContext* sessionContext;
struct IpfsNode* ipfsNode;
}; };
/**
* Start up the bitswap exchange
* @param sessionContext the context
* @returns an Exchange struct that refers to the exchange
*/
struct Exchange* ipfs_bitswap_exchange_start(struct SessionContext* sessionContext, struct IpfsNode* ipfsNode);
/***
* These are the implementation methods for the exchange "Interface"
*/
/***
* Checks to see if the Bitswap service is online
* @param exhcnageContext a pointer to a BitswapContext
* @reutrns true(1) if online, false(0) otherwise.
*/
int ipfs_bitswap_is_online(void* exchangeContext); int ipfs_bitswap_is_online(void* exchangeContext);
/***
* Closes down the Bitswap network
* @param exchangeContext a pointer to a BitswapContext
* @returns true(1)
*/
int ipfs_bitswap_close(void* exchangeContext); int ipfs_bitswap_close(void* exchangeContext);
/****
* Notify the BitswapNetwork that we have this block
* @param exchangeContext a pointer to a BitswapContext
* @block the block that we have
* @reutrns true(1) if successful, false(0) if not.
*/
int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block); int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block);
/**
* Retrieve a block from the BitswapNetwork
* Note: This may pull the file from the local blockstore.
* Note: If false(0) is returned, block will be NULL
*
* @param exchangeContext a pointer to a BitswapContext
* @param cid the Cid of the block we're looking for
* @param block a pointer to the block when we find it.
* @returns true(1) on success, false(0) otherwise.
*/
int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block** block); int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block** block);
int ipfs_bitswap_get_blocks(void* exchangeContext, struct Libp2pVector* Cids, struct Libp2pVector** blocks); /***
* Retrieve a collection of blocks from the BitswapNetwork
* Note: The return of false(0) means that not all blocks were found.
*
* @param exchangeContext a pointer to a BitswapContext
* @param cids a collection of Cid structs
* @param blocks a collection that contains the results.
* @param true(1) on success, false(0) otherwise
*/
int ipfs_bitswap_get_blocks(void* exchangeContext, struct Libp2pVector* cids, struct Libp2pVector** blocks);

View file

@ -1,3 +1,4 @@
#pragma once
/** /**
* This is the definition of an "Exchange" * This is the definition of an "Exchange"
* *
@ -8,10 +9,19 @@
#include "ipfs/cid/cid.h" #include "ipfs/cid/cid.h"
#include "libp2p/utils/vector.h" #include "libp2p/utils/vector.h"
/**
* These are methods that the local IPFS daemon (or client)
* call to communicate with the local repository or network
*/
struct Exchange { struct Exchange {
/** /**
* Retrieve a block from peers within the deadline enforced * Retrieve a block from peers within the deadline enforced
* by the context * by the context
*
* NOTE: Shouldn't the block parameter be a callback (function pointer)?
* Otherwise, this function is going to block. Is that what we want?
*
* @param context the context * @param context the context
* @param cid the hash of the block to retrieve * @param cid the hash of the block to retrieve
* @param block a pointer to the block (allocated by this method if return is true) * @param block a pointer to the block (allocated by this method if return is true)

View file

@ -698,7 +698,11 @@ int ipfs_repo_fsrepo_block_write(struct Block* block, const struct FSRepo* fs_re
* and the base32 encoded multihash as the value. * and the base32 encoded multihash as the value.
*/ */
int retVal = 1; int retVal = 1;
retVal = ipfs_blockstore_put(block, fs_repo); struct Blockstore* blockstore = ipfs_blockstore_new(fs_repo);
if (blockstore == NULL)
return 0;
retVal = ipfs_blockstore_put(blockstore->blockstoreContext, block);
ipfs_blockstore_free(blockstore);
if (retVal == 0) if (retVal == 0)
return 0; return 0;
// take the cid, base32 it, and send both to the datastore // take the cid, base32 it, and send both to the datastore
@ -799,7 +803,17 @@ int ipfs_repo_fsrepo_block_read(const unsigned char* hash, size_t hash_length, s
if (retVal == 0) // maybe it doesn't exist? if (retVal == 0) // maybe it doesn't exist?
return 0; return 0;
// now get the block from the blockstore // now get the block from the blockstore
retVal = ipfs_blockstore_get(hash, hash_length, block, fs_repo); struct Cid* cid = NULL;
if (!ipfs_cid_new(0, hash, hash_length, CID_PROTOBUF, &cid))
return 0;
struct Blockstore* blockstore = ipfs_blockstore_new(fs_repo);
if (blockstore == NULL) {
ipfs_cid_free(cid);
return 0;
}
retVal = ipfs_blockstore_get(blockstore->blockstoreContext, cid, block);
ipfs_blockstore_free(blockstore);
ipfs_cid_free(cid);
return retVal; return retVal;
} }

View file

@ -103,6 +103,5 @@ int test_bitswap_protobuf() {
message->wantlist->full = 1; message->wantlist->full = 1;
retVal = 1; retVal = 1;
exit:
return retVal; return retVal;
} }