From 029e3d800f36f1c4476b0e1d26429069329947e1 Mon Sep 17 00:00:00 2001 From: John Jones Date: Thu, 16 Feb 2017 23:13:16 -0500 Subject: [PATCH] Implementation of peer protobuf --- include/libp2p/record/message.h | 40 +++++++- include/libp2p/utils/vector.h | 17 ++++ record/Makefile | 2 +- record/message.c | 167 ++++++++++++++++++++++++++++++++ record/record.c | 10 +- utils/vector.c | 30 ++++++ 6 files changed, 256 insertions(+), 10 deletions(-) create mode 100644 record/message.c diff --git a/include/libp2p/record/message.h b/include/libp2p/record/message.h index 5a56cda..2b64178 100644 --- a/include/libp2p/record/message.h +++ b/include/libp2p/record/message.h @@ -5,8 +5,40 @@ * This is used for the KAD / DHT stuff */ -struct Libp2pPeer { - char* id; - size_t id_size; - +enum MessageType { + MESSAGE_TYPE_PUT_VALUE = 0, + MESSAGE_TYPE_GET_VALUE = 1, + MESSAGE_TYPE_ADD_PROVIDER = 2, + MESSAGE_TYPE_GET_PROVIDERS = 3, + MESSAGE_TYPE_FIND_NODE = 4, + MESSAGE_TYPE_PING = 5 }; + +enum ConnectionType { + // sender does not have a connection to the peer, and no extra information (default) + CONNECTION_TYPE_NOT_CONNECTED = 0, + // sender has a live connection to the peer + CONNECTION_TYPE_CONNECTED = 1, + // sender recently connected to peer + CONNECTION_TYPE_CAN_CONNECT = 2, + // sender recently tried to connect to peer repeatedly but failed to connect + CONNECTION_TYPE_CANNOT_CONNECT = 3 +}; + +struct Libp2pPeer { + char* id; // protobuf field 1 + size_t id_size; + struct Libp2pLinkedList* addr_head; // protobuf field 2 of multiaddr bytes (repeatable) (stored here as a Libp2pVector) + enum ConnectionType connection_type; // protobuf field 3 (a varint) +}; + +struct Libp2pMessage { + enum MessageType message_type; // protobuf field 1 (a varint) + char* key; // protobuf field 2 + size_t key_size; + struct Libp2pRecord* record; // protobuf field 3 + struct Libp2pLinkedList* closer_peer_head; // protobuf field 8 + struct Libp2pLinkedList* provider_peer_head; // protobuf field 9 + int32_t cluster_level_raw; // protobuf field 10 +}; + diff --git a/include/libp2p/utils/vector.h b/include/libp2p/utils/vector.h index 6220234..a778fbe 100644 --- a/include/libp2p/utils/vector.h +++ b/include/libp2p/utils/vector.h @@ -22,3 +22,20 @@ void libp2p_utils_vector_free(struct Libp2pVector* vector); * Add bytes to vector */ int libp2p_utils_vector_add(struct Libp2pVector* vector, unsigned char* in_bytes, size_t in_size); + +/** + * serialize the vector into a byte array that has a 4 byte prefix of the size + * @param vector the vector to serialize + * @param out a pointer to the byte array that will be filled + * @param out_size the number of bytes written + * @returns true(1) on success, otherwise false + */ +int libp2p_utils_vector_serialize(struct Libp2pVector* vector, unsigned char** out, size_t* out_size); + +/** + * turn a byte array into a Libp2pVector + * @param in the bytes that were previously serialized + * @param out the new Libp2pVector + * @returns true(1) on success, otherwise false(0) + */ +int libp2p_utils_vector_unserialize(unsigned char* in, struct Libp2pVector** out); diff --git a/record/Makefile b/record/Makefile index f72c847..69e2c04 100644 --- a/record/Makefile +++ b/record/Makefile @@ -2,7 +2,7 @@ CC = gcc CFLAGS = -O0 -I../include -I../../c-protobuf -I../../c-multihash/include -g3 LFLAGS = DEPS = -OBJS = record.o +OBJS = record.o message.o %.o: %.c $(DEPS) $(CC) -c -o $@ $< $(CFLAGS) diff --git a/record/message.c b/record/message.c new file mode 100644 index 0000000..60ac84f --- /dev/null +++ b/record/message.c @@ -0,0 +1,167 @@ +#include + +#include "libp2p/record/message.h" +#include "libp2p/utils/linked_list.h" +#include "libp2p/utils/vector.h" +#include "protobuf.h" +/** + * create a new Peer struct + * @returns a struct or NULL if there was a problem + */ +struct Libp2pPeer* libp2p_message_peer_new() { + struct Libp2pPeer* out = (struct Libp2pPeer*)malloc(sizeof(struct Libp2pPeer)); + if (out != NULL) { + out->id = NULL; + out->id_size = 0; + out->addr_head = NULL; + out->connection_type = CONNECTION_TYPE_NOT_CONNECTED; + } + return out; +} + +void libp2p_message_peer_free(struct Libp2pPeer* in) { + if (in != NULL) { + if (in->id != NULL) + free(in); + // free the memory in the linked list + struct Libp2pLinkedList* current = in->addr_head; + while (current != NULL) { + struct Libp2pLinkedList* temp = current->next; + free(current->item); + current = temp; + } + free(in); + } +} + +size_t libp2p_message_peer_protobuf_encode_size(struct Libp2pPeer* in) { + // id + connection_type + int sz = 11 + in->id_size + 11; + // loop through the multiaddresses + struct Libp2pLinkedList* current = in->addr_head; + while (current != NULL) { + unsigned char* data = (unsigned char*)current->item; + struct Libp2pVector* vector; + if (!libp2p_utils_vector_unserialize(data, &vector)) + return 0; + sz += 11 + vector->buffer_size; + libp2p_utils_vector_free(vector); + current = current->next; + } + return sz; +} + +int libp2p_message_peer_protobuf_encode(struct Libp2pPeer* in, unsigned char* buffer, size_t max_buffer_size, size_t* bytes_written) { + // data & data_size + size_t bytes_used = 0; + *bytes_written = 0; + int retVal = 0; + // field 1 (id) + retVal = protobuf_encode_length_delimited(1, WIRETYPE_LENGTH_DELIMITED, in->id, in->id_size, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used); + if (retVal == 0) + return 0; + *bytes_written += bytes_used; + // field 2 (repeated) + struct Libp2pLinkedList* current = in->addr_head; + while (current != NULL) { + struct Libp2pVector* vector; + if (!libp2p_utils_vector_unserialize(current->item, &vector)) + return 0; + retVal = protobuf_encode_length_delimited(2, WIRETYPE_LENGTH_DELIMITED, vector->buffer, vector->buffer_size, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used); + libp2p_utils_vector_free(vector); + if (retVal == 0) + return 0; + *bytes_written += bytes_used; + current = current->next; + } + // field 3 (varint) + retVal = protobuf_encode_varint(3, WIRETYPE_VARINT, in->connection_type, &buffer[*bytes_written], max_buffer_size - *bytes_written, &bytes_used); + if (retVal == 0) + return 0; + *bytes_written += bytes_used; + return 1; +} + +int libp2p_message_peer_protobuf_decode(unsigned char* in, size_t in_size, struct Libp2pPeer** out) { + size_t pos = 0; + int retVal = 0; + unsigned char* buffer = NULL; + size_t buffer_size = 0; + struct Libp2pVector vector; + struct Libp2pLinkedList* current = NULL; + struct Libp2pLinkedList* last = NULL; + + vector.buffer = NULL; + + if ( (*out = (struct Libp2pPeer*)malloc(sizeof(struct Libp2pPeer))) == NULL) + goto exit; + + struct Libp2pPeer* ptr = *out; + + ptr->addr_head = NULL; + + + while(pos < in_size) { + size_t bytes_read = 0; + int field_no; + enum WireType field_type; + if (protobuf_decode_field_and_type(&in[pos], in_size, &field_no, &field_type, &bytes_read) == 0) { + goto exit; + } + pos += bytes_read; + switch(field_no) { + case (1): // id + if (!protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&(ptr->id),&(ptr->id_size), &bytes_read) == 0) + goto exit; + pos += bytes_read; + break; + case (2): { // array of bytes that is a multiaddress, put it in a vector + if (!protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&vector.buffer, &vector.buffer_size, &bytes_read) == 0) + goto exit; + pos += bytes_read; + // now turn it into a byte array + struct Libp2pLinkedList* current = (struct Libp2pLinkedList*)malloc(sizeof(struct Libp2pLinkedList)); + if (current == NULL) + goto exit; + current->next = NULL; + size_t vector_size; + if (!libp2p_utils_vector_serialize(&vector, (unsigned char**)¤t->item, &vector_size)) { + free(current); + goto exit; + } + // clean up vector + free(vector.buffer); + vector.buffer = NULL; + // assign the values + if (ptr->addr_head == NULL) { + ptr->addr_head = current; + } else { + last->next = current; + } + last = current; + current = NULL; + break; + } + case (3): // enum as varint + if (!protobuf_decode_varint(&in[pos], in_size - pos, (long long unsigned int*)&ptr->connection_type, &bytes_read) == 0) + goto exit; + pos += bytes_read; + break; + } + } + + retVal = 1; + +exit: + if (retVal == 0) { + free(*out); + *out = NULL; + } + if (buffer != NULL) + free(buffer); + if (vector.buffer != NULL) + free(vector.buffer); + return retVal; + +} + diff --git a/record/record.c b/record/record.c index 6164f4a..ff4a3e0 100644 --- a/record/record.c +++ b/record/record.c @@ -87,27 +87,27 @@ int libp2p_record_protobuf_decode(const unsigned char* in, size_t in_size, struc pos += bytes_read; switch(field_no) { case (1): // key - if (protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->key),&((*out)->key_size), &bytes_read) == 0) + if (!protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->key),&((*out)->key_size), &bytes_read) == 0) goto exit; pos += bytes_read; break; case (2): // value - if (protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->value), &((*out)->value_size), &bytes_read) == 0) + if (!protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->value), &((*out)->value_size), &bytes_read) == 0) goto exit; pos += bytes_read; break; case (3): // author - if (protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->author), &((*out)->author_size), &bytes_read) == 0) + if (!protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->author), &((*out)->author_size), &bytes_read) == 0) goto exit; pos += bytes_read; break; case (4): // signature - if (protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->signature), &((*out)->signature_size), &bytes_read) == 0) + if (!protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->signature), &((*out)->signature_size), &bytes_read) == 0) goto exit; pos += bytes_read; break; case (5): // time - if (protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->time_received), &((*out)->time_received_size), &bytes_read) == 0) + if (!protobuf_decode_length_delimited(&in[pos], in_size - pos, (char**)&((*out)->time_received), &((*out)->time_received_size), &bytes_read) == 0) goto exit; pos += bytes_read; break; diff --git a/utils/vector.c b/utils/vector.c index 2565221..2087070 100644 --- a/utils/vector.c +++ b/utils/vector.c @@ -46,3 +46,33 @@ int libp2p_utils_vector_add(struct Libp2pVector* vector, unsigned char* in_bytes } return 1; } + +int libp2p_utils_vector_serialize(struct Libp2pVector* vector, unsigned char** out, size_t* out_size) { + // the first 4 bytes are the size, followed by the the byte array + *out_size = vector->buffer_size + 4; + *out = (unsigned char*)malloc(*out_size); + if (*out == NULL) + return 0; + unsigned char* ptr = *out; + ptr[0] = (vector->buffer_size >> 24) & 0xFF; + ptr[1] = (vector->buffer_size >> 16) & 0xFF; + ptr[2] = (vector->buffer_size >> 8) & 0xFF; + ptr[3] = vector->buffer_size & 0xFF; + memcpy(&ptr[4], vector->buffer, vector->buffer_size); + return 1; +} + +int libp2p_utils_vector_unserialize(unsigned char* in, struct Libp2pVector** out) { + *out = (struct Libp2pVector*)malloc(sizeof(struct Libp2pVector)); + if (*out == NULL) + return 0; + struct Libp2pVector* ptr = *out; + ptr->buffer_size = in[0] | (in[1] << 8) | (in[2] << 16) | (in[3] << 24); + ptr->buffer = (unsigned char*)malloc(ptr->buffer_size); + if (ptr->buffer == NULL) { + free (*out); + return 0; + } + memcpy(ptr->buffer, &in[4], ptr->buffer_size); + return 1; +}