Implementing a Stream interface in front of Multistream
This commit is contained in:
parent
8139dc9d48
commit
f1aaae0f17
6 changed files with 121 additions and 89 deletions
|
@ -1,25 +1,34 @@
|
|||
#pragma once
|
||||
|
||||
#include "libp2p/net/stream.h"
|
||||
|
||||
/***
|
||||
* An implementation of the libp2p multistream
|
||||
*
|
||||
* NOTE: This is a severe twist on (break from?) what is multistream. In the GO code,
|
||||
* multistream does the initial connection, and has a list of protocols that
|
||||
* do the work. Here, we've gotten rid of the protocols for now, in order to
|
||||
* get things working. We're passing around DHT messages for now.
|
||||
*
|
||||
* So in short, much of this will change. But for now, think of it as a Proof of Concept.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Write to an open multistream host
|
||||
* Read from a multistream socket
|
||||
* @param socket_fd the socket file descriptor
|
||||
* @param data the data to send
|
||||
* @param data_length the length of the data
|
||||
* @returns the number of bytes written
|
||||
*/
|
||||
int libp2p_net_multistream_send(int socket_fd, const unsigned char* data, size_t data_length);
|
||||
int libp2p_net_multistream_read(struct Stream* stream, unsigned char** data, size_t* data_length);
|
||||
/**
|
||||
* Read from a multistream socket
|
||||
* Write to an open multistream host
|
||||
* @param socket_fd the socket file descriptor
|
||||
* @param results where to put the results. NOTE: this memory is allocated
|
||||
* @param results_size the size of the results in bytes
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
*/
|
||||
int libp2p_net_multistream_receive(int socket_fd, char** results, size_t* results_size);
|
||||
int libp2p_net_multistream_write(struct Stream* stream, const unsigned char* data, size_t data_size);
|
||||
|
||||
/**
|
||||
* Connect to a multistream host, and this includes the multistream handshaking.
|
||||
|
@ -27,28 +36,23 @@ int libp2p_net_multistream_receive(int socket_fd, char** results, size_t* result
|
|||
* @param port the port
|
||||
* @returns the socket file descriptor of the connection, or -1 on error
|
||||
*/
|
||||
int libp2p_net_multistream_connect(const char* hostname, int port);
|
||||
struct Stream* libp2p_net_multistream_connect(const char* hostname, int port);
|
||||
|
||||
/**
|
||||
* Negotiate the multistream protocol by sending and receiving the protocol id. This is a server side function.
|
||||
* Servers should send the protocol ID, and then expect it back.
|
||||
* @param fd the socket file descriptor
|
||||
* @returns true(1) if the negotiation was successful.
|
||||
* @returns true(1) on success, or false(0)
|
||||
*/
|
||||
int libp2p_net_multistream_negotiate(int fd);
|
||||
int libp2p_net_multistream_negotiate(struct Stream* stream);
|
||||
|
||||
/**
|
||||
* Expect to read a message, and follow its instructions
|
||||
* @param fd the socket file descriptor
|
||||
* @returns true(1) on success, false(0) if not
|
||||
*/
|
||||
int libp2p_net_multistream_handle_message(int fd);
|
||||
struct Libp2pMessage* libp2p_net_multistream_get_message(struct Stream* stream);
|
||||
|
||||
/**
|
||||
* Connect to a multistream host, and this includes the multistream handshaking.
|
||||
* @param hostname the host
|
||||
* @param port the port
|
||||
* @returns the socket file descriptor of the connection, or -1 on error
|
||||
*/
|
||||
int libp2p_net_multistream_connect(const char* hostname, int port);
|
||||
struct Stream* libp2p_net_multistream_stream_new(int socket_fd);
|
||||
|
||||
void libp2p_net_multistream_stream_free(struct Stream* stream);
|
||||
|
|
|
@ -13,11 +13,10 @@ struct Stream {
|
|||
* Reads from the stream
|
||||
* @param stream the stream
|
||||
* @param buffer where to put the results
|
||||
* @param max_buffer_size don't read more than this many bytes
|
||||
* @param bytes_read how many bytes were read
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int (*read)(struct Stream* stream, char* buffer, size_t max_buffer_size, size_t* bytes_read);
|
||||
int (*read)(struct Stream* stream, unsigned char** buffer, size_t* bytes_read);
|
||||
|
||||
/**
|
||||
* Writes to a stream
|
||||
|
@ -26,7 +25,7 @@ struct Stream {
|
|||
* @param how much to write
|
||||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int (*write)(struct Stream* stream, char* buffer, size_t buffer_size);
|
||||
int (*write)(struct Stream* stream, const unsigned char* buffer, size_t buffer_size);
|
||||
|
||||
/**
|
||||
* Closes a stream
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
#include <stdlib.h>
|
||||
#include <unistd.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <errno.h>
|
||||
|
@ -7,11 +8,17 @@
|
|||
#include "libp2p/net/p2pnet.h"
|
||||
#include "libp2p/record/message.h"
|
||||
#include "varint.h"
|
||||
#include "libp2p/net/multistream.h"
|
||||
|
||||
/***
|
||||
* An implementation of the libp2p multistream
|
||||
*/
|
||||
|
||||
int libp2p_net_multistream_close(struct Stream* stream) {
|
||||
close((intptr_t)stream->socket_descriptor);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Write to an open multistream host
|
||||
* @param socket_fd the socket file descriptor
|
||||
|
@ -19,7 +26,7 @@
|
|||
* @param data_length the length of the data
|
||||
* @returns the number of bytes written
|
||||
*/
|
||||
int libp2p_net_multistream_send(int socket_fd, const unsigned char* data, size_t data_length) {
|
||||
int libp2p_net_multistream_write(struct Stream* stream, const unsigned char* data, size_t data_length) {
|
||||
int num_bytes = 0;
|
||||
|
||||
if (data_length > 0) { // only do this is if there is something to send
|
||||
|
@ -27,11 +34,11 @@ int libp2p_net_multistream_send(int socket_fd, const unsigned char* data, size_t
|
|||
unsigned char varint[12];
|
||||
size_t varint_size = 0;
|
||||
varint_encode(data_length, &varint[0], 12, &varint_size);
|
||||
num_bytes = socket_write(socket_fd, (char*)varint, varint_size, 0);
|
||||
num_bytes = socket_write(*((int*)stream->socket_descriptor), (char*)varint, varint_size, 0);
|
||||
if (num_bytes == 0)
|
||||
return 0;
|
||||
// then send the actual data
|
||||
num_bytes += socket_write(socket_fd, (char*)data, data_length, 0);
|
||||
num_bytes += socket_write(*((int*)stream->socket_descriptor), (char*)data, data_length, 0);
|
||||
}
|
||||
|
||||
return num_bytes;
|
||||
|
@ -44,7 +51,7 @@ int libp2p_net_multistream_send(int socket_fd, const unsigned char* data, size_t
|
|||
* @param results_size the size of the results in bytes
|
||||
* @returns number of bytes received
|
||||
*/
|
||||
int libp2p_net_multistream_receive(int socket_fd, char** results, size_t* results_size) {
|
||||
int libp2p_net_multistream_read(struct Stream* stream, unsigned char** results, size_t* results_size) {
|
||||
int bytes = 0;
|
||||
size_t buffer_size = 65535;
|
||||
char buffer[buffer_size];
|
||||
|
@ -54,7 +61,7 @@ int libp2p_net_multistream_receive(int socket_fd, char** results, size_t* result
|
|||
// first read the varint
|
||||
while(1) {
|
||||
unsigned char c;
|
||||
bytes = socket_read(socket_fd, (char*)&c, 1, 0);
|
||||
bytes = socket_read(*((int*)stream->socket_descriptor), (char*)&c, 1, 0);
|
||||
pos[0] = c;
|
||||
if (c >> 7 == 0) {
|
||||
pos[1] = 0;
|
||||
|
@ -68,7 +75,7 @@ int libp2p_net_multistream_receive(int socket_fd, char** results, size_t* result
|
|||
|
||||
left = num_bytes_requested;
|
||||
do {
|
||||
bytes = socket_read(socket_fd, &buffer[already_read], left, 0);
|
||||
bytes = socket_read(*((int*)stream->socket_descriptor), &buffer[already_read], left, 0);
|
||||
if (bytes < 0) {
|
||||
bytes = 0;
|
||||
if ( (errno == EAGAIN) || (errno == EWOULDBLOCK)) {
|
||||
|
@ -100,11 +107,12 @@ int libp2p_net_multistream_receive(int socket_fd, char** results, size_t* result
|
|||
* @param port the port
|
||||
* @returns the socket file descriptor of the connection, or -1 on error
|
||||
*/
|
||||
int libp2p_net_multistream_connect(const char* hostname, int port) {
|
||||
struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) {
|
||||
int retVal = -1, return_result = -1, socket = -1;
|
||||
char* results = NULL;
|
||||
unsigned char* results = NULL;
|
||||
size_t results_size;
|
||||
size_t num_bytes = 0;
|
||||
struct Stream* stream = NULL;
|
||||
|
||||
uint32_t ip = hostname_to_ip(hostname);
|
||||
socket = socket_open4();
|
||||
|
@ -116,88 +124,104 @@ int libp2p_net_multistream_connect(const char* hostname, int port) {
|
|||
// send the multistream handshake
|
||||
char* protocol_buffer = "/multistream/1.0.0\n";
|
||||
|
||||
num_bytes = libp2p_net_multistream_send(socket, (unsigned char*)protocol_buffer, strlen(protocol_buffer));
|
||||
stream = libp2p_net_multistream_stream_new(socket);
|
||||
if (stream == NULL)
|
||||
goto exit;
|
||||
|
||||
num_bytes = libp2p_net_multistream_write(stream, (unsigned char*)protocol_buffer, strlen(protocol_buffer));
|
||||
if (num_bytes <= 0)
|
||||
goto exit;
|
||||
|
||||
// try to receive the protocol id
|
||||
return_result = libp2p_net_multistream_receive(socket, &results, &results_size);
|
||||
return_result = libp2p_net_multistream_read(stream, &results, &results_size);
|
||||
if (return_result == 0 || results_size < 1)
|
||||
goto exit;
|
||||
|
||||
if (strstr(results, "multistream") == NULL)
|
||||
if (strstr((char*)results, "multistream") == NULL)
|
||||
goto exit;
|
||||
|
||||
// we are now in the loop, so we can switch to another protocol (i.e. /secio/1.0.0)
|
||||
|
||||
retVal = socket;
|
||||
exit:
|
||||
if (results != NULL)
|
||||
free(results);
|
||||
if (retVal < 0 && stream != NULL) {
|
||||
libp2p_net_multistream_stream_free(stream);
|
||||
stream = NULL;
|
||||
}
|
||||
return stream;
|
||||
}
|
||||
|
||||
int libp2p_net_multistream_negotiate(struct Stream* stream) {
|
||||
const char* protocolID = "/multistream/1.0.0\n";
|
||||
unsigned char* results = NULL;
|
||||
size_t results_length = 0;
|
||||
int retVal = 0;
|
||||
// send the protocol id
|
||||
if (!libp2p_net_multistream_write(stream, (unsigned char*)protocolID, strlen(protocolID)))
|
||||
goto exit;
|
||||
// expect the same back
|
||||
libp2p_net_multistream_read(stream, &results, &results_length);
|
||||
if (results_length == 0)
|
||||
goto exit;
|
||||
if (strncmp((char*)results, protocolID, strlen(protocolID)) != 0)
|
||||
goto exit;
|
||||
retVal = 1;
|
||||
exit:
|
||||
if (results != NULL)
|
||||
free(results);
|
||||
return retVal;
|
||||
}
|
||||
|
||||
int libp2p_net_multistream_negotiate(int fd) {
|
||||
const char* protocolID = "/multistream/1.0.0\n";
|
||||
char* results;
|
||||
size_t results_length = 0;
|
||||
// send the protocol id
|
||||
if (!libp2p_net_multistream_send(fd, (unsigned char*)protocolID, strlen(protocolID)))
|
||||
return 0;
|
||||
// expect the same back
|
||||
libp2p_net_multistream_receive(fd, &results, &results_length);
|
||||
if (results_length == 0)
|
||||
return 0;
|
||||
if (strncmp(results, protocolID, strlen(protocolID)) != 0)
|
||||
return 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* The remote client requested a ping
|
||||
* Expect to read a message
|
||||
* @param fd the socket file descriptor
|
||||
* @param msg the incoming ping message
|
||||
* @returns true(1) on success, otherwise false(0)
|
||||
* @returns the retrieved message, or NULL
|
||||
*/
|
||||
int libp2p_net_multistream_handle_ping(int fd, struct Libp2pMessage* msg) {
|
||||
// protobuf the message
|
||||
size_t protobuf_size = libp2p_message_protobuf_encode_size(msg);
|
||||
unsigned char protobuf[protobuf_size];
|
||||
libp2p_message_protobuf_encode(msg, &protobuf[0], protobuf_size, &protobuf_size);
|
||||
libp2p_net_multistream_send(fd, protobuf, protobuf_size);
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Expect to read a message, and follow its instructions
|
||||
* @param fd the socket file descriptor
|
||||
* @returns true(1) on success, false(0) if not
|
||||
*/
|
||||
int libp2p_net_multistream_handle_message(int fd) {
|
||||
struct Libp2pMessage* libp2p_net_multistream_get_message(struct Stream* stream) {
|
||||
int retVal = 0;
|
||||
unsigned char* results = NULL;
|
||||
size_t results_size = 0;
|
||||
struct Libp2pMessage* msg = NULL;
|
||||
// read what they sent
|
||||
libp2p_net_multistream_receive(fd, (char**)&results, &results_size);
|
||||
libp2p_net_multistream_read(stream, &results, &results_size);
|
||||
// unprotobuf it
|
||||
if (!libp2p_message_protobuf_decode(results, results_size, &msg))
|
||||
goto exit;
|
||||
// do what they ask
|
||||
switch (msg->message_type) {
|
||||
case (MESSAGE_TYPE_PING):
|
||||
libp2p_net_multistream_handle_ping(fd, msg);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
// clean up
|
||||
retVal = 1;
|
||||
exit:
|
||||
if (results != NULL)
|
||||
free(results);
|
||||
if (msg != NULL)
|
||||
if (retVal != 1 && msg != NULL)
|
||||
libp2p_message_free(msg);
|
||||
|
||||
return retVal;
|
||||
return msg;
|
||||
}
|
||||
|
||||
void libp2p_net_multistream_stream_free(struct Stream* stream) {
|
||||
if (stream != NULL) {
|
||||
if (stream->socket_descriptor != NULL)
|
||||
free(stream->socket_descriptor);
|
||||
free(stream);
|
||||
}
|
||||
}
|
||||
|
||||
struct Stream* libp2p_net_multistream_stream_new(int socket_fd) {
|
||||
struct Stream* out = (struct Stream*)malloc(sizeof(struct Stream));
|
||||
if (out != NULL) {
|
||||
out->socket_descriptor = malloc(sizeof(int));
|
||||
*((int*)out->socket_descriptor) = socket_fd;
|
||||
int res = *((int*)out->socket_descriptor);
|
||||
if (res != socket_fd) {
|
||||
libp2p_net_multistream_stream_free(out);
|
||||
return NULL;
|
||||
}
|
||||
out->close = libp2p_net_multistream_close;
|
||||
out->read = libp2p_net_multistream_read;
|
||||
out->write = libp2p_net_multistream_write;
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
|
|
|
@ -474,6 +474,9 @@ int libp2p_secio_read(struct SecureSession* session, unsigned char** results, si
|
|||
* @returns true(1) on success, false(0) otherwise
|
||||
*/
|
||||
int libp2p_secio_handshake(struct SecureSession* local_session, struct RsaPrivateKey* private_key) {
|
||||
// this needs to be redone
|
||||
return 0;
|
||||
/*
|
||||
int retVal = 0;
|
||||
size_t results_size = 0, bytes_written = 0;
|
||||
unsigned char* propose_in_bytes = NULL; // the remote protobuf
|
||||
|
@ -553,16 +556,14 @@ int libp2p_secio_handshake(struct SecureSession* local_session, struct RsaPrivat
|
|||
memcpy(total, protocol, protocol_len);
|
||||
memcpy(&total[protocol_len], propose_out_bytes, propose_out_size);
|
||||
|
||||
bytes_written = libp2p_net_multistream_send(local_session->socket_descriptor, total, protocol_len + propose_out_size);
|
||||
bytes_written = libp2p_net_multistream_write(local_session->socket_descriptor, total, protocol_len + propose_out_size);
|
||||
free(total);
|
||||
if (bytes_written <= 0)
|
||||
goto exit;
|
||||
|
||||
/*
|
||||
bytes_written = libp2p_secio_write(local_session, propose_out_bytes, propose_out_size);
|
||||
if (bytes_written < propose_out_size)
|
||||
goto exit;
|
||||
*/
|
||||
//bytes_written = libp2p_secio_write(local_session, propose_out_bytes, propose_out_size);
|
||||
//if (bytes_written < propose_out_size)
|
||||
// goto exit;
|
||||
// we should get back the secio confirmation
|
||||
bytes_written = libp2p_net_multistream_receive(local_session->socket_descriptor, (char**)&results, &results_size);
|
||||
if (bytes_written < 5 || strstr((char*)results, "secio") == NULL)
|
||||
|
@ -726,5 +727,5 @@ int libp2p_secio_handshake(struct SecureSession* local_session, struct RsaPrivat
|
|||
libp2p_secio_propose_free(propose_in);
|
||||
|
||||
return retVal;
|
||||
|
||||
*/
|
||||
}
|
||||
|
|
|
@ -7,12 +7,12 @@
|
|||
#include "libp2p/net/multistream.h"
|
||||
|
||||
int test_multistream_connect() {
|
||||
int retVal = 0, socket_fd = -1;
|
||||
int retVal = 0;
|
||||
char* response;
|
||||
size_t response_size;
|
||||
|
||||
socket_fd = libp2p_net_multistream_connect("www.jmjatlanta.com", 4001);
|
||||
if (socket_fd < 0)
|
||||
struct Stream* stream = libp2p_net_multistream_connect("www.jmjatlanta.com", 4001);
|
||||
if (stream == NULL)
|
||||
goto exit;
|
||||
|
||||
retVal = 1;
|
||||
|
@ -24,25 +24,25 @@ int test_multistream_connect() {
|
|||
|
||||
int test_multistream_get_list() {
|
||||
int retVal = 0, socket_fd = -1;
|
||||
char* response;
|
||||
unsigned char* response;
|
||||
size_t response_size;
|
||||
|
||||
socket_fd = libp2p_net_multistream_connect("www.jmjatlanta.com", 4001);
|
||||
struct Stream* stream = libp2p_net_multistream_connect("www.jmjatlanta.com", 4001);
|
||||
if (socket_fd < 0)
|
||||
goto exit;
|
||||
|
||||
// try to respond something, ls command
|
||||
const unsigned char* out = "ls\n";
|
||||
|
||||
if (libp2p_net_multistream_send(socket_fd, out, strlen((char*)out)) <= 0)
|
||||
if (libp2p_net_multistream_write(stream, out, strlen((char*)out)) <= 0)
|
||||
goto exit;
|
||||
|
||||
// retrieve response
|
||||
retVal = libp2p_net_multistream_receive(socket_fd, &response, &response_size);
|
||||
retVal = libp2p_net_multistream_read(stream, &response, &response_size);
|
||||
if (retVal <= 0)
|
||||
goto exit;
|
||||
|
||||
fprintf(stdout, "Response from multistream ls: %s", response);
|
||||
fprintf(stdout, "Response from multistream ls: %s", (char*)response);
|
||||
|
||||
retVal = 1;
|
||||
|
||||
|
|
|
@ -5,6 +5,9 @@
|
|||
#include "libp2p/net/p2pnet.h"
|
||||
|
||||
int test_secio_handshake() {
|
||||
return 0;
|
||||
/*
|
||||
* this needs to be redone
|
||||
int retVal = 0;
|
||||
size_t decode_base64_size = 0;
|
||||
unsigned char* decode_base64 = NULL;
|
||||
|
@ -74,4 +77,5 @@ int test_secio_handshake() {
|
|||
if (rsa_private_key != NULL)
|
||||
libp2p_crypto_rsa_rsa_private_key_free(rsa_private_key);
|
||||
return retVal;
|
||||
*/
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue