2017-07-20 12:57:20 +00:00
/***
* This implements the BitswapNetwork . Members of this network can fill requests and
* smartly handle queues of local and remote requests .
2017-07-24 19:56:30 +00:00
*
* For a somewhat accurate diagram of how this may work , @ see https : //github.com/ipfs/js-ipfs-bitswap
2017-07-20 12:57:20 +00:00
*/
2017-09-20 17:39:26 +00:00
# include <pthread.h>
2017-08-02 13:53:34 +00:00
# include "libp2p/utils/logger.h"
2017-07-20 14:12:31 +00:00
# include "ipfs/exchange/bitswap/network.h"
2017-07-31 13:16:52 +00:00
# include "ipfs/exchange/bitswap/peer_request_queue.h"
2017-07-31 11:43:15 +00:00
/****
* send a message to a particular peer
* @ param context the BitswapContext
* @ param peer the peer that is the recipient
* @ param message the message to send
*/
2017-07-31 13:16:52 +00:00
int ipfs_bitswap_network_send_message ( const struct BitswapContext * context , struct Libp2pPeer * peer , const struct BitswapMessage * message ) {
2017-08-31 11:41:54 +00:00
libp2p_logger_debug ( " bitswap_network " , " Sending bitswap message to %s. \n " , libp2p_peer_id_to_string ( peer ) ) ;
2017-07-31 13:16:52 +00:00
// get a connection to the peer
2017-10-09 15:01:29 +00:00
if ( peer - > connection_type ! = CONNECTION_TYPE_CONNECTED | | peer - > sessionContext = = NULL ) {
2017-10-23 20:22:12 +00:00
libp2p_peer_connect ( context - > ipfsNode - > dialer , peer , context - > ipfsNode - > peerstore , context - > ipfsNode - > repo - > config - > datastore , 10 ) ;
2017-07-31 13:16:52 +00:00
if ( peer - > connection_type ! = CONNECTION_TYPE_CONNECTED )
return 0 ;
}
// protobuf the message
size_t buf_size = ipfs_bitswap_message_protobuf_encode_size ( message ) ;
uint8_t * buf = ( uint8_t * ) malloc ( buf_size + 20 ) ;
if ( buf = = NULL )
return 0 ;
if ( ! ipfs_bitswap_message_protobuf_encode ( message , & buf [ 20 ] , buf_size , & buf_size ) ) {
free ( buf ) ;
return 0 ;
}
// tack on the protocol header
memcpy ( buf , " /ipfs/bitswap/1.1.0 \n " , 20 ) ;
buf_size + = 20 ;
// send it
2017-10-23 14:48:19 +00:00
struct StreamMessage outgoing ;
outgoing . data = buf ;
outgoing . data_size = buf_size ;
int bytes_written = peer - > sessionContext - > default_stream - > write ( peer - > sessionContext , & outgoing ) ;
2017-07-31 13:16:52 +00:00
if ( bytes_written < = 0 ) {
free ( buf ) ;
return 0 ;
}
free ( buf ) ;
return 1 ;
2017-07-31 11:43:15 +00:00
}
2017-07-31 22:59:51 +00:00
/***
* Remove a cid from the queue
* @ param cids the vector of cids
* @ param cid the cid to remove
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
2017-08-03 16:16:58 +00:00
int ipfs_bitswap_network_adjust_cid_queue ( struct Libp2pVector * collection , struct Cid * cid , int cancel ) {
2017-07-31 22:59:51 +00:00
if ( collection = = NULL | | cid = = NULL )
return 0 ;
for ( int i = 0 ; i < collection - > total ; collection + + ) {
2017-08-02 12:04:06 +00:00
const struct CidEntry * current = ( const struct CidEntry * ) libp2p_utils_vector_get ( collection , i ) ;
if ( ipfs_cid_compare ( current - > cid , cid ) = = 0 ) {
2017-08-03 16:16:58 +00:00
if ( cancel )
libp2p_utils_vector_delete ( collection , i ) ;
2017-07-31 22:59:51 +00:00
return 1 ;
}
}
2017-08-03 16:16:58 +00:00
// not found. Add it if we're not cancelling
if ( ! cancel ) {
struct CidEntry * cidEntry = ipfs_bitswap_peer_request_cid_entry_new ( ) ;
cidEntry - > cid = cid ;
cidEntry - > cancel = 0 ;
libp2p_utils_vector_add ( collection , cidEntry ) ;
}
2017-07-31 22:59:51 +00:00
2017-08-03 16:16:58 +00:00
return 0 ;
}
2017-07-31 22:59:51 +00:00
2017-07-31 13:16:52 +00:00
/***
* Handle a raw incoming bitswap message from the network
* @ param node us
* @ param sessionContext the connection context
* @ param bytes the message
* @ param bytes_size the size of the message
* @ returns true ( 1 ) on success , false ( 0 ) otherwise .
*/
int ipfs_bitswap_network_handle_message ( const struct IpfsNode * node , const struct SessionContext * sessionContext , const uint8_t * bytes , size_t bytes_length ) {
struct BitswapContext * bitswapContext = ( struct BitswapContext * ) node - > exchange - > exchangeContext ;
// strip off the protocol header
int start = - 1 ;
for ( int i = 0 ; i < bytes_length ; i + + ) {
if ( bytes [ i ] = = ' \n ' ) {
start = i + 1 ;
break ;
}
}
if ( start = = - 1 )
return 0 ;
// un-protobuf the message
struct BitswapMessage * message = NULL ;
if ( ! ipfs_bitswap_message_protobuf_decode ( & bytes [ start ] , bytes_length - start , & message ) )
return 0 ;
// process the message
// payload - what we want
2017-07-31 20:19:17 +00:00
if ( message - > payload ! = NULL ) {
for ( int i = 0 ; i < message - > payload - > total ; i + + ) {
struct Block * blk = ( struct Block * ) libp2p_utils_vector_get ( message - > payload , i ) ;
2017-08-09 01:40:35 +00:00
// we need a copy of the block so it survives the destruction of the message
node - > exchange - > HasBlock ( node - > exchange , ipfs_block_copy ( blk ) ) ;
2017-07-31 20:19:17 +00:00
}
2017-07-31 13:16:52 +00:00
}
// wantlist - what they want
if ( message - > wantlist ! = NULL & & message - > wantlist - > entries ! = NULL & & message - > wantlist - > entries - > total > 0 ) {
// get the peer
2017-07-31 21:36:52 +00:00
if ( sessionContext - > remote_peer_id = = NULL ) {
ipfs_bitswap_message_free ( message ) ;
return 0 ;
}
2017-08-02 13:53:34 +00:00
struct Libp2pPeer * peer = libp2p_peerstore_get_or_add_peer_by_id ( node - > peerstore , ( unsigned char * ) sessionContext - > remote_peer_id , strlen ( sessionContext - > remote_peer_id ) ) ;
2017-07-31 21:36:52 +00:00
if ( peer = = NULL ) {
2017-08-02 13:53:34 +00:00
libp2p_logger_error ( " bitswap_network " , " Unable to find or add peer %s of length %d to peerstore. \n " , sessionContext - > remote_peer_id , strlen ( sessionContext - > remote_peer_id ) ) ;
2017-07-31 21:36:52 +00:00
ipfs_bitswap_message_free ( message ) ;
return 0 ;
}
2017-08-09 01:40:35 +00:00
// find the queue (adds it if it is not there)
struct PeerRequest * peerRequest = ipfs_peer_request_queue_find_peer ( bitswapContext - > peerRequestQueue , peer ) ;
2017-07-31 13:16:52 +00:00
for ( int i = 0 ; i < message - > wantlist - > entries - > total ; i + + ) {
struct WantlistEntry * entry = ( struct WantlistEntry * ) libp2p_utils_vector_get ( message - > wantlist - > entries , i ) ;
// turn the "block" back into a cid
struct Cid * cid = NULL ;
2017-08-02 13:53:34 +00:00
if ( ! ipfs_cid_protobuf_decode ( entry - > block , entry - > block_size , & cid ) | | cid - > hash_length = = 0 ) {
libp2p_logger_error ( " bitswap_network " , " Message had invalid CID \n " ) ;
2017-07-31 13:16:52 +00:00
ipfs_cid_free ( cid ) ;
2017-08-03 22:46:20 +00:00
ipfs_bitswap_message_free ( message ) ;
2017-07-31 13:16:52 +00:00
return 0 ;
}
2017-08-09 01:40:35 +00:00
ipfs_bitswap_network_adjust_cid_queue ( peerRequest - > cids_they_want , cid , entry - > cancel ) ;
2017-07-31 13:16:52 +00:00
}
}
2017-08-03 22:46:20 +00:00
ipfs_bitswap_message_free ( message ) ;
2017-07-31 13:16:52 +00:00
return 1 ;
}