Reading from stream now uses a StreamMessage struct
This commit is contained in:
parent
e394723fb6
commit
9afaf535d6
4 changed files with 22 additions and 20 deletions
10
core/null.c
10
core/null.c
|
@ -58,20 +58,18 @@ void ipfs_null_connection (void *ptr) {
|
||||||
libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count));
|
libp2p_logger_info("null", "Connection %d, count %d\n", connection_param->file_descriptor, *(connection_param->count));
|
||||||
|
|
||||||
// try to read from the network
|
// try to read from the network
|
||||||
uint8_t *results = 0;
|
struct StreamMessage *results = 0;
|
||||||
size_t bytes_read = 0;
|
|
||||||
// handle the call
|
// handle the call
|
||||||
for(;;) {
|
for(;;) {
|
||||||
// immediately attempt to negotiate multistream
|
// immediately attempt to negotiate multistream
|
||||||
if (!libp2p_net_multistream_send_protocol(session))
|
if (!libp2p_net_multistream_send_protocol(session))
|
||||||
break;
|
break;
|
||||||
if (!session->default_stream->read(session, &results, &bytes_read, DEFAULT_NETWORK_TIMEOUT)) {
|
if (!session->default_stream->read(session, &results, DEFAULT_NETWORK_TIMEOUT)) {
|
||||||
// problem reading;
|
// problem reading;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
retVal = libp2p_protocol_marshal(results, bytes_read, session, connection_param->local_node->protocol_handlers);
|
retVal = libp2p_protocol_marshal(results->data, results->data_size, session, connection_param->local_node->protocol_handlers);
|
||||||
if (results != NULL)
|
libp2p_stream_message_free(results);
|
||||||
free(results);
|
|
||||||
// exit the loop on error (or if they ask us to no longer loop by returning 0)
|
// exit the loop on error (or if they ask us to no longer loop by returning 0)
|
||||||
if (retVal <= 0)
|
if (retVal <= 0)
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -104,13 +104,12 @@ void* ipfs_bitswap_engine_peer_request_processor_start(void* ctx) {
|
||||||
libp2p_peer_handle_connection_error(current_peer_entry);
|
libp2p_peer_handle_connection_error(current_peer_entry);
|
||||||
} else if (retVal > 0) {
|
} else if (retVal > 0) {
|
||||||
libp2p_logger_debug("bitswap_engine", "%d bytes waiting on network for peer %s.\n", retVal, libp2p_peer_id_to_string(current_peer_entry));
|
libp2p_logger_debug("bitswap_engine", "%d bytes waiting on network for peer %s.\n", retVal, libp2p_peer_id_to_string(current_peer_entry));
|
||||||
unsigned char* buffer = NULL;
|
struct StreamMessage* buffer = NULL;
|
||||||
size_t buffer_len = 0;
|
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, 1)) {
|
||||||
if (current_peer_entry->sessionContext->default_stream->read(current_peer_entry->sessionContext, &buffer, &buffer_len, 1)) {
|
|
||||||
// handle it
|
// handle it
|
||||||
libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer_len, buffer);
|
libp2p_logger_debug("bitswap_engine", "%lu bytes read, result: [%s].\n", buffer->data_size, buffer->data);
|
||||||
int retVal = libp2p_protocol_marshal(buffer, buffer_len, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers);
|
int retVal = libp2p_protocol_marshal(buffer->data, buffer->data_size, current_peer_entry->sessionContext, context->ipfsNode->protocol_handlers);
|
||||||
free(buffer);
|
libp2p_stream_message_free(buffer);
|
||||||
did_some_processing = 1;
|
did_some_processing = 1;
|
||||||
if (retVal == -1) {
|
if (retVal == -1) {
|
||||||
libp2p_logger_error("bitswap_engine", "protocol_marshal tried to handle the network traffic, but failed.\n");
|
libp2p_logger_error("bitswap_engine", "protocol_marshal tried to handle the network traffic, but failed.\n");
|
||||||
|
|
|
@ -168,15 +168,15 @@ struct HashtableNode* ipfs_resolver_remote_get(const char* path, struct Hashtabl
|
||||||
if (!libp2p_routing_dht_upgrade_stream(&session_context))
|
if (!libp2p_routing_dht_upgrade_stream(&session_context))
|
||||||
return NULL;
|
return NULL;
|
||||||
stream->write(&session_context, message_protobuf, message_protobuf_size);
|
stream->write(&session_context, message_protobuf, message_protobuf_size);
|
||||||
unsigned char* response;
|
struct StreamMessage* response;
|
||||||
size_t response_size;
|
|
||||||
// we should get back a protobuf'd record
|
// we should get back a protobuf'd record
|
||||||
stream->read(&session_context, &response, &response_size, 5);
|
stream->read(&session_context, &response, 5);
|
||||||
if (response_size == 1)
|
if (response->data_size == 1)
|
||||||
return NULL;
|
return NULL;
|
||||||
// turn the protobuf into a Node
|
// turn the protobuf into a Node
|
||||||
struct HashtableNode* node;
|
struct HashtableNode* node;
|
||||||
ipfs_hashtable_node_protobuf_decode(response, response_size, &node);
|
ipfs_hashtable_node_protobuf_decode(response->data, response->data_size, &node);
|
||||||
|
libp2p_stream_message_free(response);
|
||||||
return node;
|
return node;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -303,6 +303,7 @@ int ipfs_journal_adjust_time(struct JournalToDo* todo, struct IpfsNode* local_no
|
||||||
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
* @returns 0 if the caller should not continue looping, <0 on error, >0 on success
|
||||||
*/
|
*/
|
||||||
int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) {
|
int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, struct SessionContext* session_context, void* protocol_context) {
|
||||||
|
struct StreamMessage* msg = NULL;
|
||||||
// remove protocol
|
// remove protocol
|
||||||
uint8_t *incoming_pos = (uint8_t*) incoming;
|
uint8_t *incoming_pos = (uint8_t*) incoming;
|
||||||
size_t pos_size = incoming_size;
|
size_t pos_size = incoming_size;
|
||||||
|
@ -315,11 +316,15 @@ int ipfs_journal_handle_message(const uint8_t* incoming, size_t incoming_size, s
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
// read next segment from network
|
// read next segment from network
|
||||||
if (!session_context->default_stream->read(session_context, &incoming_pos, &pos_size, 10))
|
if (!session_context->default_stream->read(session_context, &msg, 10)) {
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
|
incoming_pos = msg->data;
|
||||||
second_read = 1;
|
second_read = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
libp2p_stream_message_free(msg);
|
||||||
|
msg = NULL;
|
||||||
}
|
}
|
||||||
libp2p_logger_debug("journal", "Handling incoming message from %s.\n", session_context->remote_peer_id);
|
libp2p_logger_debug("journal", "Handling incoming message from %s.\n", session_context->remote_peer_id);
|
||||||
struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
|
struct IpfsNode* local_node = (struct IpfsNode*)protocol_context;
|
||||||
|
|
Loading…
Reference in a new issue