From 2232d03854a3f18b860e5a0676dd8fcdad745b2a Mon Sep 17 00:00:00 2001 From: jmjatlanta Date: Thu, 20 Jul 2017 15:03:49 -0500 Subject: [PATCH] Implementing protobuf objects for bitswap messages --- Makefile | 2 + blocks/block.c | 15 +- exchange/Makefile | 18 + exchange/bitswap/Makefile | 18 + exchange/bitswap/bitswap.c | 108 ++++ exchange/bitswap/message.c | 645 ++++++++++++++++++++++++ include/ipfs/blocks/block.h | 9 +- include/ipfs/cid/cid.h | 7 + include/ipfs/exchange/bitswap/bitswap.h | 13 + include/ipfs/exchange/bitswap/message.h | 42 ++ include/ipfs/exchange/bitswap/network.h | 6 +- include/ipfs/exchange/exchange.h | 57 +++ main/Makefile | 1 + test/Makefile | 1 + test/repo/test_repo_fsrepo.h | 4 +- test/storage/test_blocks.h | 5 +- test/storage/test_datastore.h | 4 +- 17 files changed, 934 insertions(+), 21 deletions(-) create mode 100644 exchange/Makefile create mode 100644 exchange/bitswap/Makefile create mode 100644 exchange/bitswap/bitswap.c create mode 100644 exchange/bitswap/message.c create mode 100644 include/ipfs/exchange/bitswap/bitswap.h create mode 100644 include/ipfs/exchange/bitswap/message.h create mode 100644 include/ipfs/exchange/exchange.h diff --git a/Makefile b/Makefile index 1cfb8e7..14f545e 100644 --- a/Makefile +++ b/Makefile @@ -9,6 +9,7 @@ all: cd cmd; make all; cd commands; make all; cd core; make all; + cd exchange; make all; cd importer; make all; cd merkledag; make all; cd multibase; make all; @@ -32,6 +33,7 @@ clean: cd cmd; make clean; cd commands; make clean; cd core; make clean; + cd exchange; make clean; cd importer; make clean; cd merkledag; make clean; cd multibase; make clean; diff --git a/blocks/block.c b/blocks/block.c index d2e4ec4..b49f011 100644 --- a/blocks/block.c +++ b/blocks/block.c @@ -65,7 +65,8 @@ int ipfs_blocks_block_protobuf_decode(const unsigned char* buffer, const size_t unsigned char* temp_buffer = NULL; size_t temp_size; - if (ipfs_blocks_block_new(block) == 0) + *block = ipfs_blocks_block_new(); + if (*block == NULL) goto exit; while(pos < buffer_length) { @@ -114,16 +115,16 @@ exit: * @param block a pointer to the struct Block that will be created * @returns true(1) on success */ -int ipfs_blocks_block_new(struct Block** block) { +struct Block* ipfs_blocks_block_new() { // allocate memory for structure - (*block) = (struct Block*)malloc(sizeof(struct Block)); - if ((*block) == NULL) + struct Block* block = (struct Block*)malloc(sizeof(struct Block)); + if ( block == NULL) return 0; - (*block)->data = NULL; - (*block)->data_length = 0; + block->data = NULL; + block->data_length = 0; - return 1; + return block; } int ipfs_blocks_block_add_data(const unsigned char* data, size_t data_size, struct Block* block) { diff --git a/exchange/Makefile b/exchange/Makefile new file mode 100644 index 0000000..247de14 --- /dev/null +++ b/exchange/Makefile @@ -0,0 +1,18 @@ +CC = gcc +CFLAGS = -O0 -I../include -I../../c-libp2p/include -std=c99 + +ifdef DEBUG +CFLAGS += -g3 +endif + +DEPS = +OBJS = + +%.o: %.c $(DEPS) + $(CC) -c -o $@ $< $(CFLAGS) + +all: $(OBJS) + cd bitswap; make all; +clean: + rm -f *.o + cd bitswap; make clean; diff --git a/exchange/bitswap/Makefile b/exchange/bitswap/Makefile new file mode 100644 index 0000000..a4c268c --- /dev/null +++ b/exchange/bitswap/Makefile @@ -0,0 +1,18 @@ +CC = gcc +CFLAGS = -O0 -I../../include -I../../../c-libp2p/include -I../../../c-multiaddr/include -I../../../c-multihash/include -I../../../c-protobuf -Wall -std=c99 + +ifdef DEBUG +CFLAGS += -g3 +endif + +LFLAGS = +DEPS = +OBJS = bitswap.o message.o network.o + +%.o: %.c $(DEPS) + $(CC) -c -o $@ $< $(CFLAGS) + +all: $(OBJS) + +clean: + rm -f *.o diff --git a/exchange/bitswap/bitswap.c b/exchange/bitswap/bitswap.c new file mode 100644 index 0000000..fa5b304 --- /dev/null +++ b/exchange/bitswap/bitswap.c @@ -0,0 +1,108 @@ +/** + * Methods for the Bitswap exchange + */ +#include +#include "ipfs/exchange/exchange.h" +#include "ipfs/exchange/bitswap/bitswap.h" +#include "ipfs/exchange/bitswap/message.h" + +/** + * Create a new bitswap exchange + * @param sessionContext the context + * @returns an allocated Exchange structure + */ +struct Exchange* ipfs_bitswap_new(struct SessionContext* sessionContext) { + struct Exchange* exchange = (struct Exchange*) malloc(sizeof(struct Exchange)); + if (exchange != NULL) { + struct BitswapContext* bitswapContext = (struct BitswapContext*) malloc(sizeof(struct BitswapContext)); + if (exchange->exchangeContext == NULL) { + free(exchange); + return NULL; + } + exchange->exchangeContext = (void*) bitswapContext; + bitswapContext->sessionContext = sessionContext; + //TODO: fill in the exchangeContext + exchange->IsOnline = ipfs_bitswap_is_online; + exchange->Close = ipfs_bitswap_close; + exchange->HasBlock = ipfs_bitswap_has_block; + exchange->GetBlock = ipfs_bitswap_get_block; + exchange->GetBlocks = ipfs_bitswap_get_blocks; + } + return exchange; +} + +/** + * Clean up resources within an Exchange struct + * @param exchange the exchange to free + * @returns true(1) + */ +int ipfs_bitswap_free(struct Exchange* exchange) { + if (exchange != NULL) { + if (exchange->exchangeContext != NULL) { + free(exchange->exchangeContext); + } + free(exchange); + } + return 1; +} + +/** + * Implements the Exchange->IsOnline method + */ +int ipfs_bitswap_is_online(void* exchangeContext) { + if (exchangeContext != NULL) { + struct BitswapContext* bitswapContext = (struct BitswapContext*)exchangeContext; + //TODO: Is this an accurate way to determine if we're running? + if (bitswapContext->sessionContext != NULL) + return 1; + } + return 0; +} + +/*** + * Implements the Exchange->Close method + */ +int ipfs_bitswap_close(void* exchangeContext) { + //TODO: Implement this method + // Should it close the exchange? + return 0; +} + +/** + * Implements the Exchange->HasBlock method + */ +int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block) { + //TODO: Implement this method + // NOTE: The GO version adds the block to the blockstore. I have yet to + // understand the flow and if this is correct for us. + return 0; +} + +/** + * Implements the Exchange->GetBlock method + */ +int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block** block) { + // TODO: Implement this method + return 0; +} + +/** + * Implements the Exchange->GetBlocks method + */ +int ipfs_bitswap_get_blocks(void* exchangeContext, struct Libp2pVector* Cids, struct Libp2pVector** blocks) { + // TODO: Implement this method + return 0; +} + +/*** + * Receive a BitswapMessage from a peer. + * @param exchangeContext the context + * @param peer_id the origin + * @param peer_id_size the size of the peer_id + * @param message the message + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_receive_message(void* exchangeContext, unsigned char* peer_id, int peer_id_size, struct BitswapMessage* message) { + + return 0; +} diff --git a/exchange/bitswap/message.c b/exchange/bitswap/message.c new file mode 100644 index 0000000..93d0246 --- /dev/null +++ b/exchange/bitswap/message.c @@ -0,0 +1,645 @@ +#include +#include "protobuf.h" +#include "varint.h" +#include "libp2p/utils/vector.h" +#include "ipfs/blocks/block.h" +#include "ipfs/exchange/bitswap/message.h" + +/*** + * Allocate memory for a struct BitswapBlock + * @returns a new BitswapBlock + */ +struct BitswapBlock* ipfs_bitswap_block_new() { + struct BitswapBlock* block = (struct BitswapBlock*) malloc(sizeof(struct BitswapBlock)); + if (block != NULL) { + block->bytes_size = 0; + block->bytes = NULL; + block->prefix_size = 0; + block->prefix = NULL; + } + return block; +} + +/** + * Deallocate memory for a struct BitswapBlock + * @param block the block to deallocate + * @returns true(1) + */ +int ipfs_bitswap_block_free(struct BitswapBlock* block) { + if (block != NULL) { + if (block->bytes != NULL) + free(block->bytes); + if (block->prefix != NULL) + free(block->prefix); + free(block); + } + return 1; +} + +/** + * Retrieve an estimate of the size of a protobuf'd BitswapBlock + * @returns the approximate (maximum actually) size of a protobuf'd BitswapBlock + */ +size_t ipfs_bitswap_message_block_protobuf_size(struct BitswapBlock* block) { + // protobuf prefix + prefix + bytes = 33 + array sizes + return 33 + block->prefix_size + block->bytes_size; +} + +/*** + * Encode a BitswapBlock + * @param incoming the block to encode + * @param outgoing where to place the results + * @param max_size the maximum allocated space for outgoing + * @param bytes_written the number of bytes written to outgoing + */ +int ipfs_bitswap_message_block_protobuf_encode(struct BitswapBlock* incoming, uint8_t* outgoing, size_t max_size, size_t* bytes_written) { + // 2 WIRETYPE_LENGTH_DELIMITED fields of prefix and bytes + size_t bytes_used; + *bytes_written = 0; + + if (incoming != NULL) { + if (!protobuf_encode_length_delimited(1, WIRETYPE_LENGTH_DELIMITED, (char*)incoming->prefix, incoming->prefix_size, outgoing, max_size, &bytes_used)) + return 0; + *bytes_written += bytes_used; + if (!protobuf_encode_length_delimited(2, WIRETYPE_LENGTH_DELIMITED, (char*)incoming->bytes, incoming->bytes_size, &outgoing[*bytes_written], max_size - (*bytes_written), &bytes_used)) + return 0; + *bytes_written += bytes_used; + } + return 1; +} + +/*** + * Decode a protobuf to a BitswapBlock + * @param buffer the incoming protobuf + * @param buffer_length the length of the incoming protobuf buffer + * @param output a pointer to the BitswapBlock that will be allocated + * @returns true(1) on success, false(0) if not. If false, any memory was deallocated + */ +int ipfs_bitswap_message_block_protobuf_decode(uint8_t* buffer, size_t buffer_length, struct BitswapBlock** output) { + size_t pos = 0; + int retVal = 0; + + *output = NULL; + + // short cut for nulls + if (buffer_length == 0) + return 1; + + *output = (struct BitswapBlock*) malloc(sizeof(struct BitswapBlock)); + if (*output == NULL) + goto exit; + + struct BitswapBlock* block = *output; + + while(pos < buffer_length) { + size_t bytes_read = 0; + int field_no; + enum WireType field_type; + if (protobuf_decode_field_and_type(&buffer[pos], buffer_length, &field_no, &field_type, &bytes_read) == 0) { + goto exit; + } + pos += bytes_read; + switch(field_no) { + case (1): + if (!protobuf_decode_length_delimited(&buffer[pos], buffer_length - pos, (char**)&block->prefix, &block->prefix_size, &bytes_read)) + goto exit; + break; + case (2): + if (!protobuf_decode_length_delimited(&buffer[pos], buffer_length - pos, (char**)&block->bytes, &block->bytes_size, &bytes_read)) + goto exit; + break; + } + } + + retVal = 1; + exit: + if (retVal == 0) { + if (*output != NULL) + free(*output); + *output = NULL; + } + return retVal; +} + +/*** + * Allocate memory for a new WantlistEntry + * @returns the newly allocated WantlistEntry + */ +struct WantlistEntry* ipfs_bitswap_wantlist_entry_new() { + struct WantlistEntry* entry = (struct WantlistEntry*) malloc(sizeof(struct WantlistEntry)); + if (entry == NULL) + return NULL; + + entry->block = NULL; + entry->block_size = 0; + entry->cancel = 0; + entry->priority = 1; + + return entry; +} + +/*** + * Free allocations of a WantlistEntry + * @param entry the WantlistEntry + * @returns true(1) + */ +int ipfs_bitswap_wantlist_entry_free(struct WantlistEntry* entry) { + if (entry != NULL) { + if (entry->block != NULL) + free(entry->block); + free(entry); + entry = NULL; + } + return 1; +} + +/** + * Retrieve an estimate of the size of a protobuf'd WantlistEntry + * @param entry the struct to examine + * @returns the approximate (maximum actually) size of a protobuf'd WantlistEntry + */ +size_t ipfs_bitswap_wantlist_entry_protobuf_encode_size(struct WantlistEntry* entry) { + // protobuf prefix + block + cancel + priority + return 33 + entry->block_size; +} + +/*** + * Encode a WantlistEntry into a Protobuf + * @param entry the WantlistEntry to encode + * @param buffer where to put the results + * @param buffer_length the maximum size of the buffer + * @param bytes_written the number of bytes written into the buffer + * @returns true(1) on success, false(0) otherwise + */ +int ipfs_bitswap_wantlist_entry_protobuf_encode(struct WantlistEntry* entry, unsigned char* buffer, size_t buffer_length, size_t* bytes_written) { + size_t bytes_used; + *bytes_written = 0; + + if (entry != NULL) { + if (!protobuf_encode_length_delimited(1, WIRETYPE_LENGTH_DELIMITED, (char*)entry->block, entry->block_size, &buffer[*bytes_written], buffer_length - (*bytes_written), &bytes_used)) + return 0; + *bytes_written += bytes_used; + if (!protobuf_encode_varint(2, WIRETYPE_VARINT, entry->cancel, &buffer[*bytes_written], buffer_length - (*bytes_written), &bytes_used)) + return 0; + *bytes_written += bytes_used; + if (!protobuf_encode_varint(3, WIRETYPE_VARINT, entry->priority, &buffer[*bytes_written], buffer_length - (*bytes_written), &bytes_used)) + return 0; + *bytes_written += bytes_used; + } + return 1; +} + +/*** + * Decode a protobuf into a struct WantlistEntry + * @param buffer the protobuf buffer + * @param buffer_length the length of the data in the protobuf buffer + * @param output the resultant WantlistEntry + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_wantlist_entry_protobuf_decode(unsigned char* buffer, size_t buffer_length, struct WantlistEntry** output) { + size_t pos = 0; + int retVal = 0; + struct WantlistEntry* entry = NULL; + + *output = NULL; + + // short cut for nulls + if (buffer_length == 0) + return 1; + + *output = (struct WantlistEntry*) malloc(sizeof(struct WantlistEntry)); + if (*output == NULL) + goto exit; + + entry = *output; + + while(pos < buffer_length) { + size_t bytes_read = 0; + int field_no; + enum WireType field_type; + if (protobuf_decode_field_and_type(&buffer[pos], buffer_length, &field_no, &field_type, &bytes_read) == 0) { + goto exit; + } + pos += bytes_read; + switch(field_no) { + case (1): + if (!protobuf_decode_length_delimited(&buffer[pos], buffer_length - pos, (char**)&entry->block, &entry->block_size, &bytes_read)) + goto exit; + pos += bytes_read; + break; + case (2): + entry->cancel = varint_decode(&buffer[pos], buffer_length - pos, &bytes_read); + pos += bytes_read; + break; + case (3): + entry->priority = varint_decode(&buffer[pos], buffer_length - pos, &bytes_read); + pos += bytes_read; + break; + } + + } + + retVal = 1; + exit: + if (retVal == 0) { + if (entry != NULL) + free(entry); + *output = NULL; + } + return retVal; +} + +/*** + * Allocate memory for a new Bitswap Message WantList + * @returns the allocated struct BitswapWantlist + */ +struct BitswapWantlist* ipfs_bitswap_wantlist_new() { + struct BitswapWantlist* list = (struct BitswapWantlist*) malloc(sizeof(struct BitswapWantlist)); + + if (list != NULL) { + list->entries = NULL; + list->full = 1; + } + + return list; +} + +/** + * Free the resources used by a Wantlist + * @param list the list to free + * @returns true(1) + */ +int ipfs_bitswap_wantlist_free(struct BitswapWantlist* list) { + if (list != NULL) { + if (list->entries != NULL) { + for(int i = 0; i < list->entries->total; i++) { + // free each item in the vector + struct WantlistEntry* entry = (struct WantlistEntry*) libp2p_utils_vector_get(list->entries, i); + ipfs_bitswap_wantlist_entry_free(entry); + } + libp2p_utils_vector_free(list->entries); + } + free(list); + } + return 1; +} + +/*** + * Calculate the maximum size of a protobuf'd BitswapWantlist + * @param list the Wantlist + * @returns the maximum size of the protobuf'd list + */ +size_t ipfs_bitswap_wantlist_protobuf_encode_size(struct BitswapWantlist* list) { + size_t total = 0; + if (list != NULL) { + for(int i = 0; i < list->entries->total; i++) { + struct WantlistEntry* entry = (struct WantlistEntry*) libp2p_utils_vector_get(list->entries, i); + total += ipfs_bitswap_wantlist_entry_protobuf_encode_size(entry); + } + total += 11 + 12 + 11; + } + return total; +} + +/*** + * Encode a BitswapWantlist into a protobuf buffer + * @param list the list to encode + * @param buffer the buffer to fill + * @param buffer_length the length of the allocated buffer + * @param bytes_written the total number of bytes written to the buffer + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_wantlist_protobuf_encode(struct BitswapWantlist* list, unsigned char* buffer, size_t buffer_length, size_t* bytes_written) { + size_t bytes_used = 0; + *bytes_written = 0; + + if (list != NULL) { + // the vector of entries + for(int i = 0; i < list->entries->total; i++) { + struct WantlistEntry* entry = (struct WantlistEntry*) libp2p_utils_vector_get(list->entries, i); + // protobuf the entry + size_t temp_buffer_size = ipfs_bitswap_wantlist_entry_protobuf_encode_size(entry); + uint8_t* temp_buffer = (uint8_t*) malloc(temp_buffer_size); + if (temp_buffer == NULL) + return 0; + if (!ipfs_bitswap_wantlist_entry_protobuf_encode(entry, temp_buffer, temp_buffer_size, &temp_buffer_size)) { + free(temp_buffer); + return 0; + } + // we've got the protobuf'd entry, now put it in the real buffer + if (!protobuf_encode_length_delimited(1, WIRETYPE_LENGTH_DELIMITED, (char*)temp_buffer, temp_buffer_size, &buffer[*bytes_written], buffer_length - (*bytes_written), &bytes_used)) { + free(temp_buffer); + return 0; + } + // all went okay. Clean up and do it again... + free(temp_buffer); + *bytes_written += bytes_used; + } + // if this is the full list or not... + if (!protobuf_encode_varint(2, WIRETYPE_VARINT, list->full, &buffer[*bytes_written], buffer_length - (*bytes_written), &bytes_used)) + return 0; + *bytes_written += bytes_used; + } + return 1; +} + +/*** + * Decode a Wantlist from a protobuf + * @param buffer the protobuf + * @param buffer_length the length of the protobuf + * @param output the newly allocated BitswapWantlist + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_wantlist_protobuf_decode(unsigned char* buffer, size_t buffer_length, struct BitswapWantlist** output) { + size_t pos = 0; + + *output = NULL; + + // short cut for nulls + if (buffer_length == 0) + return 1; + + *output = (struct BitswapWantlist*) malloc(sizeof(struct BitswapWantlist)); + if (*output == NULL) + return 0; + + struct BitswapWantlist* list = *output; + + while(pos < buffer_length) { + size_t bytes_read = 0; + int field_no; + enum WireType field_type; + if (protobuf_decode_field_and_type(&buffer[pos], buffer_length, &field_no, &field_type, &bytes_read) == 0) { + return 0; + } + pos += bytes_read; + switch(field_no) { + case (1): { + // a WantlistEntry + size_t temp_size = 0; + uint8_t* temp = NULL; + if (!protobuf_decode_length_delimited(&buffer[pos], buffer_length - pos, (char**)&temp, &temp_size, &bytes_read)) { + return 0; + } + struct WantlistEntry* entry = NULL; + if (!ipfs_bitswap_wantlist_entry_protobuf_decode(temp, temp_size, &entry)) { + free(temp); + return 0; + } + free(temp); + if (list->entries == NULL) { + list->entries = libp2p_utils_vector_new(1); + } + libp2p_utils_vector_add(list->entries, (void*)entry); + free(entry); + pos += bytes_read; + break; + } + case (2): { + list->full = varint_decode(&buffer[pos], buffer_length - pos, &bytes_read); + pos += bytes_read; + break; + } + } + } + + return 1; +} + +/*** + * Bitswap Message + * + */ + +/*** + * Allocate memory for a new Bitswap Message + * @returns the allocated struct BitswapMessage + */ +struct BitswapMessage* ipfs_bitswap_message_new() { + struct BitswapMessage* message = (struct BitswapMessage*) malloc(sizeof(struct BitswapMessage)); + + if (message != NULL) { + message->blocks = NULL; + message->payload = NULL; + message->wantlist = NULL; + } + + return message; +} + +/** + * Free the resources used by a BitswapMessage + * @param message the BitswapMessage to free + * @returns true(1) + */ +int ipfs_bitswap_message_free(struct BitswapMessage* message) { + if (message != NULL) { + if (message->blocks != NULL) { + // blocks are just byte arrays in bitswap 1.0.0, so throw it in a struct + // so it can be put in a vector + for(int i = 0; i < message->blocks->total; i++) { + // free each item in the vector + struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->blocks, i); + ipfs_blocks_block_free(entry); + } + libp2p_utils_vector_free(message->blocks); + } + if (message->payload != NULL) { + for(int i = 0; i < message->payload->total; i++) { + // free each item in the vector + struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->payload, i); + ipfs_blocks_block_free(entry); + } + libp2p_utils_vector_free(message->payload); + } + if (message->wantlist != NULL) { + ipfs_bitswap_wantlist_free(message->wantlist); + } + free(message); + } + return 1; +} + +/*** + * Calculate the maximum size of a protobuf'd BitswapMessage + * @param message the BitswapMessage + * @returns the maximum size of the protobuf'd BitswapMessage + */ +size_t ipfs_bitswap_message_protobuf_encode_size(struct BitswapMessage* message) { + size_t total = 0; + if (message != NULL) { + if (message->blocks != NULL) { + for(int i = 0; i < message->blocks->total; i++) { + struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->blocks, i); + total += 11 + entry->data_length; + } + } + if (message->payload != NULL) { + for(int i = 0; i < message->payload->total; i++) { + struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->payload, i); + total += 11 + ipfs_blocks_block_protobuf_encode_size(entry); + } + } + if (message->wantlist != NULL) { + total += ipfs_bitswap_wantlist_protobuf_encode_size(message->wantlist); + } + total += 11 + 12 + 11; + } + return total; +} + +/*** + * Encode a BitswapMessage into a protobuf buffer + * @param message the message to encode + * @param buffer the buffer to fill + * @param buffer_length the length of the allocated buffer + * @param bytes_written the total number of bytes written to the buffer + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_message_protobuf_encode(struct BitswapMessage* message, unsigned char* buffer, size_t buffer_length, size_t* bytes_written) { + size_t bytes_used = 0; + *bytes_written = 0; + + if (message != NULL) { + // the vector of blocks that are actually to be turned back into byte arrays + if (message->blocks != NULL) { + for(int i = 0; i < message->blocks->total; i++) { + struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->blocks, i); + // blocks are just variable length byte streams + if (!protobuf_encode_length_delimited(1, WIRETYPE_LENGTH_DELIMITED, (char*)entry->data, entry->data_length, &buffer[*bytes_written], buffer_length - (*bytes_written), &bytes_used)) { + return 0; + } + *bytes_written += bytes_used; + } + } + // the vector of Blocks that are actually blocks + if (message->payload != NULL) { + for(int i = 0; i < message->payload->total; i++) { + struct Block* entry = (struct Block*) libp2p_utils_vector_get(message->payload, i); + // protobuf it + size_t temp_size = ipfs_blocks_block_protobuf_encode_size(entry); + uint8_t* temp = (uint8_t*) malloc(temp_size); + if (!ipfs_blocks_block_protobuf_encode(entry, temp, temp_size, &temp_size)) { + free(temp); + return 0; + } + // put it in the buffer + if (!protobuf_encode_length_delimited(2, WIRETYPE_LENGTH_DELIMITED, (char*)temp, temp_size, &buffer[*bytes_written], buffer_length - (*bytes_written), &bytes_used)) { + free(temp); + return 0; + } + *bytes_written += bytes_used; + free(temp); + } + } + // the WantList + if (message->wantlist != NULL) { + size_t temp_size = ipfs_bitswap_wantlist_protobuf_encode_size(message->wantlist); + uint8_t* temp = (uint8_t*) malloc(temp_size); + if (!ipfs_bitswap_wantlist_protobuf_encode(message->wantlist, temp, temp_size, &temp_size)) { + free(temp); + return 0; + } + if (!protobuf_encode_length_delimited(3, WIRETYPE_LENGTH_DELIMITED, (char*)temp, temp_size, &buffer[*bytes_written], buffer_length - (*bytes_written), &bytes_used)) { + free(temp); + return 0; + } + *bytes_written += bytes_used; + free(temp); + } + + } + return 1; +} + +/*** + * Decode a BitswapMessage from a protobuf + * @param buffer the protobuf + * @param buffer_length the length of the protobuf + * @param output the newly allocated BitswapMessage + * @returns true(1) on success, otherwise false(0) + */ +int ipfs_bitswap_message_protobuf_decode(unsigned char* buffer, size_t buffer_length, struct BitswapMessage** output) { + size_t pos = 0; + + *output = NULL; + + // short cut for nulls + if (buffer_length == 0) + return 1; + + *output = (struct BitswapMessage*) malloc(sizeof(struct BitswapMessage)); + if (*output == NULL) + return 0; + + struct BitswapMessage* message = *output; + + while(pos < buffer_length) { + size_t bytes_read = 0; + int field_no; + enum WireType field_type; + if (protobuf_decode_field_and_type(&buffer[pos], buffer_length, &field_no, &field_type, &bytes_read) == 0) { + return 0; + } + pos += bytes_read; + switch(field_no) { + case (1): { + // a Blocks entry that is just an array of bytes + struct Block* temp = ipfs_blocks_block_new(); + if (!protobuf_decode_length_delimited(&buffer[pos], buffer_length - pos, (char**)&temp->data, &temp->data_length, &bytes_read)) { + return 0; + } + if (message->blocks == NULL) { + message->blocks = libp2p_utils_vector_new(1); + } + libp2p_utils_vector_add(message->blocks, (void*)temp); + ipfs_blocks_block_free(temp); + pos += bytes_read; + break; + } + case (2): { + // a block entry that is a real block struct + size_t temp_size = 0; + uint8_t* temp = NULL; + if (!protobuf_decode_length_delimited(&buffer[pos], buffer_length - pos, (char**)&temp, &temp_size, &bytes_read)) { + return 0; + } + // we have the bytes, turn it into a Block struct + struct Block* block = NULL; + if (!ipfs_blocks_block_protobuf_decode(temp, temp_size, &block)) { + free(temp); + return 0; + } + free(temp); + if (message->payload == NULL) { + message->payload = libp2p_utils_vector_new(1); + } + libp2p_utils_vector_add(message->payload, (void*)block); + ipfs_blocks_block_free(block); + pos += bytes_read; + break; + } + case(3): { + // a Wantlist + size_t temp_size = 0; + uint8_t* temp = NULL; + if (!protobuf_decode_length_delimited(&buffer[pos], buffer_length - pos, (char**)&temp, &temp_size, &bytes_read)) { + return 0; + } + // we have the protobuf'd wantlist, now turn it into a Wantlist struct. + if (!ipfs_bitswap_wantlist_protobuf_decode(temp, temp_size, &message->wantlist)) { + free(temp); + return 0; + } + free(temp); + pos += bytes_read; + break; + } + } + } + + return 1; +} + + + diff --git a/include/ipfs/blocks/block.h b/include/ipfs/blocks/block.h index 9d27cdb..c5cc679 100644 --- a/include/ipfs/blocks/block.h +++ b/include/ipfs/blocks/block.h @@ -15,13 +15,10 @@ struct Block { }; /*** - * Create a new block based on the incoming data. - * @param data the data to base the block on - * @param data_size the length of the data array - * @param block a pointer to the struct Block that will be created - * @returns true(1) on success + * Create a new block + * @returns a new allocated Block struct */ -int ipfs_blocks_block_new(struct Block** block); +struct Block* ipfs_blocks_block_new(); int ipfs_blocks_block_add_data(const unsigned char* data, size_t data_size, struct Block* block); diff --git a/include/ipfs/cid/cid.h b/include/ipfs/cid/cid.h index dfff375..7d99772 100644 --- a/include/ipfs/cid/cid.h +++ b/include/ipfs/cid/cid.h @@ -19,6 +19,13 @@ #define CID_ZCASH_BLOCK 0xc0 #define CID_ZCASH_TX 0xc1 +/*** + * A note about CID versions: + * Version 0 only contained the multihash address. The extra parameters of multibase, + * multicodec, cid-version were implied (base58btc, protobuf-mdag, and cidv0 + * respectively) are implied. + */ + struct Cid { int version; char codec; diff --git a/include/ipfs/exchange/bitswap/bitswap.h b/include/ipfs/exchange/bitswap/bitswap.h new file mode 100644 index 0000000..59218fa --- /dev/null +++ b/include/ipfs/exchange/bitswap/bitswap.h @@ -0,0 +1,13 @@ +struct BitswapContext { + struct SessionContext* sessionContext; +}; + +int ipfs_bitswap_is_online(void* exchangeContext); + +int ipfs_bitswap_close(void* exchangeContext); + +int ipfs_bitswap_has_block(void* exchangeContext, struct Block* block); + +int ipfs_bitswap_get_block(void* exchangeContext, struct Cid* cid, struct Block** block); + +int ipfs_bitswap_get_blocks(void* exchangeContext, struct Libp2pVector* Cids, struct Libp2pVector** blocks); diff --git a/include/ipfs/exchange/bitswap/message.h b/include/ipfs/exchange/bitswap/message.h new file mode 100644 index 0000000..c06adbf --- /dev/null +++ b/include/ipfs/exchange/bitswap/message.h @@ -0,0 +1,42 @@ +/*** + * A protobuf-able Bitswap Message + */ +#include +#include +#include "libp2p/utils/vector.h" + +struct WantlistEntry { + // optional string block = 1, the block cid (cidV0 in bitswap 1.0.0, cidV1 in bitswap 1.1.0 + unsigned char* block; + size_t block_size; + // optional int32 priority = 2, the priority (normalized). default to 1 + uint32_t priority; + // optional bool cancel = 3, whether this revokes an entry + uint8_t cancel; +}; + +struct BitswapWantlist { + // repeated WantlistEntry entries = 1, a list of wantlist entries + struct Libp2pVector* entries; + // optional bool full = 2, whether this is the full wantlist. default to false + uint8_t full; +}; + +struct BitswapBlock { + // optional bytes prefix = 1, // CID prefix (cid version, multicodec, and multihash prefix (type + length)) + uint8_t* prefix; + size_t prefix_size; + // optional bytes data = 2 + uint8_t* bytes; + size_t bytes_size; +}; + +struct BitswapMessage { + // optional Wantlist wantlist = 1 + struct BitswapWantlist* wantlist; + // repeated bytes blocks = 2, used to send Blocks in bitswap 1.0.0 + struct Libp2pVector* blocks; + // repeated Block payload = 3, used to send Blocks in bitswap 1.1.0 + struct Libp2pVector* payload; + +}; diff --git a/include/ipfs/exchange/bitswap/network.h b/include/ipfs/exchange/bitswap/network.h index 5eac404..f1b1f8a 100644 --- a/include/ipfs/exchange/bitswap/network.h +++ b/include/ipfs/exchange/bitswap/network.h @@ -3,6 +3,9 @@ * smartly handle queues of local and remote requests. */ +#include "libp2p/conn/session.h" +#include "ipfs/exchange/bitswap/message.h" + struct BitswapRouting { /** * Find the provider of a key asyncronously @@ -39,7 +42,8 @@ struct BitswapNetwork { * @param receiver the struct that contains function pointers for receiving messages * @returns true(1) on success, otherwise false(0) */ - int (*SetDelegate)(struct BitswapReceiver* receiver); + //TODO: Implement this + //int (*SetDelegate)(struct BitswapReceiver* receiver); /** * Attempt a connection to a particular peer diff --git a/include/ipfs/exchange/exchange.h b/include/ipfs/exchange/exchange.h new file mode 100644 index 0000000..522a8f0 --- /dev/null +++ b/include/ipfs/exchange/exchange.h @@ -0,0 +1,57 @@ +/** + * This is the definition of an "Exchange" + * + * Anything that implements the Exchange interface can be used as + * an IPFS block exchange protocol. + */ +#include "ipfs/blocks/block.h" +#include "ipfs/cid/cid.h" +#include "libp2p/utils/vector.h" + +struct Exchange { + /** + * Retrieve a block from peers within the deadline enforced + * by the context + * @param context the context + * @param cid the hash of the block to retrieve + * @param block a pointer to the block (allocated by this method if return is true) + * @returns true(1) on success, false(0) otherwise + */ + int (*GetBlock)(void* exchangeContext, struct Cid* cid, struct Block** block); + + /** + * Retrieve several blocks + * @param context the context + * @param Cids a vector of hashes for the blocks to be retrieved + * @param blocks a pointer to a vector of retrieved blocks (will be NULL on error) + * @returns true(1) on success, otherwise false(0) + */ + int (*GetBlocks)(void* exchangeContext, struct Libp2pVector* Cids, struct Libp2pVector** blocks); + + /** + * Announces the existance of a block to this bitswap service. The service will + * potentially notify its peers. + * NOTE: This is mainly designed to announce blocks added by non-bitswap methods (i.e. the local user) + * @param block the block being announced + * @returns true(1) on success, false(0) if not + */ + int (*HasBlock)(void* exchangeContext, struct Block* block); + + /** + * Determine if we're online + * @returns true(1) if we're online + */ + int (*IsOnline)(void* exchangeContext); + + /** + * Close up the exchange, and go offline + * @returns true(1); + */ + int (*Close)(void* exchangeContext); + + /** + * Used by each implementation to maintain state + * (will be cast-ed to an implementation-specific structure) + */ + void* exchangeContext; +}; diff --git a/main/Makefile b/main/Makefile index bd66805..e7d14f8 100644 --- a/main/Makefile +++ b/main/Makefile @@ -11,6 +11,7 @@ OBJS = main.o \ ../datastore/ds_helper.o \ ../datastore/key.o \ ../dnslink/*.o \ + ../exchange/bitswap/*.o \ ../flatfs/flatfs.o \ ../importer/importer.o ../importer/exporter.o ../importer/resolver.o \ ../path/path.o \ diff --git a/test/Makefile b/test/Makefile index c5cc181..6a6dfae 100644 --- a/test/Makefile +++ b/test/Makefile @@ -14,6 +14,7 @@ OBJS = testit.o test_helper.o \ ../core/ping.o \ ../core/ipfs_node.o \ ../datastore/ds_helper.o \ + ../exchange/bitswap/*.o \ ../flatfs/flatfs.o \ ../importer/importer.o ../importer/exporter.o ../importer/resolver.o \ ../merkledag/merkledag.o ../merkledag/node.o \ diff --git a/test/repo/test_repo_fsrepo.h b/test/repo/test_repo_fsrepo.h index 9f2f0c7..eee35a8 100644 --- a/test/repo/test_repo_fsrepo.h +++ b/test/repo/test_repo_fsrepo.h @@ -47,8 +47,8 @@ int test_repo_fsrepo_write_read_block() { } // create and write the block - retVal = ipfs_blocks_block_new(&block); - if (retVal == 0) { + block = ipfs_blocks_block_new(); + if (block == NULL) { ipfs_repo_fsrepo_free(fs_repo); return 0; } diff --git a/test/storage/test_blocks.h b/test/storage/test_blocks.h index 9a88e96..6d9f5b8 100644 --- a/test/storage/test_blocks.h +++ b/test/storage/test_blocks.h @@ -3,9 +3,8 @@ int test_blocks_new() { const unsigned char* input = (const unsigned char*)"Hello, World!"; int retVal = 0; - struct Block* block; - retVal = ipfs_blocks_block_new(&block); - if (retVal == 0) + struct Block* block = ipfs_blocks_block_new(); + if (block == NULL) return 0; retVal = ipfs_blocks_block_add_data(input, strlen((const char*)input) + 1, block); diff --git a/test/storage/test_datastore.h b/test/storage/test_datastore.h index 4d4e05c..d46139f 100644 --- a/test/storage/test_datastore.h +++ b/test/storage/test_datastore.h @@ -23,8 +23,8 @@ int test_ipfs_datastore_put() { return 0; // build the block - retVal = ipfs_blocks_block_new(&block); - if (retVal == 0) + block = ipfs_blocks_block_new(); + if (block == NULL) return 0; retVal = ipfs_blocks_block_add_data(input, strlen((char*)input), block);