Handling details of bitswap and connection pool

This commit is contained in:
jmjatlanta 2017-08-03 11:15:40 -05:00
parent a750c0edf1
commit 61a576eb93
7 changed files with 89 additions and 16 deletions

View file

@ -37,4 +37,12 @@ struct Stream {
* @returns true(1) on success, otherwise false(0)
*/
int (*close)(void* stream_context);
/***
* Checks to see if something is waiting on the stream
*
* @param stream the stream context
* @returns true(1) if something is waiting, false(0) otherwise
*/
int (*peek)(void* stream_context);
};

View file

@ -57,6 +57,13 @@ void libp2p_peer_free(struct Libp2pPeer* in);
*/
int libp2p_peer_connect(struct RsaPrivateKey* privateKey, struct Libp2pPeer* peer, struct Peerstore* peerstore, int timeout);
/***
* Clean up a bad connection
* @param peer the peer to clean up
* @returns true(1)
*/
int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer);
/**
* Make a copy of a peer
* @param in what is to be copied

View file

@ -5,6 +5,7 @@
#include <errno.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include "libp2p/net/p2pnet.h"
#include "libp2p/record/message.h"
#include "libp2p/secio/secio.h"
@ -38,6 +39,32 @@ int libp2p_net_multistream_close(void* stream_context) {
return 1;
}
/***
* Check the stream to see if there is something to read
* @param stream_context a SessionContext
* @returns number of bytes to be read, or -1 if there was an error
*/
int libp2p_net_multistream_peek(void* stream_context) {
if (stream_context == NULL)
return -1;
struct SessionContext* session_context = (struct SessionContext*)stream_context;
struct Stream* stream = session_context->default_stream;
if (stream == NULL)
return -1;
int socket_fd = *((int*)stream->socket_descriptor);
if (socket_fd < 0)
return -1;
int bytes = 0;
if (ioctl(socket_fd, FIONREAD, &bytes) < 0) {
// Ooff, we're having problems. Don't use this socket again.
return -1;
}
return bytes;
}
/**
* Write to an open multistream host
* @param stream_context the session context
@ -334,6 +361,7 @@ struct Stream* libp2p_net_multistream_stream_new(int socket_fd, const char* ip,
out->close = libp2p_net_multistream_close;
out->read = libp2p_net_multistream_read;
out->write = libp2p_net_multistream_write;
out->peek = libp2p_net_multistream_peek;
char str[strlen(ip) + 50];
sprintf(str, "/ip4/%s/tcp/%d", ip, port);
out->address = multiaddress_new_from_string(str);

View file

@ -75,6 +75,18 @@ void libp2p_peer_free(struct Libp2pPeer* in) {
}
}
/***
* Clean up a bad connection
* @param peer the peer to clean up
* @returns true(1)
*/
int libp2p_peer_handle_connection_error(struct Libp2pPeer* peer) {
peer->connection_type = CONNECTION_TYPE_NOT_CONNECTED;
libp2p_session_context_free(peer->sessionContext);
peer->sessionContext = NULL;
return 1;
}
/**
* Attempt to connect to the peer, setting connection_type correctly
* NOTE: If successful, this will set peer->connection to the stream

View file

@ -132,11 +132,14 @@ int libp2p_peerstore_add_peer(struct Peerstore* peerstore, const struct Libp2pPe
}
struct PeerEntry* peer_entry = libp2p_peer_entry_new();
if (peer_entry == NULL) {
libp2p_logger_error("peerstore", "Unable to allocate memory for new PeerEntry.\n");
return 0;
}
peer_entry->peer = libp2p_peer_copy(peer);
if (peer_entry->peer == NULL)
if (peer_entry->peer == NULL) {
libp2p_logger_error("peerstore", "Could not copy peer for PeerEntry.\n");
return 0;
}
retVal = libp2p_peerstore_add_peer_entry(peerstore, peer_entry);
libp2p_logger_debug("peerstore", "Adding peer %s to peerstore was a success\n", peer->id);
}

View file

@ -782,10 +782,7 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
struct StretchedKey* k1 = NULL, *k2 = NULL;
struct PrivateKey* priv = NULL;
struct PublicKey pub_key = {0};
struct Libp2pPeer* remote_peer = libp2p_peer_new();
remote_peer->sessionContext = local_session;
remote_peer->connection_type = CONNECTION_TYPE_CONNECTED;
struct Libp2pPeer* remote_peer = NULL;
//TODO: make sure we're not talking to ourself
@ -884,14 +881,32 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
// generate their peer id
libp2p_crypto_public_key_to_peer_id(public_key, &local_session->remote_peer_id);
// put peer information in Libp2pPeer struct
remote_peer->id_size = strlen(local_session->remote_peer_id);
if (remote_peer->id_size > 0) {
remote_peer->id = malloc(remote_peer->id_size + 1);
if (remote_peer->id != NULL) {
memcpy(remote_peer->id, local_session->remote_peer_id, remote_peer->id_size);
remote_peer->id[remote_peer->id_size] = 0;
// see if we already have this peer
int new_peer = 0;
remote_peer = libp2p_peerstore_get_peer(peerstore, (unsigned char*)local_session->remote_peer_id, strlen(local_session->remote_peer_id));
if (remote_peer == NULL) {
remote_peer = libp2p_peer_new();
new_peer = 1;
// put peer information in Libp2pPeer struct
remote_peer->id_size = strlen(local_session->remote_peer_id);
if (remote_peer->id_size > 0) {
remote_peer->id = malloc(remote_peer->id_size + 1);
if (remote_peer->id != NULL) {
memcpy(remote_peer->id, local_session->remote_peer_id, remote_peer->id_size);
remote_peer->id[remote_peer->id_size] = 0;
}
}
} else {
libp2p_logger_debug("secio", "Same remote connected. Replacing SessionContext.\n");
// clean up old session context
libp2p_session_context_free(remote_peer->sessionContext);
}
remote_peer->sessionContext = local_session;
remote_peer->connection_type = CONNECTION_TYPE_CONNECTED;
if (new_peer) {
libp2p_logger_debug("secio", "New connection. Adding Peer to Peerstore.\n");
libp2p_peerstore_add_peer(peerstore, remote_peer);
}
// negotiate encryption parameters NOTE: SelectBest must match, otherwise this won't work
@ -1086,9 +1101,6 @@ int libp2p_secio_handshake(struct SessionContext* local_session, struct RsaPriva
if (retVal == 1) {
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake success!\n");
// add this to the peerstore
if (peerstore != NULL)
libp2p_peerstore_add_peer(peerstore, remote_peer);
} else {
libp2p_logger_log("secio", LOGLEVEL_DEBUG, "Handshake returning false\n");
libp2p_peer_free(remote_peer);

View file

@ -75,9 +75,12 @@ void libp2p_logger_log(const char* area, int log_level, const char* format, ...)
libp2p_logger_init();
if (log_level <= CURRENT_LOGLEVEL) {
if (libp2p_logger_watching_class(area)) {
int new_format_size = strlen(format) + strlen(area) + 10;
char new_format[new_format_size];
sprintf(&new_format[0], "[%s] %s", area, format);
va_list argptr;
va_start(argptr, format);
vfprintf(stderr, format, argptr);
vfprintf(stderr, new_format, argptr);
va_end(argptr);
}
}