diff --git a/blocks/block.c b/blocks/block.c index aaa5c66..e839a7a 100644 --- a/blocks/block.c +++ b/blocks/block.c @@ -65,7 +65,7 @@ int ipfs_blocks_block_protobuf_decode(const unsigned char* buffer, const size_t unsigned char* temp_buffer = NULL; size_t temp_size; - *block = ipfs_blocks_block_new(); + *block = ipfs_block_new(); if (*block == NULL) goto exit; @@ -115,7 +115,7 @@ exit: * @param block a pointer to the struct Block that will be created * @returns true(1) on success */ -struct Block* ipfs_blocks_block_new() { +struct Block* ipfs_block_new() { // allocate memory for structure struct Block* block = (struct Block*)malloc(sizeof(struct Block)); @@ -172,7 +172,7 @@ int ipfs_block_free(struct Block* block) { * @returns a new Block that is a copy */ struct Block* ipfs_block_copy(struct Block* original) { - struct Block* copy = ipfs_blocks_block_new(); + struct Block* copy = ipfs_block_new(); copy->data_length = original->data_length; copy->data = (unsigned char*) malloc(original->data_length); memcpy(copy->data, original->data, original->data_length); diff --git a/blocks/blockstore.c b/blocks/blockstore.c index 8408afa..d377555 100644 --- a/blocks/blockstore.c +++ b/blocks/blockstore.c @@ -113,6 +113,7 @@ char* ipfs_blockstore_path_get(const struct FSRepo* fs_repo, const char* filenam * @returns true(1) on success */ int ipfs_blockstore_get(const struct BlockstoreContext* context, struct Cid* cid, struct Block** block) { + int retVal = 0; // get datastore key, which is a base32 key of the multihash unsigned char* key = ipfs_blockstore_hash_to_base32(cid->hash, cid->hash_length); @@ -122,13 +123,19 @@ int ipfs_blockstore_get(const struct BlockstoreContext* context, struct Cid* cid unsigned char buffer[file_size]; FILE* file = fopen(filename, "rb"); + if (file == NULL) + goto exit; + size_t bytes_read = fread(buffer, 1, file_size, file); fclose(file); - int retVal = ipfs_blocks_block_protobuf_decode(buffer, bytes_read, block); + if (!ipfs_blocks_block_protobuf_decode(buffer, bytes_read, block)) + goto exit; (*block)->cid = ipfs_cid_copy(cid); + retVal = 1; + exit: free(key); free(filename); diff --git a/core/daemon.c b/core/daemon.c index dadfeab..a34e649 100644 --- a/core/daemon.c +++ b/core/daemon.c @@ -19,9 +19,6 @@ int ipfs_daemon_start(char* repo_path) { struct IpfsNodeListenParams listen_param; struct MultiAddress* ma = NULL; - // Debugging JMJ - libp2p_logger_add_class("null"); - libp2p_logger_info("daemon", "Initializing daemon...\n"); struct IpfsNode* local_node = NULL; diff --git a/core/ipfs_node.c b/core/ipfs_node.c index 50ae916..88115a5 100644 --- a/core/ipfs_node.c +++ b/core/ipfs_node.c @@ -1,6 +1,7 @@ #include #include "ipfs/core/ipfs_node.h" +#include "ipfs/exchange/bitswap/bitswap.h" /*** * build an online IpfsNode @@ -21,6 +22,7 @@ int ipfs_node_online_new(const char* repo_path, struct IpfsNode** node) { local_node->providerstore = NULL; local_node->repo = NULL; local_node->routing = NULL; + local_node->exchange = NULL; // build the struct if (!ipfs_repo_fsrepo_new(repo_path, NULL, &fs_repo)) { @@ -42,7 +44,8 @@ int ipfs_node_online_new(const char* repo_path, struct IpfsNode** node) { local_node->providerstore = libp2p_providerstore_new(); local_node->blockstore = ipfs_blockstore_new(fs_repo); local_node->mode = MODE_OFFLINE; - local_node->routing = ipfs_routing_new_online(local_node, &fs_repo->config->identity->private_key, NULL); + local_node->routing = ipfs_routing_new_online(local_node, &fs_repo->config->identity->private_key); + local_node->exchange = ipfs_bitswap_new(local_node); return 1; } @@ -66,6 +69,9 @@ int ipfs_node_free(struct IpfsNode* node) { if (node->blockstore != NULL) { ipfs_blockstore_free(node->blockstore); } + if (node->exchange != NULL) { + node->exchange->Close(node->exchange); + } free(node); } return 1; diff --git a/core/ping.c b/core/ping.c index 7d09267..bf89be6 100644 --- a/core/ping.c +++ b/core/ping.c @@ -22,7 +22,6 @@ int ipfs_ping (int argc, char **argv) { int retVal = 0; struct IpfsNode local_node; - struct Stream* stream = NULL; struct Libp2pPeer* peer_to_ping = NULL; char* id = NULL; struct FSRepo* fs_repo = NULL; @@ -51,7 +50,7 @@ int ipfs_ping (int argc, char **argv) local_node.identity = fs_repo->config->identity; local_node.repo = fs_repo; local_node.mode = MODE_ONLINE; - local_node.routing = ipfs_routing_new_online(&local_node, &fs_repo->config->identity->private_key, stream); + local_node.routing = ipfs_routing_new_online(&local_node, &fs_repo->config->identity->private_key); local_node.peerstore = libp2p_peerstore_new(local_node.identity->peer_id); local_node.providerstore = libp2p_providerstore_new(); diff --git a/exchange/bitswap/Makefile b/exchange/bitswap/Makefile index e4c3441..39840ec 100644 --- a/exchange/bitswap/Makefile +++ b/exchange/bitswap/Makefile @@ -7,7 +7,7 @@ endif LFLAGS = DEPS = -OBJS = bitswap.o message.o network.o peer_request_queue.o want_manager.o wantlist_queue.o +OBJS = bitswap.o message.o network.o peer_request_queue.o want_manager.o wantlist_queue.o engine.o %.o: %.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) diff --git a/exchange/bitswap/bitswap.c b/exchange/bitswap/bitswap.c index 9f9adfe..1456800 100644 --- a/exchange/bitswap/bitswap.c +++ b/exchange/bitswap/bitswap.c @@ -14,7 +14,7 @@ * @param sessionContext the context * @returns an allocated Exchange structure */ -struct Exchange* ipfs_bitswap_exchange_start(struct IpfsNode* ipfs_node) { +struct Exchange* ipfs_bitswap_new(struct IpfsNode* ipfs_node) { struct Exchange* exchange = (struct Exchange*) malloc(sizeof(struct Exchange)); if (exchange != NULL) { struct BitswapContext* bitswapContext = (struct BitswapContext*) malloc(sizeof(struct BitswapContext)); @@ -22,15 +22,26 @@ struct Exchange* ipfs_bitswap_exchange_start(struct IpfsNode* ipfs_node) { free(exchange); return NULL; } - exchange->exchangeContext = (void*) bitswapContext; + bitswapContext->bitswap_engine = ipfs_bitswap_engine_new(); + if (bitswapContext->bitswap_engine == NULL) { + free(bitswapContext); + free(exchange); + return NULL; + } + bitswapContext->localWantlist = NULL; + bitswapContext->peerRequestQueue = NULL; bitswapContext->ipfsNode = ipfs_node; + + exchange->exchangeContext = (void*) bitswapContext; exchange->IsOnline = ipfs_bitswap_is_online; exchange->Close = ipfs_bitswap_close; exchange->HasBlock = ipfs_bitswap_has_block; exchange->GetBlock = ipfs_bitswap_get_block; exchange->GetBlocks = ipfs_bitswap_get_blocks; + + // Start the threads for the network + ipfs_bitswap_engine_start(bitswapContext); } - //TODO: Start the threads for the network return exchange; } diff --git a/exchange/bitswap/engine.c b/exchange/bitswap/engine.c index f86d075..4877673 100644 --- a/exchange/bitswap/engine.c +++ b/exchange/bitswap/engine.c @@ -1,35 +1,63 @@ +#include #include "ipfs/exchange/bitswap/engine.h" #include "ipfs/exchange/bitswap/wantlist_queue.h" #include "ipfs/exchange/bitswap/peer_request_queue.h" /*** - * A separate thread that processes the queue - * @param context the context + * Implementation of the bitswap engine */ -void ipfs_bitswap_engine_wantlist_processor_start(void* ctx) { - struct BitswapContext* context = (struct BitswapContext*)ctx; - // the loop - while (!context->bitswap_engine->shutting_down) { - struct WantListQueueEntry* item = ipfs_bitswap_wantlist_queue_pop(context->localWantlist); - if (item != NULL) { - // if there is something on the queue process it. - ipfs_bitswap__wantlist_process_entry(context, item); - } else { - // if there is nothing on the queue, wait... - sleep(2); - } + +/*** + * Allocate resources for a BitswapEngine + * @returns a new struct BitswapEngine + */ +struct BitswapEngine* ipfs_bitswap_engine_new() { + struct BitswapEngine* engine = (struct BitswapEngine*) malloc(sizeof(struct BitswapEngine)); + if (engine != NULL) { + engine->shutting_down = 0; } + return engine; +} + +/*** + * Deallocate resources from struct BitswapEngine + * @param engine the engine to free + * @returns true(1) + */ +int ipfs_bitswap_engine_free(struct BitswapEngine* engine) { + free(engine); + return 1; } /*** * A separate thread that processes the queue * @param context the context */ -void ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { +void* ipfs_bitswap_engine_wantlist_processor_start(void* ctx) { struct BitswapContext* context = (struct BitswapContext*)ctx; // the loop while (!context->bitswap_engine->shutting_down) { - struct BitswapPeerQueueEntry* item = ipfs_bitswap_peer_request_queue_pop(context->peerRequestQueue); + struct WantListQueueEntry* item = ipfs_bitswap_wantlist_queue_pop(context->localWantlist); + if (item != NULL) { + // if there is something on the queue process it. + ipfs_bitswap_wantlist_process_entry(context, item); + } else { + // if there is nothing on the queue, wait... + sleep(2); + } + } + return NULL; +} + +/*** + * A separate thread that processes the queue + * @param context the context + */ +void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { + struct BitswapContext* context = (struct BitswapContext*)ctx; + // the loop + while (!context->bitswap_engine->shutting_down) { + struct PeerRequest* item = ipfs_bitswap_peer_request_queue_pop(context->peerRequestQueue); if (item != NULL) { // if there is something on the queue process it. ipfs_bitswap_peer_request_process_entry(context, item); @@ -38,6 +66,7 @@ void ipfs_bitswap_engine_peer_request_processor_start(void* ctx) { sleep(2); } } + return NULL; } /** @@ -69,8 +98,8 @@ int ipfs_bitswap_engine_start(const struct BitswapContext* context) { int ipfs_bitswap_engine_stop(const struct BitswapContext* context) { context->bitswap_engine->shutting_down = 1; - int error1 = pthread_join(context->bitswap_engine->wantlist_processor_thread); - int error2 = pthread_join(context->bitswap_engine->peer_request_processor_thread); + int error1 = pthread_join(context->bitswap_engine->wantlist_processor_thread, NULL); + int error2 = pthread_join(context->bitswap_engine->peer_request_processor_thread, NULL); return !error1 && !error2; } diff --git a/exchange/bitswap/message.c b/exchange/bitswap/message.c index 3982e3f..768e7e2 100644 --- a/exchange/bitswap/message.c +++ b/exchange/bitswap/message.c @@ -584,7 +584,7 @@ int ipfs_bitswap_message_protobuf_decode(unsigned char* buffer, size_t buffer_le switch(field_no) { case (1): { // a Blocks entry that is just an array of bytes - struct Block* temp = ipfs_blocks_block_new(); + struct Block* temp = ipfs_block_new(); if (!protobuf_decode_length_delimited(&buffer[pos], buffer_length - pos, (char**)&temp->data, &temp->data_length, &bytes_read)) { return 0; } diff --git a/exchange/bitswap/peer_request_queue.c b/exchange/bitswap/peer_request_queue.c index 2c29efd..82faebd 100644 --- a/exchange/bitswap/peer_request_queue.c +++ b/exchange/bitswap/peer_request_queue.c @@ -132,7 +132,7 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_queue_find_entry(struct PeerR * @param queue the queue * @returns the PeerRequest that should be handled next, or NULL if the queue is empty */ -struct PeerRequest* ipfs_bitswap_peer_request_pop(struct PeerRequestQueue* queue) { +struct PeerRequest* ipfs_bitswap_peer_request_queue_pop(struct PeerRequestQueue* queue) { struct PeerRequest* retVal = NULL; if (queue != NULL) { pthread_mutex_lock(&queue->queue_mutex); @@ -185,3 +185,15 @@ int ipfs_bitswap_peer_request_queue_fill(struct PeerRequestQueue* queue, struct // add to the block array return 0; } + +/**** + * Handle a PeerRequest + * @param context the BitswapContext + * @param request the request to process + * @returns true(1) on succes, otherwise false(0) + */ +int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request) { + //TODO: Implement this method + return 0; +} + diff --git a/exchange/bitswap/wantlist_queue.c b/exchange/bitswap/wantlist_queue.c index 0de1460..760fcf2 100644 --- a/exchange/bitswap/wantlist_queue.c +++ b/exchange/bitswap/wantlist_queue.c @@ -120,6 +120,26 @@ struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_find(struct WantListQueue return NULL; } +/*** + * Pops the top one off the queue + * + * @param wantlist the list + * @returns the WantListQueueEntry + */ +struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_pop(struct WantListQueue* wantlist) { + struct WantListQueueEntry* entry = NULL; + + if (wantlist->queue->total == 0) + return entry; + + //TODO: This should be a linked list, not an array + pthread_mutex_lock(&wantlist->wantlist_mutex); + entry = (struct WantListQueueEntry*)libp2p_utils_vector_get(wantlist->queue, 0); + libp2p_utils_vector_delete(wantlist->queue, 0); + pthread_mutex_unlock(&wantlist->wantlist_mutex); + return entry; +} + /*** * Initialize a WantListQueueEntry * @returns a new WantListQueueEntry diff --git a/include/ipfs/blocks/block.h b/include/ipfs/blocks/block.h index 2ff176b..e2deb46 100644 --- a/include/ipfs/blocks/block.h +++ b/include/ipfs/blocks/block.h @@ -18,7 +18,7 @@ struct Block { * Create a new block * @returns a new allocated Block struct */ -struct Block* ipfs_blocks_block_new(); +struct Block* ipfs_block_new(); int ipfs_blocks_block_add_data(const unsigned char* data, size_t data_size, struct Block* block); diff --git a/include/ipfs/exchange/bitswap/bitswap.h b/include/ipfs/exchange/bitswap/bitswap.h index 460f86d..5425c39 100644 --- a/include/ipfs/exchange/bitswap/bitswap.h +++ b/include/ipfs/exchange/bitswap/bitswap.h @@ -21,7 +21,7 @@ struct BitswapContext { * @param ipfsNode the context * @returns an Exchange struct that refers to the exchange */ -struct Exchange* ipfs_bitswap_exchange_start(struct IpfsNode* ipfsNode); +struct Exchange* ipfs_bitswap_new(struct IpfsNode* ipfsNode); /*** * These are the implementation methods for the exchange "Interface" diff --git a/include/ipfs/exchange/bitswap/engine.h b/include/ipfs/exchange/bitswap/engine.h index cfb9001..a3684eb 100644 --- a/include/ipfs/exchange/bitswap/engine.h +++ b/include/ipfs/exchange/bitswap/engine.h @@ -11,6 +11,19 @@ struct BitswapEngine { pthread_t peer_request_processor_thread; }; +/*** + * Allocate resources for a BitswapEngine + * @returns a new struct BitswapEngine + */ +struct BitswapEngine* ipfs_bitswap_engine_new(); + +/*** + * Deallocate resources from struct BitswapEngine + * @param engine the engine to free + * @returns true(1) + */ +int ipfs_bitswap_engine_free(struct BitswapEngine* engine); + /** * Starts the bitswap engine that processes queue items. There * should only be one of these per ipfs instance. diff --git a/include/ipfs/exchange/bitswap/peer_request_queue.h b/include/ipfs/exchange/bitswap/peer_request_queue.h index 9443cf8..be15ca3 100644 --- a/include/ipfs/exchange/bitswap/peer_request_queue.h +++ b/include/ipfs/exchange/bitswap/peer_request_queue.h @@ -4,6 +4,7 @@ */ #include +#include "ipfs/exchange/bitswap/bitswap.h" #include "ipfs/blocks/block.h" struct PeerRequest { @@ -103,3 +104,11 @@ struct PeerRequestEntry* ipfs_bitswap_peer_request_entry_new(); * @returns true(1) */ int ipfs_bitswap_peer_request_entry_free(struct PeerRequestEntry* entry); + +/**** + * Handle a PeerRequest + * @param context the BitswapContext + * @param request the request to process + * @returns true(1) on succes, otherwise false(0) + */ +int ipfs_bitswap_peer_request_process_entry(const struct BitswapContext* context, struct PeerRequest* request); diff --git a/include/ipfs/exchange/bitswap/wantlist_queue.h b/include/ipfs/exchange/bitswap/wantlist_queue.h index 4b5cba0..05ba823 100644 --- a/include/ipfs/exchange/bitswap/wantlist_queue.h +++ b/include/ipfs/exchange/bitswap/wantlist_queue.h @@ -97,3 +97,12 @@ int ipfs_bitswap_wantlist_session_compare(const struct WantListSession* a, const * @returns true(1) on success, false(0) if not. */ int ipfs_bitswap_wantlist_process_entry(struct BitswapContext* context, struct WantListQueueEntry* entry); + +/*** + * Pops the top one off the queue + * + * @param wantlist the list + * @returns the WantListQueueEntry + */ +struct WantListQueueEntry* ipfs_bitswap_wantlist_queue_pop(struct WantListQueue* wantlist); + diff --git a/include/ipfs/routing/routing.h b/include/ipfs/routing/routing.h index d32cd84..2267c24 100644 --- a/include/ipfs/routing/routing.h +++ b/include/ipfs/routing/routing.h @@ -12,7 +12,6 @@ struct IpfsRouting { struct IpfsNode* local_node; size_t ds_len; struct RsaPrivateKey* sk; - struct Stream* stream; /** * Put a value in the datastore @@ -81,10 +80,10 @@ typedef struct IpfsRouting ipfs_routing; // offline routing routines. ipfs_routing* ipfs_routing_new_offline (struct IpfsNode* local_node, struct RsaPrivateKey *private_key); // online using secio, should probably be deprecated -ipfs_routing* ipfs_routing_new_online (struct IpfsNode* local_node, struct RsaPrivateKey* private_key, struct Stream* stream); +ipfs_routing* ipfs_routing_new_online (struct IpfsNode* local_node, struct RsaPrivateKey* private_key); int ipfs_routing_online_free(ipfs_routing*); // online using DHT/kademlia, the recommended router -ipfs_routing* ipfs_routing_new_kademlia(struct IpfsNode* local_node, struct RsaPrivateKey* private_key, struct Stream* stream); +ipfs_routing* ipfs_routing_new_kademlia(struct IpfsNode* local_node, struct RsaPrivateKey* private_key); // generic routines int ipfs_routing_generic_put_value (ipfs_routing* offlineRouting, const unsigned char *key, size_t key_size, const void *val, size_t vlen); int ipfs_routing_generic_get_value (ipfs_routing* offlineRouting, const unsigned char *key, size_t key_size, void **val, size_t *vlen); diff --git a/routing/k_routing.c b/routing/k_routing.c index 020c32e..05949c1 100644 --- a/routing/k_routing.c +++ b/routing/k_routing.c @@ -148,7 +148,7 @@ int ipfs_routing_kademlia_bootstrap(struct IpfsRouting* routing) { return 1; } -struct IpfsRouting* ipfs_routing_new_kademlia(struct IpfsNode* local_node, struct RsaPrivateKey* private_key, struct Stream* stream) { +struct IpfsRouting* ipfs_routing_new_kademlia(struct IpfsNode* local_node, struct RsaPrivateKey* private_key) { char kademlia_id[21]; // generate kademlia compatible id by getting first 20 chars of peer id if (local_node->identity->peer_id == NULL || strlen(local_node->identity->peer_id) < 20) { @@ -160,7 +160,6 @@ struct IpfsRouting* ipfs_routing_new_kademlia(struct IpfsNode* local_node, struc if (routing != NULL) { routing->local_node = local_node; routing->sk = private_key; - routing->stream = stream; routing->PutValue = ipfs_routing_kademlia_put_value; routing->GetValue = ipfs_routing_kademlia_get_value; routing->FindProviders = ipfs_routing_kademlia_find_providers; diff --git a/routing/offline.c b/routing/offline.c index 97c61f3..bbf642a 100644 --- a/routing/offline.c +++ b/routing/offline.c @@ -103,7 +103,6 @@ struct IpfsRouting* ipfs_routing_new_offline (struct IpfsNode* local_node, struc if (offlineRouting) { offlineRouting->local_node = local_node; offlineRouting->sk = private_key; - offlineRouting->stream = NULL; offlineRouting->PutValue = ipfs_routing_generic_put_value; offlineRouting->GetValue = ipfs_routing_generic_get_value; diff --git a/routing/online.c b/routing/online.c index 02c145d..b461bfd 100644 --- a/routing/online.c +++ b/routing/online.c @@ -20,27 +20,24 @@ * @param message what to send * @returns what was received */ -struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct Stream* stream, struct Libp2pMessage* message) { +struct Libp2pMessage* ipfs_routing_online_send_receive_message(struct SessionContext* sessionContext, struct Libp2pMessage* message) { size_t protobuf_size = 0, results_size = 0; unsigned char* protobuf = NULL, *results = NULL; struct Libp2pMessage* return_message = NULL; - struct SessionContext session_context; protobuf_size = libp2p_message_protobuf_encode_size(message); protobuf = (unsigned char*)malloc(protobuf_size); libp2p_message_protobuf_encode(message, &protobuf[0], protobuf_size, &protobuf_size); - session_context.default_stream = stream; - session_context.insecure_stream = stream; // upgrade to kademlia protocol - if (!libp2p_routing_dht_upgrade_stream(&session_context)) { + if (!libp2p_routing_dht_upgrade_stream(sessionContext)) { goto exit; } // send the message, and expect the same back - session_context.default_stream->write(&session_context, protobuf, protobuf_size); - session_context.default_stream->read(&session_context, &results, &results_size, 5); + sessionContext->default_stream->write(sessionContext->default_stream, protobuf, protobuf_size); + sessionContext->default_stream->read(sessionContext->default_stream, &results, &results_size, 5); // see if we can unprotobuf if (!libp2p_message_protobuf_decode(results, results_size, &return_message)) @@ -79,7 +76,7 @@ int ipfs_routing_online_find_remote_providers(struct IpfsRouting* routing, const if (peer->connection_type == CONNECTION_TYPE_CONNECTED) { // Ask for hash, if it has it, break out of the loop and stop looking libp2p_logger_debug("online", "FindRemoteProviders: Asking for who can provide\n"); - struct Libp2pMessage* return_message = ipfs_routing_online_send_receive_message(peer->connection, message); + struct Libp2pMessage* return_message = ipfs_routing_online_send_receive_message(peer->sessionContext, message); if (return_message != NULL && return_message->provider_peer_head != NULL) { libp2p_logger_debug("online", "FindRemoteProviders: Return value is not null\n"); found = 1; @@ -160,7 +157,7 @@ int ipfs_routing_online_ask_peer_for_peer(struct Libp2pPeer* whoToAsk, const uns goto exit; memcpy(message->key, peer_id, peer_id_size); - return_message = ipfs_routing_online_send_receive_message(whoToAsk->connection, message); + return_message = ipfs_routing_online_send_receive_message(whoToAsk->sessionContext, message); if (return_message == NULL) { // some kind of network error whoToAsk->connection_type = CONNECTION_TYPE_NOT_CONNECTED; @@ -263,7 +260,7 @@ int ipfs_routing_online_provide(struct IpfsRouting* routing, const unsigned char struct Libp2pPeer* current_peer = current_peer_entry->peer; if (current_peer->connection_type == CONNECTION_TYPE_CONNECTED) { // ignoring results is okay this time - struct Libp2pMessage* rslt = ipfs_routing_online_send_receive_message(current_peer->connection, msg); + struct Libp2pMessage* rslt = ipfs_routing_online_send_receive_message(current_peer->sessionContext, msg); if (rslt != NULL) libp2p_message_free(rslt); } else if(current_peer->addr_head == NULL @@ -343,7 +340,7 @@ int ipfs_routing_online_get_peer_value(ipfs_routing* routing, const struct Libp2 msg->message_type = MESSAGE_TYPE_GET_VALUE; // send message and receive results - struct Libp2pMessage* ret_msg = ipfs_routing_online_send_receive_message(peer->connection, msg); + struct Libp2pMessage* ret_msg = ipfs_routing_online_send_receive_message(peer->sessionContext, msg); libp2p_message_free(msg); if (ret_msg == NULL) @@ -486,7 +483,7 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) { if (peer == NULL) { return -1; // this should never happen } - if (peer->connection == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier) + if (peer->sessionContext == NULL) { // should always be true unless we added it twice (TODO: we should prevent that earlier) libp2p_peer_connect(peer, 5); } } @@ -499,16 +496,14 @@ int ipfs_routing_online_bootstrap(struct IpfsRouting* routing) { * Create a new ipfs_routing struct for online clients * @param fs_repo the repo * @param private_key the local private key - * @param stream the stream to put in the struct * @reurns the ipfs_routing struct that handles messages */ -ipfs_routing* ipfs_routing_new_online (struct IpfsNode* local_node, struct RsaPrivateKey *private_key, struct Stream* stream) { +ipfs_routing* ipfs_routing_new_online (struct IpfsNode* local_node, struct RsaPrivateKey *private_key) { ipfs_routing *onlineRouting = malloc (sizeof(ipfs_routing)); if (onlineRouting) { onlineRouting->local_node = local_node; onlineRouting->sk = private_key; - onlineRouting->stream = stream; onlineRouting->PutValue = ipfs_routing_generic_put_value; onlineRouting->GetValue = ipfs_routing_online_get_value; diff --git a/test/exchange/test_bitswap.h b/test/exchange/test_bitswap.h index 1b1eeb3..3b464f4 100644 --- a/test/exchange/test_bitswap.h +++ b/test/exchange/test_bitswap.h @@ -66,7 +66,7 @@ int test_bitswap_new_free() { } // payload message->payload = libp2p_utils_vector_new(1); - block = ipfs_blocks_block_new(); + block = ipfs_block_new(); block->data_length = 25; libp2p_utils_vector_add(message->payload, block); block = (struct Block*)libp2p_utils_vector_get(message->payload, 0); @@ -141,7 +141,7 @@ int test_bitswap_retrieve_file() goto exit; // fire up the exchange - exchange = ipfs_bitswap_exchange_start(localNode); + exchange = ipfs_bitswap_new(localNode); // attempt to retrieve the file if (!exchange->GetBlock(exchange, cid, &block)) { @@ -179,8 +179,8 @@ int test_bitswap_retrieve_file_third_party() { libp2p_logger_add_class("peerstore"); libp2p_logger_add_class("exporter"); libp2p_logger_add_class("peer"); - libp2p_logger_add_class("test_bitswap"); */ + libp2p_logger_add_class("test_bitswap"); // clean out repository char* ipfs_path = "/tmp/test1"; @@ -190,26 +190,27 @@ int test_bitswap_retrieve_file_third_party() { int thread1_started = 0, thread2_started = 0; struct MultiAddress* ma_peer1 = NULL; struct Libp2pVector* ma_vector2 = NULL, *ma_vector3 = NULL; - struct HashtableNode* node = NULL, *result_node = NULL; + struct HashtableNode* node = NULL; + struct Block* result = NULL; + struct Cid* cid = NULL; // create peer 1 + libp2p_logger_debug("test_bitswap", "Firing up daemon 1.\n"); drop_and_build_repository(ipfs_path, 4001, NULL, &peer_id_1); char multiaddress_string[255]; sprintf(multiaddress_string, "/ip4/127.0.0.1/tcp/4001/ipfs/%s", peer_id_1); ma_peer1 = multiaddress_new_from_string(multiaddress_string); // start the daemon in a separate thread - libp2p_logger_debug("test_bitswap", "Firing up daemon 1.\n"); if (pthread_create(&thread1, NULL, test_routing_daemon_start, (void*)ipfs_path) < 0) { - fprintf(stderr, "Unable to start thread 1\n"); + libp2p_logger_error("test_bitswap", "Unable to start thread 1\n"); goto exit; } thread1_started = 1; - // wait for everything to start up - // JMJ debugging = sleep(3); // create peer 2 + libp2p_logger_debug("test_routing", "Firing up daemon 2.\n"); ipfs_path = "/tmp/test2"; // create a vector to hold peer1's multiaddress so we can connect as a peer ma_vector2 = libp2p_utils_vector_new(1); @@ -226,19 +227,16 @@ int test_bitswap_retrieve_file_third_party() { ipfs_import_file(NULL, "/home/parallels/ipfstest/hello_world.txt", &node, ipfs_node2, &bytes_written, 0); ipfs_node_free(ipfs_node2); // start the daemon in a separate thread - libp2p_logger_debug("test_routing", "Firing up daemon 2.\n"); if (pthread_create(&thread2, NULL, test_routing_daemon_start, (void*)ipfs_path) < 0) { - fprintf(stderr, "Unable to start thread 2\n"); + libp2p_logger_error("test_bitswap", "Unable to start thread 2\n"); goto exit; } thread2_started = 1; - // wait for everything to start up - // JMJ debugging = sleep(3); - libp2p_logger_debug("test_routing", "Firing up the 3rd client\n"); // create my peer, peer 3 + libp2p_logger_debug("test_routing", "Firing up the 3rd client\n"); ipfs_path = "/tmp/test3"; ma_peer1 = multiaddress_new_from_string(multiaddress_string); ma_vector3 = libp2p_utils_vector_new(1); @@ -249,18 +247,20 @@ int test_bitswap_retrieve_file_third_party() { ipfs_node3->routing->Bootstrap(ipfs_node3->routing); - if (!ipfs_exporter_get_node(ipfs_node3, node->hash, node->hash_size, &result_node)) { - fprintf(stderr, "Get_Node returned false\n"); + // this does the heavy lifting... + cid = ipfs_cid_new(0, node->hash, node->hash_size, CID_PROTOBUF); + if (!ipfs_node3->exchange->GetBlock(ipfs_node3->exchange, cid, &result)) { + libp2p_logger_error("test_bitswap", "GetBlock returned false\n"); goto exit; } - if (node->hash_size != result_node->hash_size) { - fprintf(stderr, "Node hash sizes do not match. Should be %lu but is %lu\n", node->hash_size, result_node->hash_size); + if (node->hash_size != result->cid->hash_length) { + libp2p_logger_error("test_bitswap", "Node hash sizes do not match. Should be %lu but is %lu\n", node->hash_size, result->cid->hash_length); goto exit; } - if (node->data_size != result_node->data_size) { - fprintf(stderr, "Result sizes do not match. Should be %lu but is %lu\n", node->data_size, result_node->data_size); + if (node->data_size != result->data_length) { + libp2p_logger_error("test_bitswap", "Result sizes do not match. Should be %lu but is %lu\n", node->data_size, result->data_length); goto exit; } @@ -287,8 +287,10 @@ int test_bitswap_retrieve_file_third_party() { } if (node != NULL) ipfs_hashtable_node_free(node); - if (result_node != NULL) - ipfs_hashtable_node_free(result_node); + if (result != NULL) + ipfs_block_free(result); + if (cid != NULL) + ipfs_cid_free(cid); return retVal; } diff --git a/test/repo/test_repo_fsrepo.h b/test/repo/test_repo_fsrepo.h index e588e8a..a109dce 100644 --- a/test/repo/test_repo_fsrepo.h +++ b/test/repo/test_repo_fsrepo.h @@ -47,7 +47,7 @@ int test_repo_fsrepo_write_read_block() { } // create and write the block - block = ipfs_blocks_block_new(); + block = ipfs_block_new(); if (block == NULL) { ipfs_repo_fsrepo_free(fs_repo); return 0; diff --git a/test/routing/test_routing.h b/test/routing/test_routing.h index 1294cff..c341dae 100644 --- a/test/routing/test_routing.h +++ b/test/routing/test_routing.h @@ -93,7 +93,7 @@ int test_routing_find_peer() { local_node.providerstore = NULL; local_node.repo = fs_repo; local_node.identity = fs_repo->config->identity; - local_node.routing = ipfs_routing_new_online(&local_node, &fs_repo->config->identity->private_key, NULL); + local_node.routing = ipfs_routing_new_online(&local_node, &fs_repo->config->identity->private_key); local_node.routing->Bootstrap(local_node.routing); @@ -215,7 +215,7 @@ int test_routing_find_providers() { local_node.providerstore = libp2p_providerstore_new(); local_node.repo = fs_repo; local_node.identity = fs_repo->config->identity; - local_node.routing = ipfs_routing_new_online(&local_node, &fs_repo->config->identity->private_key, NULL); + local_node.routing = ipfs_routing_new_online(&local_node, &fs_repo->config->identity->private_key); local_node.routing->Bootstrap(local_node.routing); diff --git a/test/routing/test_supernode.h b/test/routing/test_supernode.h index 3ea345a..2b3c814 100644 --- a/test/routing/test_supernode.h +++ b/test/routing/test_supernode.h @@ -59,7 +59,6 @@ int test_routing_supernode_get_remote_value() { struct FSRepo* fs_repo = NULL; struct IpfsNode* ipfs_node = NULL; struct Libp2pPeer this_peer; - struct Stream* stream = NULL; const unsigned char* orig_multihash = (unsigned char*)"QmYAXgX8ARiriupMQsbGXtKdDyGzWry1YV3sycKw1qqmgH"; size_t hash_size = 100; unsigned char hash[hash_size]; @@ -97,7 +96,7 @@ int test_routing_supernode_get_remote_value() { // add bootstrap peer for kademlia struct MultiAddress* remote = multiaddress_new_from_string("/ip4/127.0.0.1/udp/5001"); libp2p_utils_vector_add(ipfs_node->repo->config->bootstrap_peers, remote); - ipfs_node->routing = ipfs_routing_new_kademlia(ipfs_node, &fs_repo->config->identity->private_key, stream); + ipfs_node->routing = ipfs_routing_new_kademlia(ipfs_node, &fs_repo->config->identity->private_key); if (ipfs_node->routing == NULL) @@ -159,7 +158,6 @@ int test_routing_supernode_get_value() { int retVal = 0; struct FSRepo* fs_repo = NULL; struct IpfsNode* ipfs_node = NULL; - struct Stream* stream = NULL; int file_size = 1000; unsigned char bytes[file_size]; char* fullFileName = "/tmp/temp_file.bin"; @@ -194,7 +192,7 @@ int test_routing_supernode_get_value() { this_peer.addr_head = libp2p_utils_linked_list_new(); this_peer.addr_head->item = multiaddress_new_from_string("/ip4/127.0.0.1/tcp/4001"); libp2p_peerstore_add_peer(ipfs_node->peerstore, &this_peer); - ipfs_node->routing = ipfs_routing_new_kademlia(ipfs_node, &fs_repo->config->identity->private_key, stream); + ipfs_node->routing = ipfs_routing_new_kademlia(ipfs_node, &fs_repo->config->identity->private_key); if (ipfs_node->routing == NULL) goto exit; diff --git a/test/storage/test_blocks.h b/test/storage/test_blocks.h index 9df126e..af5e9cf 100644 --- a/test/storage/test_blocks.h +++ b/test/storage/test_blocks.h @@ -3,7 +3,7 @@ int test_blocks_new() { const unsigned char* input = (const unsigned char*)"Hello, World!"; int retVal = 0; - struct Block* block = ipfs_blocks_block_new(); + struct Block* block = ipfs_block_new(); if (block == NULL) return 0; diff --git a/test/storage/test_datastore.h b/test/storage/test_datastore.h index 74e8f13..6c64ce0 100644 --- a/test/storage/test_datastore.h +++ b/test/storage/test_datastore.h @@ -23,7 +23,7 @@ int test_ipfs_datastore_put() { return 0; // build the block - block = ipfs_blocks_block_new(); + block = ipfs_block_new(); if (block == NULL) return 0; diff --git a/test/testit.c b/test/testit.c index c260c11..182d568 100644 --- a/test/testit.c +++ b/test/testit.c @@ -37,6 +37,7 @@ const char* names[] = { "test_bitswap_new_free", "test_bitswap_peer_request_queue_new", "test_bitswap_retrieve_file", + "test_bitswap_retrieve_file_third_party", "test_cid_new_free", "test_cid_cast_multihash", "test_cid_cast_non_multihash", @@ -90,6 +91,7 @@ int (*funcs[])(void) = { test_bitswap_new_free, test_bitswap_peer_request_queue_new, test_bitswap_retrieve_file, + test_bitswap_retrieve_file_third_party, test_cid_new_free, test_cid_cast_multihash, test_cid_cast_non_multihash,