Now doing multistream over secio

This commit is contained in:
John Jones 2017-07-13 09:01:50 -05:00
parent cff0e4d6aa
commit 6c19984368
4 changed files with 94 additions and 62 deletions

View file

@ -1,6 +1,7 @@
#pragma once #pragma once
#include "libp2p/net/stream.h" #include "libp2p/net/stream.h"
#include "libp2p/conn/session.h"
/*** /***
* An implementation of the libp2p multistream * An implementation of the libp2p multistream
@ -42,10 +43,10 @@ struct Stream* libp2p_net_multistream_connect(const char* hostname, int port);
/** /**
* Negotiate the multistream protocol by sending and receiving the protocol id. This is a server side function. * Negotiate the multistream protocol by sending and receiving the protocol id. This is a server side function.
* Servers should send the protocol ID, and then expect it back. * Servers should send the protocol ID, and then expect it back.
* @param fd the socket file descriptor * @param session the struct Session, which contains all the context info
* @returns true(1) on success, or false(0) * @returns true(1) on success, or false(0)
*/ */
int libp2p_net_multistream_negotiate(struct Stream* stream); int libp2p_net_multistream_negotiate(struct SessionContext* session);
/** /**
* Expect to read a message, and follow its instructions * Expect to read a message, and follow its instructions

View file

@ -10,6 +10,7 @@
#include "libp2p/secio/secio.h" #include "libp2p/secio/secio.h"
#include "varint.h" #include "varint.h"
#include "libp2p/net/multistream.h" #include "libp2p/net/multistream.h"
#include "libp2p/utils/logger.h"
#include "multiaddr/multiaddr.h" #include "multiaddr/multiaddr.h"
// NOTE: this is normally set to 5 seconds, but you may want to increase this during debugging // NOTE: this is normally set to 5 seconds, but you may want to increase this during debugging
@ -36,7 +37,7 @@ int libp2p_net_multistream_close(void* stream_context) {
*/ */
int libp2p_net_multistream_write(void* stream_context, const unsigned char* data, size_t data_length) { int libp2p_net_multistream_write(void* stream_context, const unsigned char* data, size_t data_length) {
struct SessionContext* session_context = (struct SessionContext*)stream_context; struct SessionContext* session_context = (struct SessionContext*)stream_context;
struct Stream* stream = session_context->insecure_stream; struct Stream* stream = session_context->default_stream;
int num_bytes = 0; int num_bytes = 0;
if (data_length > 0) { // only do this is if there is something to send if (data_length > 0) { // only do this is if there is something to send
@ -44,11 +45,23 @@ int libp2p_net_multistream_write(void* stream_context, const unsigned char* data
unsigned char varint[12]; unsigned char varint[12];
size_t varint_size = 0; size_t varint_size = 0;
varint_encode(data_length, &varint[0], 12, &varint_size); varint_encode(data_length, &varint[0], 12, &varint_size);
num_bytes = socket_write(*((int*)stream->socket_descriptor), (char*)varint, varint_size, 0); // now put the size with the data
if (num_bytes == 0) unsigned char* buffer = (unsigned char*)malloc(data_length + varint_size);
return 0; memset(buffer, 0, data_length + varint_size);
// then send the actual data memcpy(buffer, varint, varint_size);
num_bytes += socket_write(*((int*)stream->socket_descriptor), (char*)data, data_length, 0); memcpy(&buffer[varint_size], data, data_length);
// determine if this should run through the secio protocol or not
if (session_context->secure_stream == NULL) {
// do a "raw" write
num_bytes = socket_write(*((int*)stream->socket_descriptor), (char*)varint, varint_size, 0);
if (num_bytes == 0)
return 0;
// then send the actual data
num_bytes += socket_write(*((int*)stream->socket_descriptor), (char*)data, data_length, 0);
} else {
// write using secio
num_bytes = stream->write(stream_context, buffer, data_length + varint_size);
}
} }
return num_bytes; return num_bytes;
@ -64,7 +77,7 @@ int libp2p_net_multistream_write(void* stream_context, const unsigned char* data
*/ */
int libp2p_net_multistream_read(void* stream_context, unsigned char** results, size_t* results_size, int timeout_secs) { int libp2p_net_multistream_read(void* stream_context, unsigned char** results, size_t* results_size, int timeout_secs) {
struct SessionContext* session_context = (struct SessionContext*)stream_context; struct SessionContext* session_context = (struct SessionContext*)stream_context;
struct Stream* stream = session_context->insecure_stream; struct Stream* stream = session_context->default_stream;
int bytes = 0; int bytes = 0;
// TODO: this is arbitrary, and should be dynamic // TODO: this is arbitrary, and should be dynamic
size_t buffer_size = 362144; size_t buffer_size = 362144;
@ -72,48 +85,69 @@ int libp2p_net_multistream_read(void* stream_context, unsigned char** results, s
char* pos = buffer; char* pos = buffer;
size_t num_bytes_requested = 0, left = 0, already_read = 0; size_t num_bytes_requested = 0, left = 0, already_read = 0;
// first read the varint if (session_context->secure_stream == NULL) {
while(1) { // first read the varint
unsigned char c = '\0'; while(1) {
bytes = socket_read(*((int*)stream->socket_descriptor), (char*)&c, 1, 0, timeout_secs); unsigned char c = '\0';
if (bytes <= 0) { // timeout bytes = socket_read(*((int*)stream->socket_descriptor), (char*)&c, 1, 0, timeout_secs);
return 0; if (bytes <= 0) { // timeout
}
pos[0] = c;
if (c >> 7 == 0) {
pos[1] = 0;
num_bytes_requested = varint_decode((unsigned char*)buffer, strlen(buffer), NULL);
break;
}
pos++;
}
if (num_bytes_requested <= 0)
return 0;
left = num_bytes_requested;
do {
bytes = socket_read(*((int*)stream->socket_descriptor), &buffer[already_read], left, 0, timeout_secs);
if (bytes < 0) {
bytes = 0;
if ( (errno == EAGAIN)) {
// do something intelligent
} else {
return 0; return 0;
} }
pos[0] = c;
if (c >> 7 == 0) {
pos[1] = 0;
num_bytes_requested = varint_decode((unsigned char*)buffer, strlen(buffer), NULL);
break;
}
pos++;
} }
left = left - bytes; if (num_bytes_requested <= 0)
already_read += bytes; return 0;
} while (left > 0);
if (already_read != num_bytes_requested) left = num_bytes_requested;
return 0; do {
bytes = socket_read(*((int*)stream->socket_descriptor), &buffer[already_read], left, 0, timeout_secs);
if (bytes < 0) {
bytes = 0;
if ( (errno == EAGAIN)) {
// do something intelligent
} else {
return 0;
}
}
left = left - bytes;
already_read += bytes;
} while (left > 0);
if (already_read != num_bytes_requested)
return 0;
// parse the results, removing the leading size indicator
*results = malloc(num_bytes_requested);
if (*results == NULL)
return 0;
memcpy(*results, buffer, num_bytes_requested);
*results_size = num_bytes_requested;
} else { // we should use secio instead of raw read/writes
if (session_context->default_stream->read(session_context, (unsigned char**)&pos, &buffer_size, timeout_secs) == 0) {
return 0;
}
// pull out num_bytes_requested
num_bytes_requested = varint_decode((unsigned char*)pos, strlen(pos), &left);
if (num_bytes_requested > buffer_size - left) {
libp2p_logger_error("multistream", "multistream wants to read %lu bytes from buffer of %lu bytes.\n", num_bytes_requested, buffer_size);
return 0;
}
*results = malloc(num_bytes_requested);
*results_size = num_bytes_requested;
if (*results == NULL) {
libp2p_logger_error("multistream", "Unable to allocate %lu bytes of memory.", num_bytes_requested);
return 0;
}
memcpy(*results, &pos[left], num_bytes_requested);
}
// parse the results, removing the leading size indicator
*results = malloc(num_bytes_requested);
if (*results == NULL)
return 0;
memcpy(*results, buffer, num_bytes_requested);
*results_size = num_bytes_requested;
return num_bytes_requested; return num_bytes_requested;
} }
@ -147,6 +181,7 @@ struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) {
struct SessionContext session; struct SessionContext session;
session.insecure_stream = stream; session.insecure_stream = stream;
session.secure_stream = NULL;
session.default_stream = stream; session.default_stream = stream;
// try to receive the protocol id // try to receive the protocol id
@ -176,19 +211,16 @@ struct Stream* libp2p_net_multistream_connect(const char* hostname, int port) {
return stream; return stream;
} }
int libp2p_net_multistream_negotiate(struct Stream* stream) { int libp2p_net_multistream_negotiate(struct SessionContext* session) {
const char* protocolID = "/multistream/1.0.0\n"; const char* protocolID = "/multistream/1.0.0\n";
unsigned char* results = NULL; unsigned char* results = NULL;
size_t results_length = 0; size_t results_length = 0;
int retVal = 0; int retVal = 0;
// send the protocol id // send the protocol id
struct SessionContext secure_session; if (!libp2p_net_multistream_write(session, (unsigned char*)protocolID, strlen(protocolID)))
secure_session.insecure_stream = stream;
secure_session.default_stream = stream;
if (!libp2p_net_multistream_write(&secure_session, (unsigned char*)protocolID, strlen(protocolID)))
goto exit; goto exit;
// expect the same back // expect the same back
libp2p_net_multistream_read(&secure_session, &results, &results_length, multistream_default_timeout); libp2p_net_multistream_read(session, &results, &results_length, multistream_default_timeout);
if (results_length == 0) if (results_length == 0)
goto exit; goto exit;
if (strncmp((char*)results, protocolID, strlen(protocolID)) != 0) if (strncmp((char*)results, protocolID, strlen(protocolID)) != 0)

View file

@ -648,7 +648,7 @@ int libp2p_secio_decrypt(struct SessionContext* session, const unsigned char* in
int retVal = memcmp(&incoming[data_section_size], generated_mac, 32); int retVal = memcmp(&incoming[data_section_size], generated_mac, 32);
if (retVal != 0) { if (retVal != 0) {
// MAC verification failed // MAC verification failed
libp2p_logger_error("secio", "libp2p_secio_decrypt: MAC verification failed"); libp2p_logger_error("secio", "libp2p_secio_decrypt: MAC verification failed.\n");
return 0; return 0;
} }

View file

@ -74,6 +74,7 @@ int test_secio_handshake() {
secure_session.traffic_type = TCP; secure_session.traffic_type = TCP;
// connect to host // connect to host
secure_session.insecure_stream = libp2p_net_multistream_connect(secure_session.host, secure_session.port); secure_session.insecure_stream = libp2p_net_multistream_connect(secure_session.host, secure_session.port);
secure_session.default_stream = secure_session.insecure_stream;
if (*((int*)secure_session.insecure_stream->socket_descriptor) == -1) { if (*((int*)secure_session.insecure_stream->socket_descriptor) == -1) {
fprintf(stderr, "test_secio_handshake: Unable to get socket descriptor\n"); fprintf(stderr, "test_secio_handshake: Unable to get socket descriptor\n");
goto exit; goto exit;
@ -94,6 +95,7 @@ int test_secio_handshake() {
goto exit; goto exit;
} }
/*
fprintf(stdout, "Shared key: "); fprintf(stdout, "Shared key: ");
for(int i = 0; i < secure_session.shared_key_size; i++) for(int i = 0; i < secure_session.shared_key_size; i++)
fprintf(stdout, "%d ", secure_session.shared_key[i]); fprintf(stdout, "%d ", secure_session.shared_key[i]);
@ -102,16 +104,13 @@ int test_secio_handshake() {
fprintf(stdout, "\nRemote stretched key: "); fprintf(stdout, "\nRemote stretched key: ");
print_stretched_key(secure_session.remote_stretched_key); print_stretched_key(secure_session.remote_stretched_key);
fprintf(stdout, "\n"); fprintf(stdout, "\n");
*/
// now attempt to do something with it... // now attempt to do something with it... try to negotiate a multistream
secure_session.secure_stream->write(&secure_session, "/multistream/1.0.0\n", 3); if (libp2p_net_multistream_negotiate(&secure_session) == 0) {
unsigned char* results; fprintf(stdout, "Unable to negotiate multistream\n");
size_t results_size; goto exit;
secure_session.secure_stream->read(&secure_session, &results, &results_size, 10); }
fprintf(stdout, "test_secio_handshake: Results from multistream: Size: %lu string: %s", results_size, results);
for(int i = 0; i < results_size; i++)
fprintf(stdout, "%d ", results[i]);
fprintf(stdout, "\n");
retVal = 1; retVal = 1;
exit: exit: