2017-07-20 12:57:20 +00:00
/***
* This implements the BitswapNetwork . Members of this network can fill requests and
* smartly handle queues of local and remote requests .
2017-07-24 19:56:30 +00:00
*
* For a somewhat accurate diagram of how this may work , @ see https : //github.com/ipfs/js-ipfs-bitswap
2017-07-20 12:57:20 +00:00
*/
2017-08-02 13:53:34 +00:00
# include "libp2p/utils/logger.h"
2017-07-20 14:12:31 +00:00
# include "ipfs/exchange/bitswap/network.h"
2017-07-31 13:16:52 +00:00
# include "ipfs/exchange/bitswap/peer_request_queue.h"
2017-07-31 11:43:15 +00:00
/****
* send a message to a particular peer
* @ param context the BitswapContext
* @ param peer the peer that is the recipient
* @ param message the message to send
*/
2017-07-31 13:16:52 +00:00
int ipfs_bitswap_network_send_message ( const struct BitswapContext * context , struct Libp2pPeer * peer , const struct BitswapMessage * message ) {
// get a connection to the peer
if ( peer - > connection_type ! = CONNECTION_TYPE_CONNECTED ) {
2017-07-31 21:36:52 +00:00
libp2p_peer_connect ( & context - > ipfsNode - > identity - > private_key , peer , context - > ipfsNode - > peerstore , 10 ) ;
2017-07-31 13:16:52 +00:00
if ( peer - > connection_type ! = CONNECTION_TYPE_CONNECTED )
return 0 ;
}
// protobuf the message
size_t buf_size = ipfs_bitswap_message_protobuf_encode_size ( message ) ;
uint8_t * buf = ( uint8_t * ) malloc ( buf_size + 20 ) ;
if ( buf = = NULL )
return 0 ;
if ( ! ipfs_bitswap_message_protobuf_encode ( message , & buf [ 20 ] , buf_size , & buf_size ) ) {
free ( buf ) ;
return 0 ;
}
// tack on the protocol header
memcpy ( buf , " /ipfs/bitswap/1.1.0 \n " , 20 ) ;
buf_size + = 20 ;
// send it
2017-07-31 18:54:09 +00:00
int bytes_written = peer - > sessionContext - > default_stream - > write ( peer - > sessionContext , buf , buf_size ) ;
2017-07-31 13:16:52 +00:00
if ( bytes_written < = 0 ) {
free ( buf ) ;
return 0 ;
}
free ( buf ) ;
return 1 ;
2017-07-31 11:43:15 +00:00
}
2017-07-31 22:59:51 +00:00
/***
* Remove a cid from the queue
* @ param cids the vector of cids
* @ param cid the cid to remove
* @ returns true ( 1 ) on success , false ( 0 ) otherwise
*/
2017-08-03 16:16:58 +00:00
int ipfs_bitswap_network_adjust_cid_queue ( struct Libp2pVector * collection , struct Cid * cid , int cancel ) {
2017-07-31 22:59:51 +00:00
if ( collection = = NULL | | cid = = NULL )
return 0 ;
for ( int i = 0 ; i < collection - > total ; collection + + ) {
2017-08-02 12:04:06 +00:00
const struct CidEntry * current = ( const struct CidEntry * ) libp2p_utils_vector_get ( collection , i ) ;
if ( ipfs_cid_compare ( current - > cid , cid ) = = 0 ) {
2017-08-03 16:16:58 +00:00
if ( cancel )
libp2p_utils_vector_delete ( collection , i ) ;
2017-07-31 22:59:51 +00:00
return 1 ;
}
}
2017-08-03 16:16:58 +00:00
// not found. Add it if we're not cancelling
if ( ! cancel ) {
struct CidEntry * cidEntry = ipfs_bitswap_peer_request_cid_entry_new ( ) ;
cidEntry - > cid = cid ;
cidEntry - > cancel = 0 ;
libp2p_utils_vector_add ( collection , cidEntry ) ;
}
2017-07-31 22:59:51 +00:00
2017-08-03 16:16:58 +00:00
return 0 ;
}
2017-07-31 22:59:51 +00:00
2017-07-31 13:16:52 +00:00
/***
* Handle a raw incoming bitswap message from the network
* @ param node us
* @ param sessionContext the connection context
* @ param bytes the message
* @ param bytes_size the size of the message
* @ returns true ( 1 ) on success , false ( 0 ) otherwise .
*/
int ipfs_bitswap_network_handle_message ( const struct IpfsNode * node , const struct SessionContext * sessionContext , const uint8_t * bytes , size_t bytes_length ) {
struct BitswapContext * bitswapContext = ( struct BitswapContext * ) node - > exchange - > exchangeContext ;
// strip off the protocol header
int start = - 1 ;
for ( int i = 0 ; i < bytes_length ; i + + ) {
if ( bytes [ i ] = = ' \n ' ) {
start = i + 1 ;
break ;
}
}
if ( start = = - 1 )
return 0 ;
// un-protobuf the message
struct BitswapMessage * message = NULL ;
if ( ! ipfs_bitswap_message_protobuf_decode ( & bytes [ start ] , bytes_length - start , & message ) )
return 0 ;
// process the message
// payload - what we want
2017-07-31 20:19:17 +00:00
if ( message - > payload ! = NULL ) {
for ( int i = 0 ; i < message - > payload - > total ; i + + ) {
struct Block * blk = ( struct Block * ) libp2p_utils_vector_get ( message - > payload , i ) ;
node - > exchange - > HasBlock ( node - > exchange , blk ) ;
}
2017-07-31 13:16:52 +00:00
}
// wantlist - what they want
if ( message - > wantlist ! = NULL & & message - > wantlist - > entries ! = NULL & & message - > wantlist - > entries - > total > 0 ) {
// get the peer
2017-07-31 21:36:52 +00:00
if ( sessionContext - > remote_peer_id = = NULL ) {
ipfs_bitswap_message_free ( message ) ;
return 0 ;
}
2017-08-02 13:53:34 +00:00
struct Libp2pPeer * peer = libp2p_peerstore_get_or_add_peer_by_id ( node - > peerstore , ( unsigned char * ) sessionContext - > remote_peer_id , strlen ( sessionContext - > remote_peer_id ) ) ;
2017-07-31 21:36:52 +00:00
if ( peer = = NULL ) {
2017-08-02 13:53:34 +00:00
libp2p_logger_error ( " bitswap_network " , " Unable to find or add peer %s of length %d to peerstore. \n " , sessionContext - > remote_peer_id , strlen ( sessionContext - > remote_peer_id ) ) ;
2017-07-31 21:36:52 +00:00
ipfs_bitswap_message_free ( message ) ;
return 0 ;
}
2017-08-02 13:53:34 +00:00
// find the queue
2017-07-31 13:16:52 +00:00
struct PeerRequestEntry * queueEntry = ipfs_bitswap_peer_request_queue_find_entry ( bitswapContext - > peerRequestQueue , peer ) ;
2017-07-31 21:36:52 +00:00
if ( queueEntry = = NULL ) {
2017-08-02 13:53:34 +00:00
// add the queue
2017-07-31 21:36:52 +00:00
struct PeerRequest * peerRequest = ipfs_bitswap_peer_request_new ( ) ;
peerRequest - > peer = peer ;
ipfs_bitswap_peer_request_queue_add ( bitswapContext - > peerRequestQueue , peerRequest ) ;
queueEntry = ipfs_bitswap_peer_request_queue_find_entry ( bitswapContext - > peerRequestQueue , peer ) ;
if ( queueEntry = = NULL ) {
ipfs_bitswap_message_free ( message ) ;
return 0 ;
}
}
2017-07-31 13:16:52 +00:00
for ( int i = 0 ; i < message - > wantlist - > entries - > total ; i + + ) {
struct WantlistEntry * entry = ( struct WantlistEntry * ) libp2p_utils_vector_get ( message - > wantlist - > entries , i ) ;
// turn the "block" back into a cid
struct Cid * cid = NULL ;
2017-08-02 13:53:34 +00:00
if ( ! ipfs_cid_protobuf_decode ( entry - > block , entry - > block_size , & cid ) | | cid - > hash_length = = 0 ) {
libp2p_logger_error ( " bitswap_network " , " Message had invalid CID \n " ) ;
2017-07-31 13:16:52 +00:00
ipfs_cid_free ( cid ) ;
2017-08-03 22:46:20 +00:00
ipfs_bitswap_message_free ( message ) ;
2017-07-31 13:16:52 +00:00
return 0 ;
}
2017-08-03 16:16:58 +00:00
ipfs_bitswap_network_adjust_cid_queue ( queueEntry - > current - > cids_they_want , cid , entry - > cancel ) ;
2017-07-31 13:16:52 +00:00
}
}
2017-08-03 22:46:20 +00:00
ipfs_bitswap_message_free ( message ) ;
2017-07-31 13:16:52 +00:00
return 1 ;
}