diff --git a/net/connectionstream.c b/net/connectionstream.c index a6465c3..a036e60 100644 --- a/net/connectionstream.c +++ b/net/connectionstream.c @@ -51,6 +51,7 @@ int libp2p_net_connection_peek(void* stream_context) { int bytes = 0; int retVal = ioctl(socket_fd, FIONREAD, &bytes); + ctx->last_comm_epoch = time(NULL); if (retVal < 0) { // Ooff, we're having problems. Don't use this socket again. libp2p_logger_error("connectionstream", "Attempted a peek, but ioctl reported %s.\n", strerror(errno)); @@ -73,6 +74,7 @@ int libp2p_net_connection_read(void* stream_context, struct StreamMessage** msg, int current_size = 0; while (1) { int retVal = socket_read(ctx->socket_descriptor, (char*)&buffer[0], 4096, 0, timeout_secs); + ctx->last_comm_epoch = time(NULL); libp2p_logger_debug("connectionstream", "Retrieved %d bytes from socket %d.\n", retVal, ctx->socket_descriptor); if (retVal < 1) { // get out of the loop if (retVal < 0) // error @@ -133,6 +135,7 @@ int libp2p_net_connection_read_raw(void* stream_context, uint8_t* buffer, int bu int num_read = 0; for(int i = 0; i < buffer_size; i++) { int retVal = socket_read(ctx->socket_descriptor, (char*)&buffer[i], 1, 0, timeout_secs); + ctx->last_comm_epoch = time(NULL); if (retVal < 1) { // get out of the loop if (retVal < 0) // error return -1; @@ -156,6 +159,7 @@ int libp2p_net_connection_write(void* stream_context, struct StreamMessage* msg) } struct ConnectionContext* ctx = (struct ConnectionContext*) stream_context; libp2p_logger_debug("connectionstream", "write: About to write %d bytes to socket %d.\n", msg->data_size, ctx->socket_descriptor); + ctx->last_comm_epoch = time(NULL); return socket_write(ctx->socket_descriptor, (char*)msg->data, msg->data_size, 0); } diff --git a/peer/peer.c b/peer/peer.c index 903acdc..82052f6 100644 --- a/peer/peer.c +++ b/peer/peer.c @@ -346,6 +346,16 @@ int libp2p_peer_compare(const struct Libp2pPeer* a, const struct Libp2pPeer* b) return 0; } +struct Stream* libp2p_peer_get_connection_stream(struct Stream* incoming_stream) { + struct Stream* current_stream = incoming_stream; + while (current_stream->parent_stream != NULL) { + current_stream = current_stream->parent_stream; + } + if (current_stream != NULL && current_stream->stream_type == STREAM_TYPE_RAW) + return current_stream; + return NULL; +} + /*** * Get the last time we communicated with this peer as an epoch * @param peer the peer to examine @@ -355,8 +365,9 @@ unsigned long long libp2p_peer_last_comm(const struct Libp2pPeer* peer) { unsigned long long retVal = 0; if (peer != NULL) { if (peer->sessionContext != NULL) { - if (peer->sessionContext->insecure_stream != NULL) { - struct ConnectionContext* ctx = (struct ConnectionContext*)peer->sessionContext->insecure_stream->stream_context; + struct Stream* connectionStream = libp2p_peer_get_connection_stream(peer->sessionContext->default_stream); + if (connectionStream != NULL) { + struct ConnectionContext* ctx = (struct ConnectionContext*)connectionStream->stream_context; retVal = ctx->last_comm_epoch; } } diff --git a/yamux/session.c b/yamux/session.c index fdddcc1..b8b03c4 100644 --- a/yamux/session.c +++ b/yamux/session.c @@ -13,9 +13,15 @@ #include "libp2p/yamux/stream.h" #include "libp2p/yamux/yamux.h" #include "libp2p/utils/logger.h" +#include "libp2p/net/multistream.h" static struct yamux_config dcfg = YAMUX_DEFAULT_CONFIG; +// forward declarations +struct YamuxContext* libp2p_yamux_get_context(void* stream_context); +struct yamux_stream* yamux_stream_new(); + + /*** * Create a new yamux session * @param config the configuration @@ -155,20 +161,16 @@ ssize_t yamux_session_ping(struct yamux_session* session, uint32_t value, int po * @returns 0 on success, negative number on error */ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, struct StreamMessage** return_message) { - - // retrieve the yamux context - struct yamux_session* yamux_session = NULL; - struct YamuxContext* yamuxContext = NULL; if (context == NULL) return 0; - if ( ((char*)context)[0] == YAMUX_CONTEXT) { - yamuxContext = (struct YamuxContext*)context; - yamux_session = yamuxContext->session; - } else if ( ((char*)context)[0] == YAMUX_CHANNEL_CONTEXT) { - struct YamuxChannelContext* channelContext = (struct YamuxChannelContext*)context; - yamuxContext = channelContext->yamux_context; - yamux_session = channelContext->yamux_context->session; - } + + int frame_size = sizeof(struct yamux_frame); + + // retrieve the yamux context + struct YamuxContext* yamuxContext = libp2p_yamux_get_context(context); + struct yamux_session* yamux_session = yamuxContext->session; + struct yamux_stream* s = NULL; + // decode frame struct yamux_frame f; @@ -238,7 +240,7 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s for (size_t i = 0; i < yamux_session->cap_streams; ++i) { struct yamux_session_stream* ss = &yamux_session->streams[i]; - struct yamux_stream* s = ss->stream; + s = ss->stream; if (!ss->alive || s->state == yamux_stream_closed) // skip dead or closed streams continue; @@ -283,9 +285,8 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s return -EPROTO; } - int sz = sizeof(struct yamux_frame); libp2p_logger_debug("yamux", "Processing frame of %d bytes.\n"); - ssize_t re = yamux_stream_process(s, &f, &incoming[sz], incoming_size - sz); + ssize_t re = yamux_stream_process(s, &f, &incoming[frame_size], incoming_size - frame_size); return (re < 0) ? re : (re + incoming_size); } } @@ -320,8 +321,26 @@ int yamux_decode(void* context, const uint8_t* incoming, size_t incoming_size, s libp2p_logger_debug("yamux", "session->yamux_decode: Calling new_stream_fn.\n"); yamux_session->new_stream_fn(yamuxContext, yamuxContext->stream, msg); } - + // handle window update (if there is one) + struct yamux_session_stream ss = yamux_session->streams[f.streamid]; + ss.alive = 1; + ss.stream = yamux_stream_new(); + ss.stream->id = f.streamid; + ss.stream->session = yamux_session; + ss.stream->state = yamux_stream_syn_recv; + ss.stream->window_size = 0; + yamux_stream_process(ss.stream, &f, &incoming[frame_size], incoming_size - frame_size); channelContext->state = yamux_stream_syn_recv; + if (f.type == yamux_frame_window_update) { + // send it back + yamux_stream_window_update(channelContext, ss.stream->window_size); + } + // TODO: Start negotiations of multistream + struct Stream* multistream = libp2p_net_multistream_stream_new(yamuxChannelStream, 0); + if (multistream != NULL) { + channelContext->child_stream = multistream; + } + } else { libp2p_logger_debug("yamux", "I thought this was supposed to be a new channel, but the numbering is off. The stream number is %d, and I am a %s", f.streamid, (yamuxContext->am_server ? "server" : "client)")); } diff --git a/yamux/stream.c b/yamux/stream.c index 59d30d5..9d23074 100644 --- a/yamux/stream.c +++ b/yamux/stream.c @@ -14,6 +14,9 @@ #define MIN(x,y) (y^((x^y)&-(xstream->write(ctx->stream->stream_context, &outgoing)) + struct YamuxContext* ctx = libp2p_yamux_get_context(context); + if (!ctx->stream->parent_stream->write(ctx->stream->parent_stream->stream_context, &outgoing)) return 0; return outgoing.data_size; } @@ -348,6 +355,14 @@ void yamux_stream_free(struct yamux_stream* stream) free(stream); } +struct yamux_stream* yamux_stream_new() { + struct yamux_stream* out = (struct yamux_stream*) malloc(sizeof(struct yamux_stream)); + if (out != NULL) { + memset(out, 0, sizeof(struct yamux_stream)); + } + return out; +} + /*** * process stream * @param stream the stream @@ -362,6 +377,13 @@ ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* fr switch (f.type) { + case yamux_frame_window_update: + { + uint64_t nws = (uint64_t) ( (int64_t)stream->window_size + (int64_t)(int32_t)f.length ); + nws &= 0xFFFFFFFFLL; + stream->window_size = (uint32_t)nws; + } + //no break case yamux_frame_data: { if (incoming_size != (ssize_t)f.length) @@ -372,13 +394,6 @@ ssize_t yamux_stream_process(struct yamux_stream* stream, struct yamux_frame* fr return incoming_size; } - case yamux_frame_window_update: - { - uint64_t nws = (uint64_t)((int64_t)stream->window_size + (int64_t)(int32_t)f.length); - nws &= 0xFFFFFFFFLL; - stream->window_size = (uint32_t)nws; - break; - } default: return -EPROTO; }