From 2f2f4363249eb12970d0552779472d523c589566 Mon Sep 17 00:00:00 2001 From: Dmitri Tikhonov Date: Tue, 8 Sep 2020 11:43:03 -0400 Subject: [PATCH] Release 2.19.9 - [FEATURE] Add lsquic_stream_pwritev(). This function allows one to reduce the number of system calls required to read a file from disk by using lsquic_stream_pwritev() together with preadv(2). - [BUGFIX] When stream is reset, it is writeable -- let user collect the error. - [BUGFIX] Calculate correct conn flow control if reading ends early. - [BUGFIX] Remove stream from read and write queues on internal shutdown. This is a regression introduced in 2.19.7. - [BUGFIX] Swapped arguments in IETF RESET_FRAME generation. - Turn off mini conn history when compiling with Visual Studio; this allows the project to compile on Windows again. - http_client: Add -3 flag to stop reading from streams early; code cleanup. - Don't use -Werror. --- CHANGELOG | 17 + CMakeLists.txt | 1 - bin/http_client.c | 57 +++- bin/http_server.c | 134 +++++++- docs/apiref.rst | 39 +++ docs/conf.py | 2 +- include/lsquic.h | 10 +- src/liblsquic/lsquic_conn.h | 11 + src/liblsquic/lsquic_engine.c | 2 +- src/liblsquic/lsquic_full_conn.c | 67 +++- src/liblsquic/lsquic_full_conn_ietf.c | 95 +++++- src/liblsquic/lsquic_mini_conn.h | 2 +- src/liblsquic/lsquic_parse.h | 5 + src/liblsquic/lsquic_parse_Q046.c | 1 + src/liblsquic/lsquic_parse_Q050.c | 1 + src/liblsquic/lsquic_parse_gquic_be.c | 21 ++ src/liblsquic/lsquic_parse_gquic_be.h | 3 + src/liblsquic/lsquic_parse_ietf_v1.c | 35 +- src/liblsquic/lsquic_send_ctl.c | 188 ++++++++++- src/liblsquic/lsquic_send_ctl.h | 14 + src/liblsquic/lsquic_stream.c | 384 ++++++++++++++++++++-- src/liblsquic/lsquic_stream.h | 27 +- tests/test_h3_framing.c | 447 +++++++++++++++++++++++++- tests/test_send_headers.c | 7 + tests/test_stream.c | 60 +++- 25 files changed, 1545 insertions(+), 85 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index c1b8fdd..2a53fb8 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,20 @@ +2020-09-08 + - 2.19.9 + - [FEATURE] Add lsquic_stream_pwritev(). This function allows one to + reduce the number of system calls required to read a file from disk + by using lsquic_stream_pwritev() together with preadv(2). + - [BUGFIX] When stream is reset, it is writeable -- let user collect + the error. + - [BUGFIX] Calculate correct conn flow control if reading ends early. + - [BUGFIX] Remove stream from read and write queues on internal + shutdown. This is a regression introduced in 2.19.7. + - [BUGFIX] Swapped arguments in IETF RESET_FRAME generation. + - Turn off mini conn history when compiling with Visual Studio; this + allows the project to compile on Windows again. + - http_client: Add -3 flag to stop reading from streams early; code + cleanup. + - Don't use -Werror. + 2020-09-02 - 2.19.8 - [FEATURE] Update the timestamp extension to latest version. diff --git a/CMakeLists.txt b/CMakeLists.txt index a19f854..bde1f51 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -54,7 +54,6 @@ ENDIF() IF(CMAKE_BUILD_TYPE STREQUAL "Debug") SET(MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -O0 -g3") - SET(MY_CMAKE_FLAGS "${MY_CMAKE_FLAGS} -Werror") IF(CMAKE_C_COMPILER MATCHES "clang" AND NOT "$ENV{TRAVIS}" MATCHES "^true$" AND NOT "$ENV{EXTRA_CFLAGS}" MATCHES "-fsanitize") diff --git a/bin/http_client.c b/bin/http_client.c index 433f03d..e115cd9 100644 --- a/bin/http_client.c +++ b/bin/http_client.c @@ -76,6 +76,11 @@ static int g_header_bypass; static int s_discard_response; +/* If set to a non-zero value, abandon reading from stream early: read at + * most `s_abandon_early' bytes and then close the stream. + */ +static long s_abandon_early; + struct sample_stats { unsigned n; @@ -164,8 +169,6 @@ struct path_elem { }; struct http_client_ctx { - TAILQ_HEAD(, lsquic_conn_ctx) - conn_ctxs; const char *hostname; const char *method; const char *payload; @@ -294,7 +297,6 @@ http_client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn) conn_h->ch_n_reqs = MIN(client_ctx->hcc_total_n_reqs, client_ctx->hcc_reqs_per_conn); client_ctx->hcc_total_n_reqs -= conn_h->ch_n_reqs; - TAILQ_INSERT_TAIL(&client_ctx->conn_ctxs, conn_h, next_ch); ++conn_h->client_ctx->hcc_n_open_conns; if (!TAILQ_EMPTY(&client_ctx->hcc_path_elems)) create_streams(client_ctx, conn_h); @@ -346,7 +348,6 @@ http_client_on_conn_closed (lsquic_conn_t *conn) if (!(conn_h->client_ctx->hcc_flags & HCC_SEEN_FIN)) abort(); } - TAILQ_REMOVE(&conn_h->client_ctx->conn_ctxs, conn_h, next_ch); --conn_h->client_ctx->hcc_n_open_conns; cacos = calloc(1, sizeof(*cacos)); @@ -474,9 +475,16 @@ struct lsquic_stream_ctx { enum { HEADERS_SENT = (1 << 0), PROCESSED_HEADERS = 1 << 1, + ABANDON = 1 << 2, /* Abandon reading from stream after sh_stop bytes + * have been read. + */ } sh_flags; lsquic_time_t sh_created; lsquic_time_t sh_ttfb; + size_t sh_stop; /* Stop after reading this many bytes if ABANDON is set */ + size_t sh_nread; /* Number of bytes read from stream using one of + * lsquic_stream_read* functions. + */ unsigned count; struct lsquic_reader reader; }; @@ -524,6 +532,11 @@ http_client_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream) lsquic_stream_wantwrite(stream, 1); if (randomly_reprioritize_streams) lsquic_stream_set_priority(stream, 1 + (random() & 0xFF)); + if (s_abandon_early) + { + st_h->sh_stop = random() % (s_abandon_early + 1); + st_h->sh_flags |= ABANDON; + } return st_h; } @@ -634,6 +647,14 @@ http_client_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) static size_t discard (void *ctx, const unsigned char *buf, size_t sz, int fin) { + lsquic_stream_ctx_t *st_h = ctx; + + if (st_h->sh_flags & ABANDON) + { + if (sz > st_h->sh_stop - st_h->sh_nread) + sz = st_h->sh_stop - st_h->sh_nread; + } + return sz; } @@ -671,10 +692,14 @@ http_client_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) st_h->sh_flags |= PROCESSED_HEADERS; } else if (nread = (s_discard_response - ? lsquic_stream_readf(stream, discard, NULL) - : lsquic_stream_read(stream, buf, sizeof(buf))), + ? lsquic_stream_readf(stream, discard, st_h) + : lsquic_stream_read(stream, buf, + st_h->sh_flags & ABANDON + ? MIN(sizeof(buf), st_h->sh_nread - st_h->sh_stop) + : sizeof(buf))), nread > 0) { + st_h->sh_nread += (size_t) nread; s_stat_downloaded_bytes += nread; /* test stream_reset after some number of read bytes */ if (client_ctx->hcc_reset_after_nbytes && @@ -713,6 +738,13 @@ http_client_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) LSQ_DEBUG("changed stream %"PRIu64" priority from %u to %u", lsquic_stream_id(stream), old_prio, new_prio); } + if ((st_h->sh_flags & ABANDON) && st_h->sh_nread >= st_h->sh_stop) + { + LSQ_DEBUG("closing stream early having read %zd bytes", + st_h->sh_nread); + lsquic_stream_close(stream); + break; + } } else if (0 == nread) { @@ -756,11 +788,7 @@ http_client_on_close (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) LSQ_INFO("%s called", __func__); struct http_client_ctx *const client_ctx = st_h->client_ctx; lsquic_conn_t *conn = lsquic_stream_conn(stream); - lsquic_conn_ctx_t *conn_h; - TAILQ_FOREACH(conn_h, &client_ctx->conn_ctxs, next_ch) - if (conn_h->conn == conn) - break; - assert(conn_h); + lsquic_conn_ctx_t *const conn_h = lsquic_conn_get_ctx(conn); --conn_h->ch_n_reqs; --conn_h->ch_n_cc_streams; if (0 == conn_h->ch_n_reqs) @@ -834,6 +862,8 @@ usage (const char *prog) " -q FILE QIF mode: issue requests from the QIF file and validate\n" " server responses.\n" " -e TOKEN Hexadecimal string representing resume token.\n" +" -3 MAX Close stream after reading at most MAX bytes. The actual\n" +" number of bytes read is randominzed.\n" , prog); } @@ -1406,7 +1436,6 @@ main (int argc, char **argv) TAILQ_INIT(&sports); memset(&client_ctx, 0, sizeof(client_ctx)); TAILQ_INIT(&client_ctx.hcc_path_elems); - TAILQ_INIT(&client_ctx.conn_ctxs); client_ctx.method = "GET"; client_ctx.hcc_concurrency = 1; client_ctx.hcc_cc_reqs_per_conn = 1; @@ -1420,6 +1449,7 @@ main (int argc, char **argv) while (-1 != (opt = getopt(argc, argv, PROG_OPTS "46Br:R:IKu:EP:M:n:w:H:p:0:q:e:hatT:b:d:" + "3:" /* 3 is 133+ for "e" ("e" for "early") */ #ifndef WIN32 "C:" #endif @@ -1536,6 +1566,9 @@ main (int argc, char **argv) http_client_if.on_sess_resume_info = http_client_on_sess_resume_info; client_ctx.hcc_sess_resume_file_name = optarg; break; + case '3': + s_abandon_early = strtol(optarg, NULL, 10); + break; default: if (0 != prog_set_opt(&prog, opt, optarg)) exit(1); diff --git a/bin/http_server.c b/bin/http_server.c index 7596f30..e229ddf 100644 --- a/bin/http_server.c +++ b/bin/http_server.c @@ -278,6 +278,13 @@ static const size_t IDLE_SIZE = sizeof(on_being_idle) - 1; */ static int s_immediate_write; +/* Use preadv(2) in conjuction with lsquic_stream_pwritev() to reduce + * number of system calls required to read from disk. The actual value + * specifies maximum write size. A negative value indicates always to use + * the remaining file size. + */ +static ssize_t s_pwritev; + #define MIN(a, b) ((a) < (b) ? (a) : (b)) #define V(v) (v), strlen(v) @@ -445,6 +452,7 @@ struct lsquic_stream_ctx { SH_HEADERS_READ = (1 << 2), } flags; struct lsquic_reader reader; + int file_fd; /* Used by pwritev */ /* Fields below are used by interop callbacks: */ enum interop_handler { @@ -469,6 +477,7 @@ struct lsquic_stream_ctx { } interop_u; struct event *resume_resp; size_t written; + size_t file_size; /* Used by pwritev */ }; @@ -556,18 +565,35 @@ resume_response (evutil_socket_t fd, short what, void *arg) } +static size_t +bytes_left (lsquic_stream_ctx_t *st_h) +{ + if (s_pwritev) + return st_h->file_size - st_h->written; + else + return test_reader_size(st_h->reader.lsqr_ctx); +} + + +static ssize_t +my_preadv (void *user_data, const struct iovec *iov, int iovcnt) +{ + lsquic_stream_ctx_t *const st_h = user_data; + return preadv(st_h->file_fd, iov, iovcnt, st_h->written); +} + + static void http_server_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) { if (st_h->flags & SH_HEADERS_SENT) { ssize_t nw; - if (test_reader_size(st_h->reader.lsqr_ctx) > 0) + if (bytes_left(st_h) > 0) { if (st_h->server_ctx->delay_resp_sec && !(st_h->flags & SH_DELAYED) - && st_h->written > 10000000 - && test_reader_size(st_h->reader.lsqr_ctx) > 0) + && st_h->written > 10000000) { struct timeval delay = { .tv_sec = st_h->server_ctx->delay_resp_sec, }; @@ -585,7 +611,20 @@ http_server_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) else LSQ_ERROR("cannot allocate event"); } - nw = lsquic_stream_writef(stream, &st_h->reader); + if (s_pwritev) + { + size_t to_write = bytes_left(st_h); + if (s_pwritev > 0 && (size_t) s_pwritev < to_write) + to_write = s_pwritev; + nw = lsquic_stream_pwritev(stream, my_preadv, st_h, to_write); + if (nw == 0) + goto use_reader; + } + else + { + use_reader: + nw = lsquic_stream_writef(stream, &st_h->reader); + } if (nw < 0) { struct lsquic_conn *conn = lsquic_stream_conn(stream); @@ -601,7 +640,7 @@ http_server_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) exit(1); } } - if (test_reader_size(st_h->reader.lsqr_ctx) > 0) + if (bytes_left(st_h) > 0) { st_h->written += (size_t) nw; lsquic_stream_wantwrite(stream, 1); @@ -702,11 +741,32 @@ parse_request (struct lsquic_stream *stream, lsquic_stream_ctx_t *st_h) static void process_request (struct lsquic_stream *stream, lsquic_stream_ctx_t *st_h) { - st_h->reader.lsqr_read = test_reader_read; - st_h->reader.lsqr_size = test_reader_size; - st_h->reader.lsqr_ctx = create_lsquic_reader_ctx(st_h->req_path); - if (!st_h->reader.lsqr_ctx) - exit(1); + struct stat st; + + if (s_pwritev) + { + st_h->file_fd = open(st_h->req_path, O_RDONLY); + if (st_h->file_fd < 0) + { + LSQ_ERROR("cannot open %s for reading: %s", st_h->req_path, + strerror(errno)); + exit(1); + } + if (fstat(st_h->file_fd, &st) < 0) + { + LSQ_ERROR("fstat: %s", strerror(errno)); + exit(1); + } + st_h->file_size = st.st_size; + } + else + { + st_h->reader.lsqr_read = test_reader_read; + st_h->reader.lsqr_size = test_reader_size; + st_h->reader.lsqr_ctx = create_lsquic_reader_ctx(st_h->req_path); + if (!st_h->reader.lsqr_ctx) + exit(1); + } if (s_immediate_write) { @@ -1369,6 +1429,24 @@ new_req (enum method method, const char *path, const char *authority) } +static ssize_t +my_interop_preadv (void *user_data, const struct iovec *iov, int iovcnt) +{ + struct gen_file_ctx *const gfc = user_data; + size_t nread, nr; + int i; + + nread = 0; + for (i = 0; i < iovcnt; ++i) + { + nr = idle_read(gfc, iov[i].iov_base, iov[i].iov_len); + nread += nr; + } + + return (ssize_t) nread; +} + + static void idle_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) { @@ -1379,16 +1457,25 @@ idle_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) struct req *req; ssize_t nw; struct header_buf hbuf; + struct lsquic_reader reader; if (st_h->flags & SH_HEADERS_SENT) { - struct lsquic_reader reader = + if (s_pwritev) { - .lsqr_read = idle_read, - .lsqr_size = idle_size, - .lsqr_ctx = gfc, - }; - nw = lsquic_stream_writef(stream, &reader); + nw = lsquic_stream_pwritev(stream, my_interop_preadv, gfc, + gfc->remain); + if (nw == 0) + goto with_reader; + } + else + { + with_reader: + reader.lsqr_read = idle_read, + reader.lsqr_size = idle_size, + reader.lsqr_ctx = gfc, + nw = lsquic_stream_writef(stream, &reader); + } if (nw < 0) { LSQ_ERROR("error writing idle thoughts: %s", strerror(errno)); @@ -1507,6 +1594,10 @@ usage (const char *prog) " -p FILE Push request with this path\n" " -w SIZE Write immediately (LSWS mode). Argument specifies maximum\n" " size of the immediate write.\n" +" -P SIZE Use preadv(2) to read from disk and lsquic_stream_pwritev() to\n" +" write to stream. Positive SIZE indicate maximum value per\n" +" write; negative means always use remaining file size.\n" +" Incompatible with -w.\n" " -y DELAY Delay response for this many seconds -- use for debugging\n" , prog); } @@ -1680,7 +1771,7 @@ main (int argc, char **argv) prog_init(&prog, LSENG_SERVER|LSENG_HTTP, &server_ctx.sports, &http_server_if, &server_ctx); - while (-1 != (opt = getopt(argc, argv, PROG_OPTS "y:Y:n:p:r:w:h"))) + while (-1 != (opt = getopt(argc, argv, PROG_OPTS "y:Y:n:p:r:w:P:h"))) { switch (opt) { case 'n': @@ -1707,6 +1798,9 @@ main (int argc, char **argv) case 'w': s_immediate_write = atoi(optarg); break; + case 'P': + s_pwritev = strtoull(optarg, NULL, 10); + break; case 'y': server_ctx.delay_resp_sec = atoi(optarg); break; @@ -1734,6 +1828,12 @@ main (int argc, char **argv) #endif } + if (s_immediate_write && s_pwritev) + { + LSQ_ERROR("-w and -P are incompatible options"); + exit(EXIT_FAILURE); + } + if (0 != prog_prep(&prog)) { LSQ_ERROR("could not prep"); diff --git a/docs/apiref.rst b/docs/apiref.rst index acc4d33..2e3ec69 100644 --- a/docs/apiref.rst +++ b/docs/apiref.rst @@ -1457,6 +1457,45 @@ Writing To Streams the write functions -- :func:`lsquic_stream_write()` and :func:`lsquic_stream_writev()` utilize the same mechanism. +.. function:: ssize_t lsquic_stream_pwritev (struct lsquic_stream *stream, ssize_t (*preadv)(void *user_data, const struct iovec *iov, int iovcnt), void *user_data, size_t n_to_write) + + :param stream: Stream to write to. + :param preadv: Pointer to a custom ``preadv(2)``-like function. + :param user_data: Data to pass to ``preadv`` function. + :param n_to_write: Number of bytes to write. + :return: Number of bytes written or -1 on error. + + Write to stream using user-supplied ``preadv()`` function. + The stream allocates one or more packets and calls ``preadv()``, + which then fills the array of buffers. This is a good way to + minimize the number of ``read(2)`` system calls; the user can call + ``preadv(2)`` instead. + + The number of bytes available in the ``iov`` vector passed back to + the user callback may be smaller than ``n_to_write``. The expected + use pattern is to pass the number of bytes remaining in the file + and keep on calling ``preadv(2)``. + + Note that, unlike other stream-writing functions above, + ``lsquic_stream_pwritev()`` does *not* buffer bytes inside the + stream; it only writes to packets. That means the caller must be + prepared for this function to return 0 even inside the "on write" + stream callback. In that case, the caller should fall back to using + another write function. + + It is OK for the ``preadv`` callback to write fewer bytes that + ``n_to_write``. (This can happen if the underlying data source + is truncated.) + +:: + + /* + * For example, the return value of zero can be handled as follows: + */ + nw = lsquic_stream_pwritev(stream, my_readv, some_ctx, n_to_write); + if (nw == 0) + nw = lsquic_stream_write(stream, rem_bytes_buf, rem_bytes_len); + .. function:: int lsquic_stream_flush (lsquic_stream_t *stream) :param stream: Stream to flush. diff --git a/docs/conf.py b/docs/conf.py index f3df41f..e80a88f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -26,7 +26,7 @@ author = u'LiteSpeed Technologies' # The short X.Y version version = u'2.19' # The full version, including alpha/beta/rc tags -release = u'2.19.8' +release = u'2.19.9' # -- General configuration --------------------------------------------------- diff --git a/include/lsquic.h b/include/lsquic.h index a7f7a8a..30c4493 100644 --- a/include/lsquic.h +++ b/include/lsquic.h @@ -25,7 +25,7 @@ extern "C" { #define LSQUIC_MAJOR_VERSION 2 #define LSQUIC_MINOR_VERSION 19 -#define LSQUIC_PATCH_VERSION 8 +#define LSQUIC_PATCH_VERSION 9 /** * Engine flags: @@ -1392,6 +1392,14 @@ lsquic_stream_write (lsquic_stream_t *s, const void *buf, size_t len); ssize_t lsquic_stream_writev (lsquic_stream_t *s, const struct iovec *vec, int count); +/** + * Write to streams using a single call to a preadv-like function. + */ +ssize_t +lsquic_stream_pwritev (lsquic_stream_t *s, + ssize_t (*preadv)(void *user_data, const struct iovec *iov, int iovcnt), + void *user_data, size_t n_to_write); + /** * Used as argument to @ref lsquic_stream_writef() */ diff --git a/src/liblsquic/lsquic_conn.h b/src/liblsquic/lsquic_conn.h index 3bc0c1d..b4d3233 100644 --- a/src/liblsquic/lsquic_conn.h +++ b/src/liblsquic/lsquic_conn.h @@ -86,6 +86,11 @@ struct network_path #define NP_PEER_SA(path_) ((struct sockaddr *) (path_)->np_peer_addr) #define NP_IS_IPv6(path_) (AF_INET6 == NP_LOCAL_SA(path_)->sa_family) +struct ack_state +{ + uint32_t arr[6]; +}; + struct conn_iface { enum tick_st @@ -259,6 +264,12 @@ struct conn_iface /* Optional method. It is called when RTO occurs. */ void (*ci_retx_timeout) (struct lsquic_conn *); + + void + (*ci_ack_snapshot) (struct lsquic_conn *, struct ack_state *); + + void + (*ci_ack_rollback) (struct lsquic_conn *, struct ack_state *); }; #define LSCONN_CCE_BITS 3 diff --git a/src/liblsquic/lsquic_engine.c b/src/liblsquic/lsquic_engine.c index 5820c58..be04758 100644 --- a/src/liblsquic/lsquic_engine.c +++ b/src/liblsquic/lsquic_engine.c @@ -58,12 +58,12 @@ #include "lsquic_bw_sampler.h" #include "lsquic_minmax.h" #include "lsquic_bbr.h" -#include "lsquic_send_ctl.h" #include "lsquic_set.h" #include "lsquic_conn_flow.h" #include "lsquic_sfcw.h" #include "lsquic_hash.h" #include "lsquic_conn.h" +#include "lsquic_send_ctl.h" #include "lsquic_full_conn.h" #include "lsquic_util.h" #include "lsquic_qtags.h" diff --git a/src/liblsquic/lsquic_full_conn.c b/src/liblsquic/lsquic_full_conn.c index 4f16053..14bd1c3 100644 --- a/src/liblsquic/lsquic_full_conn.c +++ b/src/liblsquic/lsquic_full_conn.c @@ -42,7 +42,6 @@ #include "lsquic_bw_sampler.h" #include "lsquic_minmax.h" #include "lsquic_bbr.h" -#include "lsquic_send_ctl.h" #include "lsquic_set.h" #include "lsquic_malo.h" #include "lsquic_chsk_stream.h" @@ -66,6 +65,7 @@ #include "lsquic_attq.h" #include "lsquic_conn.h" +#include "lsquic_send_ctl.h" #include "lsquic_conn_public.h" #include "lsquic_ver_neg.h" #include "lsquic_mini_conn.h" @@ -1492,6 +1492,18 @@ process_padding_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, } +static void +log_conn_flow_control (struct full_conn *conn) +{ + LSQ_DEBUG("connection flow cap: wrote: %"PRIu64 + "; max: %"PRIu64, conn->fc_pub.conn_cap.cc_sent, + conn->fc_pub.conn_cap.cc_max); + LSQ_DEBUG("connection flow control window: read: %"PRIu64 + "; max: %"PRIu64, conn->fc_pub.cfcw.cf_max_recv_off, + conn->fc_pub.cfcw.cf_recv_off); +} + + static unsigned process_ping_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, const unsigned char *p, size_t len) @@ -1500,6 +1512,8 @@ process_ping_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in, */ EV_LOG_PING_FRAME_IN(LSQUIC_LOG_CONN_ID); LSQ_DEBUG("received PING"); + if (conn->fc_flags & FC_SERVER) + log_conn_flow_control(conn); return 1; } @@ -2763,6 +2777,8 @@ generate_ping_frame (struct full_conn *conn) lsquic_send_ctl_incr_pack_sz(&conn->fc_send_ctl, packet_out, sz); packet_out->po_frame_types |= 1 << QUIC_FRAME_PING; LSQ_DEBUG("wrote PING frame"); + if (!(conn->fc_flags & FC_SERVER)) + log_conn_flow_control(conn); } @@ -3271,6 +3287,53 @@ full_conn_ci_can_write_ack (struct lsquic_conn *lconn) } +struct full_ack_state +{ + enum full_conn_flags conn_flags; + enum alarm_id_bit armed_set; + unsigned n_slack_akbl; + unsigned n_stop_waiting; +}; + + +typedef char ack_state_size[sizeof(struct full_ack_state) + <= sizeof(struct ack_state) ? 1 : - 1]; + +static void +full_conn_ci_ack_snapshot (struct lsquic_conn *lconn, struct ack_state *opaque) +{ + struct full_conn *conn = (struct full_conn *) lconn; + struct full_ack_state *const ack_state = (struct full_ack_state *) opaque; + + ack_state->conn_flags = conn->fc_flags; + ack_state->armed_set = conn->fc_alset.as_armed_set; + ack_state->n_slack_akbl = conn->fc_n_slack_akbl; + ack_state->n_stop_waiting + = lsquic_send_ctl_n_stop_waiting(&conn->fc_send_ctl); + LSQ_DEBUG("take ACK snapshot"); +} + + +static void +full_conn_ci_ack_rollback (struct lsquic_conn *lconn, struct ack_state *opaque) +{ + struct full_ack_state *const ack_state = (struct full_ack_state *) opaque; + struct full_conn *conn = (struct full_conn *) lconn; + + conn->fc_flags &= ~(FC_ACK_HAD_MISS|FC_ACK_QUEUED); + conn->fc_flags |= (FC_ACK_HAD_MISS|FC_ACK_QUEUED) + & ack_state->conn_flags; + + conn->fc_alset.as_armed_set &= ~ALBIT_ACK_APP; + conn->fc_alset.as_armed_set |= ALBIT_ACK_APP & ack_state->armed_set; + + conn->fc_n_slack_akbl = ack_state->n_slack_akbl; + conn->fc_send_ctl.sc_n_stop_waiting = ack_state->n_stop_waiting; + + LSQ_DEBUG("roll back ACK state"); +} + + /* This should be called before lsquic_alarmset_ring_expired() */ static void maybe_set_noprogress_alarm (struct full_conn *conn, lsquic_time_t now) @@ -4404,6 +4467,8 @@ static const struct headers_stream_callbacks *headers_callbacks_ptr = &headers_c static const struct conn_iface full_conn_iface = { .ci_abort = full_conn_ci_abort, .ci_abort_error = full_conn_ci_abort_error, + .ci_ack_rollback = full_conn_ci_ack_rollback, + .ci_ack_snapshot = full_conn_ci_ack_snapshot, .ci_can_write_ack = full_conn_ci_can_write_ack, .ci_cancel_pending_streams = full_conn_ci_cancel_pending_streams, diff --git a/src/liblsquic/lsquic_full_conn_ietf.c b/src/liblsquic/lsquic_full_conn_ietf.c index 4655738..b3974a3 100644 --- a/src/liblsquic/lsquic_full_conn_ietf.c +++ b/src/liblsquic/lsquic_full_conn_ietf.c @@ -1654,6 +1654,62 @@ generate_timestamp_frame (struct ietf_full_conn *conn, } +struct ietf_ack_state +{ + enum ifull_conn_flags conn_flags; + enum send_flags send_flags; + enum alarm_id_bit armed_set; + unsigned n_slack_akbl; + unsigned n_slack_all; + unsigned char unretx_thresh; +}; + + +typedef char ack_state_size[sizeof(struct ietf_ack_state) + <= sizeof(struct ack_state) ? 1 : - 1]; + +static void +ietf_full_conn_ci_ack_snapshot (struct lsquic_conn *lconn, + struct ack_state *opaque) +{ + struct ietf_full_conn *conn = (struct ietf_full_conn *) lconn; + struct ietf_ack_state *const ack_state = (struct ietf_ack_state *) opaque; + + ack_state->conn_flags = conn->ifc_flags; + ack_state->send_flags = conn->ifc_send_flags; + ack_state->armed_set = conn->ifc_alset.as_armed_set; + ack_state->n_slack_akbl = conn->ifc_n_slack_akbl[PNS_APP]; + ack_state->n_slack_all = conn->ifc_n_slack_all; + ack_state->unretx_thresh= conn->ifc_ping_unretx_thresh; + LSQ_DEBUG("take ACK snapshot"); +} + + +static void +ietf_full_conn_ci_ack_rollback (struct lsquic_conn *lconn, + struct ack_state *opaque) +{ + struct ietf_ack_state *const ack_state = (struct ietf_ack_state *) opaque; + struct ietf_full_conn *conn = (struct ietf_full_conn *) lconn; + + conn->ifc_flags &= ~(IFC_ACK_HAD_MISS|IFC_ACK_QUED_APP); + conn->ifc_flags |= (IFC_ACK_HAD_MISS|IFC_ACK_QUED_APP) + & ack_state->conn_flags; + + conn->ifc_send_flags &= ~SF_SEND_PING; + conn->ifc_send_flags |= SF_SEND_PING & ack_state->send_flags; + + conn->ifc_alset.as_armed_set &= ~ALBIT_ACK_APP; + conn->ifc_alset.as_armed_set |= ALBIT_ACK_APP & ack_state->armed_set; + + conn->ifc_n_slack_akbl[PNS_APP] = ack_state->n_slack_akbl; + conn->ifc_n_slack_all = ack_state->n_slack_all; + conn->ifc_ping_unretx_thresh = ack_state->unretx_thresh; + + LSQ_DEBUG("roll back ACK state"); +} + + static int generate_ack_frame_for_pns (struct ietf_full_conn *conn, struct lsquic_packet_out *packet_out, enum packnum_space pns, @@ -2262,7 +2318,7 @@ generate_stream_blocked_frame (struct ietf_full_conn *conn, static int -generate_stop_sending_frame (struct ietf_full_conn *conn, +generate_stop_sending_frame_by_id (struct ietf_full_conn *conn, lsquic_stream_id_t stream_id, enum http_error_code error_code) { struct lsquic_packet_out *packet_out; @@ -2301,6 +2357,21 @@ generate_stop_sending_frame (struct ietf_full_conn *conn, } +/* Return true if generated, false otherwise */ +static int +generate_stop_sending_frame (struct ietf_full_conn *conn, + struct lsquic_stream *stream) +{ + if (0 == generate_stop_sending_frame_by_id(conn, stream->id, HEC_NO_ERROR)) + { + lsquic_stream_ss_frame_sent(stream); + return 1; + } + else + return 0; +} + + static void generate_stop_sending_frames (struct ietf_full_conn *conn, lsquic_time_t now) { @@ -2311,7 +2382,7 @@ generate_stop_sending_frames (struct ietf_full_conn *conn, lsquic_time_t now) while (!STAILQ_EMPTY(&conn->ifc_stream_ids_to_ss)) { sits = STAILQ_FIRST(&conn->ifc_stream_ids_to_ss); - if (0 == generate_stop_sending_frame(conn, sits->sits_stream_id, + if (0 == generate_stop_sending_frame_by_id(conn, sits->sits_stream_id, sits->sits_error_code)) { STAILQ_REMOVE_HEAD(&conn->ifc_stream_ids_to_ss, sits_next); @@ -2614,6 +2685,8 @@ process_stream_ready_to_send (struct ietf_full_conn *conn, r &= generate_stream_blocked_frame(conn, stream); if (stream->sm_qflags & SMQF_SEND_RST) r &= generate_rst_stream_frame(conn, stream); + if (stream->sm_qflags & SMQF_SEND_STOP_SENDING) + r &= generate_stop_sending_frame(conn, stream); return r; } @@ -3994,6 +4067,18 @@ generate_connection_close_packet (struct ietf_full_conn *conn) } +static void +log_conn_flow_control (struct ietf_full_conn *conn) +{ + LSQ_DEBUG("connection flow cap: wrote: %"PRIu64 + "; max: %"PRIu64, conn->ifc_pub.conn_cap.cc_sent, + conn->ifc_pub.conn_cap.cc_max); + LSQ_DEBUG("connection flow control window: read: %"PRIu64 + "; max: %"PRIu64, conn->ifc_pub.cfcw.cf_max_recv_off, + conn->ifc_pub.cfcw.cf_recv_off); +} + + static void generate_ping_frame (struct ietf_full_conn *conn, lsquic_time_t unused) { @@ -4023,6 +4108,8 @@ generate_ping_frame (struct ietf_full_conn *conn, lsquic_time_t unused) packet_out->po_frame_types |= 1 << QUIC_FRAME_PING; LSQ_DEBUG("wrote PING frame"); conn->ifc_send_flags &= ~SF_SEND_PING; + if (!(conn->ifc_flags & IFC_SERVER)) + log_conn_flow_control(conn); } @@ -5268,6 +5355,8 @@ process_ping_frame (struct ietf_full_conn *conn, */ EV_LOG_PING_FRAME_IN(LSQUIC_LOG_CONN_ID); LSQ_DEBUG("received PING"); + if (conn->ifc_flags & IFC_SERVER) + log_conn_flow_control(conn); return 1; } @@ -7802,6 +7891,8 @@ ietf_full_conn_ci_count_garbage (struct lsquic_conn *lconn, size_t garbage_sz) #define IETF_FULL_CONN_FUNCS \ .ci_abort = ietf_full_conn_ci_abort, \ .ci_abort_error = ietf_full_conn_ci_abort_error, \ + .ci_ack_snapshot = ietf_full_conn_ci_ack_snapshot, \ + .ci_ack_rollback = ietf_full_conn_ci_ack_rollback, \ .ci_retire_cid = ietf_full_conn_ci_retire_cid, \ .ci_can_write_ack = ietf_full_conn_ci_can_write_ack, \ .ci_cancel_pending_streams = ietf_full_conn_ci_cancel_pending_streams, \ diff --git a/src/liblsquic/lsquic_mini_conn.h b/src/liblsquic/lsquic_mini_conn.h index 84298b8..43c759d 100644 --- a/src/liblsquic/lsquic_mini_conn.h +++ b/src/liblsquic/lsquic_mini_conn.h @@ -20,7 +20,7 @@ struct lsquic_packet_out; struct lsquic_engine_public; #ifndef LSQUIC_KEEP_MINICONN_HISTORY -# ifndef NDEBUG +# if !defined(NDEBUG) && !defined(_MSC_VER) # define LSQUIC_KEEP_MINICONN_HISTORY 1 # else # define LSQUIC_KEEP_MINICONN_HISTORY 0 diff --git a/src/liblsquic/lsquic_parse.h b/src/liblsquic/lsquic_parse.h index a81f279..8f4def7 100644 --- a/src/liblsquic/lsquic_parse.h +++ b/src/liblsquic/lsquic_parse.h @@ -105,6 +105,11 @@ struct parse_funcs int (*pf_parse_crypto_frame) (const unsigned char *buf, size_t rem_packet_sz, struct stream_frame *); + /* Return true if STREAM frame extends to the end of the packet and thus + * does not contain a Length field (no update). + */ + int + (*pf_dec_stream_frame_size) (unsigned char *buf, size_t new_size); int (*pf_parse_ack_frame) (const unsigned char *buf, size_t buf_len, struct ack_info *ack_info, uint8_t exp); diff --git a/src/liblsquic/lsquic_parse_Q046.c b/src/liblsquic/lsquic_parse_Q046.c index 420c714..1d6be90 100644 --- a/src/liblsquic/lsquic_parse_Q046.c +++ b/src/liblsquic/lsquic_parse_Q046.c @@ -318,6 +318,7 @@ const struct parse_funcs lsquic_parse_funcs_gquic_Q046 = .pf_gen_stream_frame = lsquic_gquic_be_gen_stream_frame, .pf_calc_stream_frame_header_sz = lsquic_calc_stream_frame_header_sz_gquic, .pf_parse_stream_frame = lsquic_gquic_be_parse_stream_frame, + .pf_dec_stream_frame_size = lsquic_gquic_be_dec_stream_frame_size, .pf_parse_ack_frame = lsquic_gquic_be_parse_ack_frame, .pf_gen_ack_frame = lsquic_gquic_be_gen_ack_frame, .pf_gen_stop_waiting_frame = lsquic_gquic_be_gen_stop_waiting_frame, diff --git a/src/liblsquic/lsquic_parse_Q050.c b/src/liblsquic/lsquic_parse_Q050.c index c2ca0a1..a285502 100644 --- a/src/liblsquic/lsquic_parse_Q050.c +++ b/src/liblsquic/lsquic_parse_Q050.c @@ -848,6 +848,7 @@ const struct parse_funcs lsquic_parse_funcs_gquic_Q050 = .pf_gen_stream_frame = lsquic_gquic_be_gen_stream_frame, .pf_calc_stream_frame_header_sz = lsquic_calc_stream_frame_header_sz_gquic, .pf_parse_stream_frame = lsquic_gquic_be_parse_stream_frame, + .pf_dec_stream_frame_size = lsquic_gquic_be_dec_stream_frame_size, .pf_parse_ack_frame = lsquic_gquic_be_parse_ack_frame, .pf_gen_ack_frame = lsquic_gquic_be_gen_ack_frame, .pf_gen_stop_waiting_frame = lsquic_gquic_be_gen_stop_waiting_frame, diff --git a/src/liblsquic/lsquic_parse_gquic_be.c b/src/liblsquic/lsquic_parse_gquic_be.c index c629c2a..c8a8689 100644 --- a/src/liblsquic/lsquic_parse_gquic_be.c +++ b/src/liblsquic/lsquic_parse_gquic_be.c @@ -293,6 +293,26 @@ lsquic_gquic_be_gen_stream_frame (unsigned char *buf, size_t buf_len, return p - buf; } +int +lsquic_gquic_be_dec_stream_frame_size (unsigned char *buf, size_t new_size) +{ + /* 1fdoooss */ + const unsigned char type = buf[0]; + + if (!(type & 0x20)) + return 1; + + const unsigned offset_len = ((type >> 2) & 7) + 1 - !((type >> 2) & 7); + const unsigned stream_id_len = 1 + (type & 3); + + uint16_t len = new_size; +#if __BYTE_ORDER == __LITTLE_ENDIAN + len = bswap_16(len); +#endif + memcpy(buf + 1 + offset_len + stream_id_len, &len, 2); + return 0; +} + /* return parsed (used) buffer length */ int @@ -1032,6 +1052,7 @@ const struct parse_funcs lsquic_parse_funcs_gquic_Q043 = .pf_calc_stream_frame_header_sz = lsquic_calc_stream_frame_header_sz_gquic, .pf_parse_stream_frame = lsquic_gquic_be_parse_stream_frame, .pf_parse_ack_frame = lsquic_gquic_be_parse_ack_frame, + .pf_dec_stream_frame_size = lsquic_gquic_be_dec_stream_frame_size, .pf_gen_ack_frame = lsquic_gquic_be_gen_ack_frame, .pf_gen_stop_waiting_frame = lsquic_gquic_be_gen_stop_waiting_frame, .pf_parse_stop_waiting_frame = lsquic_gquic_be_parse_stop_waiting_frame, diff --git a/src/liblsquic/lsquic_parse_gquic_be.h b/src/liblsquic/lsquic_parse_gquic_be.h index 12df58e..30d27dc 100644 --- a/src/liblsquic/lsquic_parse_gquic_be.h +++ b/src/liblsquic/lsquic_parse_gquic_be.h @@ -111,4 +111,7 @@ lsquic_gquic_be_gen_ack_frame (unsigned char *outbuf, size_t outbuf_sz, void *rechist, lsquic_time_t now, int *has_missing, lsquic_packno_t *, const uint64_t *); +int +lsquic_gquic_be_dec_stream_frame_size (unsigned char *buf, size_t new_size); + #endif diff --git a/src/liblsquic/lsquic_parse_ietf_v1.c b/src/liblsquic/lsquic_parse_ietf_v1.c index ef99cc9..90ae599 100644 --- a/src/liblsquic/lsquic_parse_ietf_v1.c +++ b/src/liblsquic/lsquic_parse_ietf_v1.c @@ -472,6 +472,38 @@ ietf_v1_gen_crypto_frame (unsigned char *buf, size_t buf_len, } +static int +ietf_v1_dec_stream_frame_size (unsigned char *p, size_t new_size) +{ + /* 0b00001XXX + * 0x4 OFF + * 0x2 LEN + * 0x1 FIN + */ + unsigned bits; + + const char type = *p++; + if (!(type & 0x2)) + return 1; + + /* Stream ID */ + bits = *p >> 6; + p += 1 << bits; + + if (type & 0x4) + { + /* Offset */ + bits = *p >> 6; + p += 1 << bits; + } + + /* Write new size */ + bits = *p >> 6; + vint_write(p, new_size, bits, 1 << bits); + return 0; +} + + /* return parsed (used) buffer length */ static int ietf_v1_parse_stream_frame (const unsigned char *buf, size_t rem_packet_sz, @@ -697,7 +729,7 @@ ietf_v1_rst_frame_size (lsquic_stream_id_t stream_id, uint64_t error_code, static int ietf_v1_gen_rst_frame (unsigned char *buf, size_t buf_len, - lsquic_stream_id_t stream_id, uint64_t error_code, uint64_t final_size) + lsquic_stream_id_t stream_id, uint64_t final_size, uint64_t error_code) { unsigned vbits; unsigned char *p; @@ -2122,6 +2154,7 @@ const struct parse_funcs lsquic_parse_funcs_ietf_v1 = .pf_gen_stream_frame = ietf_v1_gen_stream_frame, .pf_calc_stream_frame_header_sz = ietf_v1_calc_stream_frame_header_sz, .pf_parse_stream_frame = ietf_v1_parse_stream_frame, + .pf_dec_stream_frame_size = ietf_v1_dec_stream_frame_size, .pf_parse_ack_frame = ietf_v1_parse_ack_frame, .pf_gen_ack_frame = ietf_v1_gen_ack_frame, .pf_gen_blocked_frame = ietf_v1_gen_blocked_frame, diff --git a/src/liblsquic/lsquic_send_ctl.c b/src/liblsquic/lsquic_send_ctl.c index 2d55eb4..7276dae 100644 --- a/src/liblsquic/lsquic_send_ctl.c +++ b/src/liblsquic/lsquic_send_ctl.c @@ -30,7 +30,6 @@ #include "lsquic_bw_sampler.h" #include "lsquic_minmax.h" #include "lsquic_bbr.h" -#include "lsquic_send_ctl.h" #include "lsquic_util.h" #include "lsquic_sfcw.h" #include "lsquic_varint.h" @@ -40,6 +39,7 @@ #include "lsquic_ver_neg.h" #include "lsquic_ev_log.h" #include "lsquic_conn.h" +#include "lsquic_send_ctl.h" #include "lsquic_conn_flow.h" #include "lsquic_conn_public.h" #include "lsquic_cong_ctl.h" @@ -482,6 +482,9 @@ set_retx_alarm (struct lsquic_send_ctl *ctl, enum packnum_space pns, #define SC_PACK_SIZE(ctl_) (+(ctl_)->sc_conn_pub->path->np_pack_size) +/* XXX can we optimize this by caching the value of this function? It should + * not change within one tick. + */ static lsquic_time_t send_ctl_transfer_time (void *ctx) { @@ -3522,3 +3525,186 @@ lsquic_send_ctl_disable_ecn (struct lsquic_send_ctl *ctl) TAILQ_FOREACH(packet_out, &ctl->sc_scheduled_packets, po_next) lsquic_packet_out_set_ecn(packet_out, ECN_NOT_ECT); } + + +void +lsquic_send_ctl_snapshot (struct lsquic_send_ctl *ctl, + struct send_ctl_state *ctl_state) +{ + struct lsquic_conn *const lconn = ctl->sc_conn_pub->lconn; + int buffered, repace; + + buffered = !lsquic_send_ctl_schedule_stream_packets_immediately(ctl); + repace = !buffered && (ctl->sc_flags & SC_PACE); + + if (repace) + ctl_state->pacer = ctl->sc_pacer; + + if (buffered) + { + lconn->cn_if->ci_ack_snapshot(lconn, &ctl_state->ack_state); + ctl_state->buf_counts[BPT_OTHER_PRIO] + = ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_count; + ctl_state->buf_counts[BPT_HIGHEST_PRIO] + = ctl->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_count; + } +} + + +static void +send_ctl_repace (struct lsquic_send_ctl *ctl, const struct pacer *pacer, + unsigned count) +{ + unsigned n; + int in_rec; + + LSQ_DEBUG("repace, count: %u", count); + ctl->sc_pacer = *pacer; + + in_rec = send_ctl_in_recovery(ctl); + for (n = 0; n < count; ++n) + lsquic_pacer_packet_scheduled(&ctl->sc_pacer, + ctl->sc_n_in_flight_retx + ctl->sc_n_scheduled + n, in_rec, + send_ctl_transfer_time, ctl); +} + + +void +lsquic_send_ctl_rollback (struct lsquic_send_ctl *ctl, + struct send_ctl_state *ctl_state, const struct iovec *last_iov, + size_t shortfall) +{ + struct lsquic_conn *const lconn = ctl->sc_conn_pub->lconn; + struct lsquic_packet_out *packet_out, *next; + struct lsquic_packets_tailq *packets; + struct stream_frame stream_frame; + struct packet_out_frec_iter pofi; + enum buf_packet_type packet_type; + unsigned orig_count, new_count; + enum quic_ft_bit lost_types; + int buffered, repace, len, to_end; + unsigned short prev_frec_len; + struct frame_rec *frec; + + buffered = !lsquic_send_ctl_schedule_stream_packets_immediately(ctl); + repace = !buffered && (ctl->sc_flags & SC_PACE); + + if (!buffered) + { + orig_count = ctl->sc_n_scheduled; + packets = &ctl->sc_scheduled_packets; + packet_type = 0; /* Not necessary, but compiler complains */ + } + else if (ctl_state->buf_counts[BPT_HIGHEST_PRIO] + < ctl->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_count) + { + packets = &ctl->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_packets; + orig_count = ctl->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_count; + packet_type = BPT_HIGHEST_PRIO; + } + else + { + packets = &ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets; + orig_count = ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_count; + packet_type = BPT_OTHER_PRIO; + } + + /* Now find last packet: */ + TAILQ_FOREACH(packet_out, packets, po_next) + if ((unsigned char *) last_iov->iov_base >= packet_out->po_data + && (unsigned char *) last_iov->iov_base + < packet_out->po_data + packet_out->po_data_sz) + break; + + if (!packet_out) + { + lconn->cn_if->ci_internal_error(lconn, + "rollback failed: cannot find packet"); + return; + } + + for (frec = lsquic_pofi_first(&pofi, packet_out); frec; + frec = lsquic_pofi_next(&pofi)) + if (frec->fe_frame_type == QUIC_FRAME_STREAM + /* At the time of this writing, pwritev() generates a single STREAM + * frame per packet. To keep code future-proof, we use an extra + * check. + */ + && (unsigned char *) last_iov->iov_base + > packet_out->po_data + frec->fe_off + && (unsigned char *) last_iov->iov_base + < packet_out->po_data + frec->fe_off + frec->fe_len) + break; + + if (!frec) + { + lconn->cn_if->ci_internal_error(lconn, + "rollback failed: cannot find frame record"); + return; + } + + /* Strictly less because of the STREAM frame header */ + assert(last_iov->iov_len < frec->fe_len); + + len = lconn->cn_pf->pf_parse_stream_frame( + packet_out->po_data + frec->fe_off, frec->fe_len, &stream_frame); + if (len < 0) + { + lconn->cn_if->ci_internal_error(lconn, + "error parsing own STREAM frame"); + return; + } + + if (stream_frame.data_frame.df_size > last_iov->iov_len - shortfall) + { + packet_out->po_data_sz = (unsigned char *) last_iov->iov_base + + last_iov->iov_len - shortfall - packet_out->po_data; + prev_frec_len = frec->fe_len; + frec->fe_len = packet_out->po_data_sz - frec->fe_off; + to_end = lconn->cn_pf->pf_dec_stream_frame_size( + packet_out->po_data + frec->fe_off, + stream_frame.data_frame.df_size - (prev_frec_len - frec->fe_len)); + if (to_end) + { /* A frame that's too short may be generated when pwritev runs out + * of iovecs. In that case, we adjust it here. + */ + if (!(packet_out->po_flags & PO_STREAM_END)) + LSQ_DEBUG("set stream-end flag on truncated packet"); + packet_out->po_flags |= PO_STREAM_END; + } + if (!buffered) + ctl->sc_bytes_scheduled -= prev_frec_len - frec->fe_len; + } + else + assert(stream_frame.data_frame.df_size + == last_iov->iov_len - shortfall); + + /* Drop any frames that follow */ + for (frec = lsquic_pofi_next(&pofi); frec; frec = lsquic_pofi_next(&pofi)) + frec->fe_frame_type = 0; + + /* Return unused packets */ + new_count = orig_count; + lost_types = 0; + for (packet_out = TAILQ_NEXT(packet_out, po_next); packet_out != NULL; + packet_out = next) + { + next = TAILQ_NEXT(packet_out, po_next); + --new_count; + lost_types |= packet_out->po_frame_types; + /* Undo lsquic_send_ctl_get_packet_for_stream() */ + if (!buffered) + send_ctl_sched_remove(ctl, packet_out); + else + { + TAILQ_REMOVE(packets, packet_out, po_next); + --ctl->sc_buffered_packets[packet_type].bpq_count; + } + send_ctl_destroy_packet(ctl, packet_out); + } + + if (new_count < orig_count && repace) + send_ctl_repace(ctl, &ctl_state->pacer, new_count); + if (buffered && (lost_types & QUIC_FTBIT_ACK)) + lconn->cn_if->ci_ack_rollback(lconn, &ctl_state->ack_state); +} diff --git a/src/liblsquic/lsquic_send_ctl.h b/src/liblsquic/lsquic_send_ctl.h index 82b2b0a..b19c554 100644 --- a/src/liblsquic/lsquic_send_ctl.h +++ b/src/liblsquic/lsquic_send_ctl.h @@ -412,4 +412,18 @@ lsquic_send_ctl_can_send_probe (const struct lsquic_send_ctl *, void lsquic_send_ctl_disable_ecn (struct lsquic_send_ctl *); +struct send_ctl_state +{ + struct pacer pacer; + struct ack_state ack_state; + unsigned buf_counts[BPT_OTHER_PRIO + 1]; +}; + +void +lsquic_send_ctl_snapshot (struct lsquic_send_ctl *, struct send_ctl_state *); + +void +lsquic_send_ctl_rollback (struct lsquic_send_ctl *, struct send_ctl_state *, + const struct iovec *, size_t); + #endif diff --git a/src/liblsquic/lsquic_stream.c b/src/liblsquic/lsquic_stream.c index 6fa794a..c67095a 100644 --- a/src/liblsquic/lsquic_stream.c +++ b/src/liblsquic/lsquic_stream.c @@ -93,8 +93,15 @@ stream_wantread (lsquic_stream_t *stream, int is_want); static int stream_wantwrite (lsquic_stream_t *stream, int is_want); +enum stream_write_options +{ + SWO_BUFFER = 1 << 0, /* Allow buffering in sm_buf */ +}; + + static ssize_t -stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t); +stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t, + enum stream_write_options); static ssize_t save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len); @@ -181,6 +188,7 @@ enum stream_history_event SHE_EMPTY = '\0', /* Special entry. No init besides memset required */ SHE_PLUS = '+', /* Special entry: previous event occured more than once */ SHE_REACH_FIN = 'a', + SHE_EARLY_READ_STOP = 'A', SHE_BLOCKED_OUT = 'b', SHE_CREATED = 'C', SHE_FRAME_IN = 'd', @@ -197,13 +205,15 @@ enum stream_history_event SHE_ONCLOSE_CALL = 'L', SHE_ONNEW = 'N', SHE_SET_PRIO = 'p', + SHE_SHORT_WRITE = 'q', SHE_USER_READ = 'r', SHE_SHUTDOWN_READ = 'R', SHE_RST_IN = 's', - SHE_SS_IN = 'S', + SHE_STOP_SENDIG_IN = 'S', SHE_RST_OUT = 't', SHE_RST_ACKED = 'T', SHE_FLUSH = 'u', + SHE_STOP_SENDIG_OUT = 'U', SHE_USER_WRITE_DATA = 'w', SHE_SHUTDOWN_WRITE = 'W', SHE_CLOSE = 'X', @@ -643,10 +653,13 @@ stream_is_finished (const lsquic_stream_t *stream) * stream are outstanding: */ && 0 == stream->n_unacked + && 0 == (stream->sm_qflags & ( /* This checks that no packets that reference this stream will * become outstanding: */ - && 0 == (stream->sm_qflags & SMQF_SEND_RST) + SMQF_SEND_RST + /* Can't finish stream until all "self" flags are unset: */ + | SMQF_SELF_FLAGS)) && ((stream->stream_flags & STREAM_FORCE_FINISH) || (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT))); } @@ -822,6 +835,22 @@ lsquic_stream_readable (struct lsquic_stream *stream) } +static int +stream_writeable (struct lsquic_stream *stream) +{ + /* A stream is writeable if one of the following is true: */ + return + /* - The stream is reset, by either side. In this case, + * lsquic_stream_write() will return -1 (we want the user to be + * able to collect the error). + */ + lsquic_stream_is_reset(stream) + /* - Data can be written to stream: */ + || lsquic_stream_write_avail(stream) + ; +} + + static size_t stream_write_avail_no_frames (struct lsquic_stream *stream) { @@ -1008,6 +1037,7 @@ lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame) { SM_HISTORY_APPEND(stream, SHE_FIN_IN); stream->stream_flags |= STREAM_FIN_RECVD; + stream->sm_qflags &= ~SMQF_WAIT_FIN_OFF; stream->sm_fin_off = DF_END(frame); maybe_finish_stream(stream); } @@ -1127,10 +1157,22 @@ lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset, lsquic_sfcw_consume_rem(&stream->fc); drop_frames_in(stream); - drop_buffered_data(stream); - maybe_elide_stream_frames(stream); - if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT)) + if (!(stream->sm_bflags & SMBF_IETF)) + { + drop_buffered_data(stream); + maybe_elide_stream_frames(stream); + } + + if (stream->sm_qflags & SMQF_WAIT_FIN_OFF) + { + stream->sm_qflags &= ~SMQF_WAIT_FIN_OFF; + LSQ_DEBUG("final offset is now known: %"PRIu64, offset); + } + + if (!(stream->stream_flags & + (STREAM_RST_SENT|STREAM_SS_SENT|STREAM_FIN_SENT)) + && !(stream->sm_bflags & SMBF_IETF) && !(stream->sm_qflags & SMQF_SEND_RST)) lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0); @@ -1153,7 +1195,7 @@ lsquic_stream_stop_sending_in (struct lsquic_stream *stream, return; } - SM_HISTORY_APPEND(stream, SHE_SS_IN); + SM_HISTORY_APPEND(stream, SHE_STOP_SENDIG_IN); stream->stream_flags |= STREAM_SS_RECVD; /* Let user collect error: */ @@ -1577,11 +1619,62 @@ lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len) } +void +lsquic_stream_ss_frame_sent (struct lsquic_stream *stream) +{ + assert(stream->sm_qflags & SMQF_SEND_STOP_SENDING); + SM_HISTORY_APPEND(stream, SHE_STOP_SENDIG_OUT); + stream->sm_qflags &= ~SMQF_SEND_STOP_SENDING; + stream->stream_flags |= STREAM_SS_SENT; + if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) + TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream); +} + + +static void +handle_early_read_shutdown_ietf (struct lsquic_stream *stream) +{ + if (!(stream->sm_qflags & SMQF_SENDING_FLAGS)) + TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, + next_send_stream); + stream->sm_qflags |= SMQF_SEND_STOP_SENDING|SMQF_WAIT_FIN_OFF; +} + + +static void +handle_early_read_shutdown_gquic (struct lsquic_stream *stream) +{ + if (!(stream->stream_flags & STREAM_RST_SENT)) + { + lsquic_stream_reset_ext(stream, 7 /* QUIC_STREAM_CANCELLED */, 0); + stream->sm_qflags |= SMQF_WAIT_FIN_OFF; + } +} + + +static void +handle_early_read_shutdown (struct lsquic_stream *stream) +{ + if (stream->sm_bflags & SMBF_IETF) + handle_early_read_shutdown_ietf(stream); + else + handle_early_read_shutdown_gquic(stream); +} + + static void stream_shutdown_read (lsquic_stream_t *stream) { if (!(stream->stream_flags & STREAM_U_READ_DONE)) { + if (!(stream->stream_flags & STREAM_FIN_REACHED)) + { + LSQ_DEBUG("read shut down before reading FIN. (FIN received: %d)", + !!(stream->stream_flags & STREAM_FIN_RECVD)); + SM_HISTORY_APPEND(stream, SHE_EARLY_READ_STOP); + if (!(stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD))) + handle_early_read_shutdown(stream); + } SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ); stream->stream_flags |= STREAM_U_READ_DONE; stream->sm_readable = stream_readable_discard; @@ -1625,7 +1718,9 @@ stream_shutdown_write (lsquic_stream_t *stream) if (!(stream->sm_bflags & SMBF_CRYPTO) && !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT)) && !stream_is_incoming_unidir(stream) - && !(stream->sm_qflags & SMQF_SEND_RST)) + /* In gQUIC, receiving a RESET means "stop sending" */ + && !(!(stream->sm_qflags & SMBF_IETF) + && (stream->stream_flags & STREAM_RST_RECVD))) { if ((stream->sm_bflags & SMBF_USE_HEADERS) && !(stream->stream_flags & STREAM_HEADERS_SENT)) @@ -1705,6 +1800,8 @@ lsquic_stream_shutdown_internal (lsquic_stream_t *stream) { LSQ_DEBUG("internal shutdown"); stream->stream_flags |= STREAM_U_READ_DONE|STREAM_U_WRITE_DONE; + stream_wantwrite(stream, 0); + stream_wantread(stream, 0); if (lsquic_stream_is_critical(stream)) { LSQ_DEBUG("add flag to force-finish special stream"); @@ -2054,7 +2151,7 @@ stream_dispatch_write_events_loop (lsquic_stream_t *stream) stream->stream_flags |= STREAM_LAST_WRITE_OK; while ((stream->sm_qflags & SMQF_WANT_WRITE) && (stream->stream_flags & STREAM_LAST_WRITE_OK) - && lsquic_stream_write_avail(stream)) + && stream_writeable(stream)) { progress = stream_progress(stream); @@ -2173,7 +2270,7 @@ lsquic_stream_dispatch_write_events (lsquic_stream_t *stream) if (stream->sm_bflags & SMBF_RW_ONCE) { if ((stream->sm_qflags & SMQF_WANT_WRITE) - && lsquic_stream_write_avail(stream)) + && stream_writeable(stream)) { on_write = select_on_write(stream); on_write(stream, stream->st_ctx); @@ -2229,7 +2326,7 @@ stream_flush (lsquic_stream_t *stream) empty_reader.lsqr_size = inner_reader_empty_size; empty_reader.lsqr_read = inner_reader_empty_read; empty_reader.lsqr_ctx = NULL; /* pro forma */ - nw = stream_write_to_packets(stream, &empty_reader, 0); + nw = stream_write_to_packets(stream, &empty_reader, 0, SWO_BUFFER); if (nw >= 0) { @@ -2655,6 +2752,27 @@ stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off, } +struct hq_arr +{ + unsigned char **p; + unsigned count; + unsigned max; +}; + + +static int +save_hq_ptr (struct hq_arr *hq_arr, void *p) +{ + if (hq_arr->count < hq_arr->max) + { + hq_arr->p[hq_arr->count++] = p; + return 0; + } + else + return -1; +} + + static size_t frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) { @@ -2708,6 +2826,11 @@ frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin) if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM))) { shf->shf_frame_ptr = p; + if (stream->sm_hq_arr && 0 != save_hq_ptr(stream->sm_hq_arr, p)) + { + stream_hq_frame_put(stream, shf); + break; + } memset(p, 0, frame_sz); p += frame_sz; } @@ -3079,7 +3202,7 @@ maybe_close_varsize_hq_frame (struct lsquic_stream *stream) static ssize_t stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, - size_t thresh) + size_t thresh, enum stream_write_options swo) { size_t size; ssize_t nw; @@ -3146,7 +3269,7 @@ stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, if (use_framing && seen_ok) maybe_close_varsize_hq_frame(stream); - if (thresh) + if (thresh && (swo & SWO_BUFFER)) { assert(size < thresh); assert(size >= stream->sm_n_buffered); @@ -3159,7 +3282,8 @@ stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */ } } - else +#ifndef NDEBUG + else if (swo & SWO_BUFFER) { /* We count flushed data towards both stream and connection limits, * so we should have been able to packetize all of it: @@ -3167,6 +3291,7 @@ stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader, assert(0 == stream->sm_n_buffered); assert(size == 0); } +#endif maybe_mark_as_blocked(stream); @@ -3333,7 +3458,8 @@ save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader, static ssize_t -stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) +stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader, + enum stream_write_options swo) { const struct stream_hq_frame *shf; size_t thresh, len, frames, total_len, n_allowed, nwritten; @@ -3354,6 +3480,8 @@ stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) n_allowed = stream_get_n_allowed(stream); if (total_len <= n_allowed && total_len < thresh) { + if (!(swo & SWO_BUFFER)) + return 0; nwritten = 0; do { @@ -3370,7 +3498,7 @@ stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader) return nwritten; } else - return stream_write_to_packets(stream, reader, thresh); + return stream_write_to_packets(stream, reader, thresh, swo); } @@ -3450,7 +3578,7 @@ lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov, .lsqr_ctx = &iro, }; - return stream_write(stream, &reader); + return stream_write(stream, &reader, SWO_BUFFER); } @@ -3459,7 +3587,221 @@ lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader) { COMMON_WRITE_CHECKS(); SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); - return stream_write(stream, reader); + return stream_write(stream, reader, SWO_BUFFER); +} + + +/* Configuration for lsquic_stream_pwritev: */ +#ifndef LSQUIC_PWRITEV_DEF_IOVECS +#define LSQUIC_PWRITEV_DEF_IOVECS 16 +#endif +/* This is an overkill, this limit should only be reached during testing: */ +#ifndef LSQUIC_PWRITEV_DEF_FRAMES +#define LSQUIC_PWRITEV_DEF_FRAMES (LSQUIC_PWRITEV_DEF_IOVECS * 2) +#endif + +#ifdef NDEBUG +#define PWRITEV_IOVECS LSQUIC_PWRITEV_DEF_IOVECS +#define PWRITEV_FRAMES LSQUIC_PWRITEV_DEF_FRAMES +#else +static unsigned + PWRITEV_IOVECS = LSQUIC_PWRITEV_DEF_IOVECS, + PWRITEV_FRAMES = LSQUIC_PWRITEV_DEF_FRAMES; + +void +lsquic_stream_set_pwritev_params (unsigned iovecs, unsigned frames) +{ + PWRITEV_IOVECS = iovecs; + PWRITEV_FRAMES = frames; +} + + +#endif + +struct pwritev_ctx +{ + struct iovec *iov; + const struct hq_arr *hq_arr; + size_t total_bytes; + size_t n_to_write; + unsigned n_iovecs, max_iovecs; +}; + + +static size_t +pwritev_size (void *lsqr_ctx) +{ + struct pwritev_ctx *const ctx = lsqr_ctx; + + if (ctx->n_iovecs < ctx->max_iovecs + && ctx->hq_arr->count < ctx->hq_arr->max) + return ctx->n_to_write - ctx->total_bytes; + else + return 0; +} + + +static size_t +pwritev_read (void *lsqr_ctx, void *buf, size_t count) +{ + struct pwritev_ctx *const ctx = lsqr_ctx; + + assert(ctx->n_iovecs < ctx->max_iovecs); + ctx->iov[ctx->n_iovecs].iov_base = buf; + ctx->iov[ctx->n_iovecs].iov_len = count; + ++ctx->n_iovecs; + ctx->total_bytes += count; + return count; +} + + +/* pwritev works as follows: allocate packets via lsquic_stream_writef() call + * and record pointers and sizes into an iovec array. Then issue a single call + * to user-supplied preadv() to populate all packets in one shot. + * + * Unwinding state changes due to a short write is by far the most complicated + * part of the machinery that follows. We optimize the normal path: it should + * be cheap to be prepared for the unwinding; unwinding itself can be more + * expensive, as we do not expect it to happen often. + */ +ssize_t +lsquic_stream_pwritev (struct lsquic_stream *stream, + ssize_t (*preadv)(void *user_data, const struct iovec *iov, int iovcnt), + void *user_data, size_t n_to_write) +{ + struct lsquic_send_ctl *const ctl = stream->conn_pub->send_ctl; + struct iovec iovecs[PWRITEV_IOVECS], *last_iov; + unsigned char *hq_frames[PWRITEV_FRAMES]; + struct pwritev_ctx ctx; + struct lsquic_reader reader; + struct send_ctl_state ctl_state; + struct hq_arr hq_arr; + ssize_t nw; + size_t n_allocated, sum; +#ifndef NDEBUG + const unsigned short n_buffered = stream->sm_n_buffered; +#endif + + COMMON_WRITE_CHECKS(); + SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA); + + lsquic_send_ctl_snapshot(ctl, &ctl_state); + + ctx.total_bytes = 0; + ctx.n_to_write = n_to_write; + ctx.n_iovecs = 0; + ctx.max_iovecs = sizeof(iovecs) / sizeof(iovecs[0]); + ctx.iov = iovecs; + ctx.hq_arr = &hq_arr; + + hq_arr.p = hq_frames; + hq_arr.count = 0; + hq_arr.max = sizeof(hq_frames) / sizeof(hq_frames[0]); + stream->sm_hq_arr = &hq_arr; + + reader.lsqr_ctx = &ctx; + reader.lsqr_size = pwritev_size; + reader.lsqr_read = pwritev_read; + + nw = stream_write(stream, &reader, 0); + LSQ_DEBUG("pwritev: stream_write returned %zd, n_iovecs: %d", nw, + ctx.n_iovecs); + if (nw > 0) + { + /* Amount of buffered data shouldn't have increased */ + assert(n_buffered >= stream->sm_n_buffered); + n_allocated = (size_t) nw; + nw = preadv(user_data, ctx.iov, ctx.n_iovecs); + LSQ_DEBUG("pwritev: preadv returned %zd", nw); + if (nw >= 0 && (size_t) nw < n_allocated) + goto unwind_short_write; + } + + cleanup: + stream->sm_hq_arr = NULL; + return nw; + + unwind_short_write: + /* What follows is not the most efficient process. The emphasis here is + * on being simple instead. We expect short writes to be rare, so being + * slower than possible is a good tradeoff for being correct. + */ + LSQ_DEBUG("short write occurred, unwind"); + SM_HISTORY_APPEND(stream, SHE_SHORT_WRITE); + + /* First, adjust connection cap and stream offsets, and HTTP/3 framing, + * if necessary. + */ + if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF)) + == (SMBF_USE_HEADERS|SMBF_IETF)) + { + size_t shortfall, payload_sz, decr; + unsigned char *p; + unsigned bits; + + assert(hq_arr.count > 0); + shortfall = n_allocated - (size_t) nw; + do + { + const unsigned count = hq_arr.count; + (void) count; + p = hq_frames[--hq_arr.count]; + assert(p[0] == HQFT_DATA); + assert(!(p[1] & 0x80)); /* Only one- and two-byte frame sizes */ + if (p[1] & 0x40) + { + payload_sz = (p[1] & 0x3F) << 8; + payload_sz |= p[2]; + } + else + payload_sz = p[1]; + if (payload_sz > shortfall) + { + bits = p[1] >> 6; + vint_write(p + 1, payload_sz - shortfall, bits, 1 << bits); + decr = shortfall; + if (stream->sm_bflags & SMBF_CONN_LIMITED) + stream->conn_pub->conn_cap.cc_sent -= decr; + stream->sm_payload -= decr; + stream->tosend_off -= decr; + shortfall = 0; + } + else + { + decr = payload_sz + 2 + (p[1] >> 6); + if (stream->sm_bflags & SMBF_CONN_LIMITED) + stream->conn_pub->conn_cap.cc_sent -= decr; + stream->sm_payload -= payload_sz; + stream->tosend_off -= decr; + shortfall -= payload_sz; + } + } + while (hq_arr.count); + assert(shortfall == 0); + } + else + { + const size_t shortfall = n_allocated - (size_t) nw; + if (stream->sm_bflags & SMBF_CONN_LIMITED) + stream->conn_pub->conn_cap.cc_sent -= shortfall; + stream->sm_payload -= shortfall; + stream->tosend_off -= shortfall; + } + + /* Find last iovec: */ + sum = 0; + for (last_iov = iovecs; last_iov + < iovecs + sizeof(iovecs)/sizeof(iovecs[0]); ++last_iov) + { + sum += last_iov->iov_len; + if ((last_iov == iovecs || (size_t) nw > sum - last_iov->iov_len) + && (size_t) nw <= sum) + break; + } + assert(last_iov < iovecs + sizeof(iovecs)/sizeof(iovecs[0])); + lsquic_send_ctl_rollback(ctl, &ctl_state, last_iov, sum - nw); + + goto cleanup; } @@ -3478,7 +3820,7 @@ stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz) .lsqr_size = inner_reader_iovec_size, .lsqr_ctx = &iro, }; - return stream_write(stream, &reader); + return stream_write(stream, &reader, SWO_BUFFER); } @@ -4747,7 +5089,7 @@ on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h) promise = SLIST_FIRST(&stream->sm_promises); init_pp_reader(promise, &pp_reader); - nw = stream_write(stream, &pp_reader); + nw = stream_write(stream, &pp_reader, SWO_BUFFER); if (nw > 0) { LSQ_DEBUG("wrote %zd bytes more of push promise (%s)", @@ -4801,7 +5143,7 @@ lsquic_stream_push_promise (struct lsquic_stream *stream, stream->stream_flags |= STREAM_PUSHING; init_pp_reader(promise, &pp_reader); - nw = stream_write(stream, &pp_reader); + nw = stream_write(stream, &pp_reader, SWO_BUFFER); if (nw > 0) { SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next); diff --git a/src/liblsquic/lsquic_stream.h b/src/liblsquic/lsquic_stream.h index 70e2eab..f4abcf0 100644 --- a/src/liblsquic/lsquic_stream.h +++ b/src/liblsquic/lsquic_stream.h @@ -22,11 +22,7 @@ TAILQ_HEAD(lsquic_streams_tailq, lsquic_stream); #ifndef LSQUIC_KEEP_STREAM_HISTORY -# ifdef NDEBUG -# define LSQUIC_KEEP_STREAM_HISTORY 0 -# else -# define LSQUIC_KEEP_STREAM_HISTORY 1 -# endif +# define LSQUIC_KEEP_STREAM_HISTORY 1 #endif @@ -149,11 +145,13 @@ enum stream_q_flags * connections's sending_streams queue. Note that writing STREAM * frames is done separately. */ -#define SMQF_SENDING_FLAGS (SMQF_SEND_WUF|SMQF_SEND_RST|SMQF_SEND_BLOCKED) +#define SMQF_SENDING_FLAGS (SMQF_SEND_WUF|SMQF_SEND_RST|SMQF_SEND_BLOCKED\ + |SMQF_SEND_STOP_SENDING) /* sending_streams: */ SMQF_SEND_WUF = 1 << 3, /* WUF: Window Update Frame */ SMQF_SEND_BLOCKED = 1 << 4, SMQF_SEND_RST = 1 << 5, /* Error: want to send RST_STREAM */ + SMQF_SEND_STOP_SENDING = 1 << 10, /* The equivalent of WINDOW_UPDATE frame for streams in IETF QUIC is * the MAX_STREAM_DATA frame. Define an alias for use in the IETF @@ -167,6 +165,10 @@ enum stream_q_flags SMQF_ABORT_CONN = 1 << 8, /* Unrecoverable error occurred */ SMQF_QPACK_DEC = 1 << 9, /* QPACK decoder handler is holding a reference to this stream */ + + /* The stream can reference itself, preventing its own destruction: */ +#define SMQF_SELF_FLAGS SMQF_WAIT_FIN_OFF + SMQF_WAIT_FIN_OFF = 1 << 11, /* Waiting for final offset: FIN or RST */ }; @@ -209,7 +211,7 @@ enum stream_flags { STREAM_PUSHING = 1 << 18, STREAM_NOPUSH = 1 << 19, /* Disallow further push promises */ STREAM_GOAWAY_IN = 1 << 20, /* Incoming GOAWAY has been processed */ - STREAM_UNUSED21 = 1 << 21, /* Unused */ + STREAM_SS_SENT = 1 << 21, /* STOP_SENDING sent */ STREAM_RST_ACKED = 1 << 22, /* Packet containing RST has been acked */ STREAM_BLOCKED_SENT = 1 << 23, /* Stays set once a STREAM_BLOCKED frame is sent */ STREAM_RST_READ = 1 << 24, /* User code collected the error */ @@ -266,6 +268,9 @@ struct lsquic_stream struct hq_filter sm_hq_filter; + /* Optional tap for pwritev undo */ + struct hq_arr *sm_hq_arr; + /* We can safely use sm_hq_filter */ #define sm_uni_type_state sm_hq_filter.hqfi_vint2_state.vr2s_varint_state @@ -602,4 +607,12 @@ lsquic_stream_verify_len (struct lsquic_stream *, unsigned long long); #define lsquic_stream_is_blocked(stream_) ((stream_)->blocked_off && \ (stream_)->blocked_off == (stream_)->max_send_off) +void +lsquic_stream_ss_frame_sent (struct lsquic_stream *); + +#ifndef NDEBUG +void +lsquic_stream_set_pwritev_params (unsigned iovecs, unsigned frames); +#endif + #endif diff --git a/tests/test_h3_framing.c b/tests/test_h3_framing.c index e7063af..fa00b95 100644 --- a/tests/test_h3_framing.c +++ b/tests/test_h3_framing.c @@ -59,6 +59,9 @@ #include "lsquic_hq.h" #include "lsquic_data_in_if.h" +#define MIN(a, b) ((a) < (b) ? (a) : (b)) +#define MAX(a, b) ((a) > (b) ? (a) : (b)) + static const struct parse_funcs *g_pf = select_pf_by_ver(LSQVER_ID27); struct test_ctl_settings @@ -270,11 +273,24 @@ struct test_objs { struct lsquic_hset_if hsi_if; }; +static int s_ack_written; + +static void +write_ack (struct lsquic_conn *conn, struct lsquic_packet_out *packet_out) +{ + /* We don't need to generate full-blown ACK, as logic in + * lsquic_send_ctl_rollback() only looks at po_frame_types. + */ + packet_out->po_frame_types |= QUIC_FRAME_ACK; + s_ack_written = 1; +} + +static int s_can_write_ack; static int -unit_test_doesnt_write_ack (struct lsquic_conn *lconn) +can_write_ack (struct lsquic_conn *lconn) { - return 0; + return s_can_write_ack; } @@ -286,11 +302,31 @@ get_network_path (struct lsquic_conn *lconn, const struct sockaddr *sa) return &network_path; } +static enum { + SNAPSHOT_STATE_NONE = 0, + SNAPSHOT_STATE_TAKEN = 1 << 0, + SNAPSHOT_STATE_ROLLED_BACK = 1 << 1, +} s_snapshot_state; + +static void +ack_snapshot (struct lsquic_conn *lconn, struct ack_state *ack_state) +{ + s_snapshot_state |= SNAPSHOT_STATE_TAKEN; +} + +static void +ack_rollback (struct lsquic_conn *lconn, struct ack_state *ack_state) +{ + s_snapshot_state |= SNAPSHOT_STATE_ROLLED_BACK; +} static const struct conn_iface our_conn_if = { - .ci_can_write_ack = unit_test_doesnt_write_ack, + .ci_can_write_ack = can_write_ack, + .ci_write_ack = write_ack, .ci_get_path = get_network_path, + .ci_ack_snapshot = ack_snapshot, + .ci_ack_rollback = ack_rollback, }; @@ -662,7 +698,7 @@ main_test_hq_framing (void) * file. This allows afl-fuzz explore the code paths. */ void -fuzz_guided_testing (const char *input) +fuzz_guided_hq_framing_testing (const char *input) { /* Range */ /* Bytes from file */ unsigned short packet_sz; /* [200, 0x3FFF] */ /* 2 */ @@ -728,6 +764,391 @@ fuzz_guided_testing (const char *input) } +struct pwritev_stream_ctx +{ + int limit; /* Test limit */ + size_t avail; + const unsigned char *input; + size_t input_sz; + ssize_t nw; /* Number of bytes written */ +}; + + +static ssize_t +my_preadv (void *user_data, const struct iovec *iov, int iovcnt) +{ + struct pwritev_stream_ctx *const pw_ctx = user_data; + const unsigned char *p; + size_t ntoread, tocopy; + int i; + + ntoread = 0; + for (i = 0; i < iovcnt; ++i) + ntoread += iov[i].iov_len; + + if (pw_ctx->limit < 0) + { + if ((size_t) -pw_ctx->limit < ntoread) + ntoread -= (size_t) -pw_ctx->limit; + } + else if ((size_t) pw_ctx->limit < ntoread) + ntoread = (size_t) pw_ctx->limit; + + assert(ntoread <= pw_ctx->input_sz); /* Self-check */ + + p = pw_ctx->input; + for (i = 0; i < iovcnt; ++i) + { + tocopy = MIN(iov[i].iov_len, ntoread - (p - pw_ctx->input)); + memcpy(iov[i].iov_base, p, tocopy); + p += tocopy; + if (ntoread == (size_t) (p - pw_ctx->input)) + break; + } + + assert(ntoread == (size_t) (p - pw_ctx->input)); + return (ssize_t) (p - pw_ctx->input); +} + + +static void +pwritev_on_write (lsquic_stream_t *stream, lsquic_stream_ctx_t *ctx) +{ + struct pwritev_stream_ctx *const pw_ctx = (void *) ctx; + ssize_t nw; + + nw = lsquic_stream_pwritev(stream, my_preadv, pw_ctx, pw_ctx->input_sz); + pw_ctx->nw = nw; + lsquic_stream_wantwrite(stream, 0); +} + + + +static const struct lsquic_stream_if pwritev_stream_if = { + .on_new_stream = packetization_on_new_stream, + .on_close = packetization_on_close, + .on_write = pwritev_on_write, +}; + + +static void +test_pwritev (enum lsquic_version version, int http, int sched_immed, + int limit, unsigned short packet_sz, size_t prologue_sz, + unsigned n_packets) +{ + struct test_objs tobjs; + struct lsquic_stream *stream; + size_t nw; + int fin, s; + unsigned char *buf_in, *buf_out; + /* Some values that are large enough: */ + const size_t buf_in_sz = MAX(n_packets * packet_sz, 0x1000), + buf_out_sz = (float) buf_in_sz * 1.1; + const int ietf = (1 << version) & LSQUIC_IETF_VERSIONS; + const enum stream_ctor_flags ietf_flags = ietf ? SCF_IETF : 0; + + s_snapshot_state = 0; + s_ack_written = 0; + + /* We'll write headers first after which stream will switch to using + * data-framing writer. This is simply so that we don't have to + * expose more stream things only for testing. + */ + struct lsxpack_header header = { XHDR(":method", "GET") }; + struct lsquic_http_headers headers = { 1, &header, }; + + buf_in = malloc(buf_in_sz); + buf_out = malloc(buf_out_sz); + assert(buf_in && buf_out); + + struct pwritev_stream_ctx pwritev_stream_ctx = + { + .input = buf_in + prologue_sz, + .input_sz = buf_in_sz - prologue_sz, + .limit = limit, + }; + + init_buf(buf_in, buf_in_sz); + + init_test_ctl_settings(&g_ctl_settings); + g_ctl_settings.tcs_schedule_stream_packets_immediately = sched_immed; + + stream_ctor_flags |= ietf_flags; + init_test_objs(&tobjs, buf_out_sz, buf_out_sz, packet_sz); + tobjs.lconn.cn_version = version; + tobjs.lconn.cn_esf_c = select_esf_common_by_ver(version); + tobjs.stream_if_ctx = &pwritev_stream_ctx; + tobjs.ctor_flags |= (http ? SCF_HTTP : 0)|ietf_flags; + if (sched_immed) + { + g_ctl_settings.tcs_can_send = n_packets; + tobjs.stream_if = &pwritev_stream_if; + } + else + { + lsquic_send_ctl_set_max_bpq_count(n_packets); + g_ctl_settings.tcs_can_send = INT_MAX; + g_ctl_settings.tcs_bp_type = BPT_OTHER_PRIO; + /* Need this for on_new_stream() callback not to mess with + * the context, otherwise this is not used. + */ + tobjs.stream_if = &pwritev_stream_if; + } + + stream = new_stream(&tobjs, 0, buf_out_sz); + + if (http) + { + if (ietf) + { + s = lsquic_stream_send_headers(stream, &headers, 0); + assert(0 == s); + } + else + /* Here we fake it in order not to have to set up frame writer. */ + stream->stream_flags |= STREAM_HEADERS_SENT; + } + + if (prologue_sz) + { + ssize_t written = lsquic_stream_write(stream, buf_in, prologue_sz); + assert(written > 0 && (size_t) written == prologue_sz); + } + + if (sched_immed) + { + lsquic_stream_dispatch_write_events(stream); + assert(!(s_snapshot_state & SNAPSHOT_STATE_TAKEN)); + // lsquic_stream_flush(stream); + } + else + { + pwritev_on_write(stream, (void *) &pwritev_stream_ctx); + assert(s_snapshot_state & SNAPSHOT_STATE_TAKEN); + if (n_packets > 0 + && s_ack_written + && tobjs.send_ctl.sc_buffered_packets[BPT_OTHER_PRIO].bpq_count == 0) + assert(s_snapshot_state & SNAPSHOT_STATE_ROLLED_BACK); + g_ctl_settings.tcs_schedule_stream_packets_immediately = 1; + lsquic_send_ctl_schedule_buffered(&tobjs.send_ctl, BPT_OTHER_PRIO); + g_ctl_settings.tcs_schedule_stream_packets_immediately = 0; + lsquic_send_ctl_set_max_bpq_count(10); + } + + assert(pwritev_stream_ctx.nw >= 0); + + /* Verify written data: */ + nw = read_from_scheduled_packets(&tobjs.send_ctl, 0, buf_out, buf_out_sz, + 0, &fin, 1); + assert(nw <= buf_in_sz); + + if (ietf && http) + { /* Remove framing and verify contents */ + const unsigned char *src; + unsigned char *dst; + uint64_t sz; + unsigned frame_type; + int s; + + src = buf_out; + dst = buf_out; + while (src < buf_out + nw) + { + frame_type = *src++; + s = vint_read(src, buf_out + buf_out_sz, &sz); + assert(s > 0); + /* In some rare circumstances it is possible to produce zero-length + * DATA frames: + * + * assert(sz > 0); + */ + assert(sz < (1 << 14)); + src += s; + if (src == buf_out + s + 1) + { + /* Ignore headers */ + assert(frame_type == HQFT_HEADERS); + src += sz; + } + else + { + assert(frame_type == HQFT_DATA); + if (src + sz > buf_out + nw) /* Chopped DATA frame (last) */ + sz = buf_out + nw - src; + memmove(dst, src, sz); + dst += sz; + src += sz; + } + } + assert(nw <= buf_in_sz); + if (n_packets && pwritev_stream_ctx.nw) + { + assert((size_t) pwritev_stream_ctx.nw + prologue_sz == (uintptr_t) dst - (uintptr_t) buf_out); + assert(0 == memcmp(buf_in, buf_out, (uintptr_t) dst - (uintptr_t) buf_out)); + } + else + assert((uintptr_t) dst - (uintptr_t) buf_out == 0 + || (uintptr_t) dst - (uintptr_t) buf_out == prologue_sz); + } + else + { + assert(nw <= buf_in_sz); + assert(nw <= buf_out_sz); + if (n_packets && pwritev_stream_ctx.nw) + { + assert((size_t) pwritev_stream_ctx.nw + prologue_sz == nw); + assert(0 == memcmp(buf_in, buf_out, (size_t) nw)); + } + else + assert(nw == 0 || nw == prologue_sz); + } + + lsquic_stream_destroy(stream); + deinit_test_objs(&tobjs); + free(buf_in); + free(buf_out); + + stream_ctor_flags &= ~ietf_flags; +} + + +static void +main_test_pwritev (void) +{ + const int limits[] = { INT_MAX, -1, -2, -3, -7, -10, -50, -100, -201, -211, + -1000, -2003, -3000, -4000, -17803, -20000, 16 * 1024, 16 * 1024 - 1, + 16 * 1024 - 2, 8000, 273, 65, 63, 10, 5, 1, 0, }; + unsigned n_packets; + const unsigned short packet_sz[] = { 1252, 1370, 0x1000, 0xFF00, }; + const size_t prologues[] = { 0, 17, 238, }; + unsigned i, j, k; + enum lsquic_version version; + int http, sched_immed; + const struct { unsigned iovecs, frames; } combos[] = + { + { 32, 16, }, + { 16, 16, }, + { 16, 8, }, + { 3, 7, }, + { 7, 3, }, + { 100, 100, }, + }, *combo = combos; + + s_can_write_ack = 1; + + run_test: + for (version = 0; version < N_LSQVER; ++version) + if ((1 << version) & LSQUIC_SUPPORTED_VERSIONS) + for (http = 0; http < 2; ++http) + for (sched_immed = 0; sched_immed <= 1; ++sched_immed) + for (i = 0; i < sizeof(limits) / sizeof(limits[i]); ++i) + for (j = 0; j < sizeof(packet_sz) / sizeof(packet_sz[0]); + ++j) + for (k = 0; k < sizeof(prologues) / sizeof(prologues[0]); ++k) + for (n_packets = 1; n_packets < 21; ++n_packets) + test_pwritev(version, http, sched_immed, + limits[i], packet_sz[j], prologues[k], n_packets); + + if (combo < combos + sizeof(combos) / sizeof(combos[0])) + { + lsquic_stream_set_pwritev_params(combo->iovecs, combo->frames); + ++combo; + goto run_test; + } + + s_can_write_ack = 0; +} + + +/* Instead of the not-very-random testing done in main_test_pwritev(), + * the fuzz-guided testing initializes parameters based on the fuzz input + * file. This allows afl-fuzz explore the code paths. + */ +void +fuzz_guided_pwritev_testing (const char *input) +{ + /* Range */ /* Bytes from file */ + unsigned short packet_sz; /* [1200, 0xFF00] */ /* 2 */ + int limit; /* [INT_MIN, INT_MAX] */ /* 2 */ + unsigned n_packets; /* [0, 255] */ /* 1 */ + unsigned n_iovecs; /* [0, 255] */ /* 1 */ + unsigned n_frames; /* [0, 255] */ /* 1 */ + size_t prologue_sz; /* [0, 170] */ /* 1 */ + enum lsquic_version version;/* [0,7] */ /* 1 */ + int sched_immed; /* 0 or 1 */ /* 1 (same byte) */ + int http; /* 0 or 1 */ /* 1 (same byte) */ + + /* TOTAL: 9 bytes */ + + FILE *f; + size_t nread; + union { + uint16_t tmp; + int16_t itmp; + } u; + unsigned char buf[10]; + + f = fopen(input, "rb"); + if (!f) + { + assert(0); + return; + } + + nread = fread(buf, 1, sizeof(buf), f); + if (nread != 9) + goto cleanup; + + memcpy(&u.tmp, &buf[0], 2); + if (u.tmp < 1200) + u.tmp = 1200; + else if (u.tmp > 0xFF00) + u.tmp = 0xFF00; + packet_sz = u.tmp; + + memcpy(&u.itmp, &buf[2], 2); + if (u.itmp < SHRT_MIN / 2) + limit = INT_MIN; + else if (u.itmp < SHRT_MIN / 4) + limit = 0; + else if (u.itmp > SHRT_MAX / 2) + limit = INT_MAX; + else if (u.itmp > SHRT_MAX / 2) + limit = 0; + else + limit = u.itmp; + + n_packets = buf[4]; + n_iovecs = buf[5]; + n_frames = buf[6]; + + prologue_sz = buf[7]; + if (prologue_sz > 170) + prologue_sz = 170; + + switch (buf[8] & 7) + { + case 0: version = LSQVER_043; break; + case 1: version = LSQVER_046; break; + case 2: version = LSQVER_050; break; + case 3: version = LSQVER_ID27; break; + case 4: version = LSQVER_ID28; break; + default: + case 5: version = LSQVER_ID29; break; + } + + sched_immed = !!(buf[8] & 0x08); + http = !!(buf[8] & 0x10); + + lsquic_stream_set_pwritev_params(n_iovecs, n_frames); + test_pwritev(version, http, sched_immed, limit, packet_sz, prologue_sz, + n_packets); + + cleanup: + (void) fclose(f); +} + + static void test_frame_header_split (unsigned n_packets, unsigned extra_sz, int add_one_more) @@ -1170,18 +1591,22 @@ test_reading_zero_size_data_frame_scenario3 (void) int main (int argc, char **argv) { - const char *fuzz_input = NULL; + const char *fuzz_hq_framing_input = NULL; + const char *fuzz_pwritev_input = NULL; int opt, add_one_more; unsigned n_packets, extra_sz; lsquic_global_init(LSQUIC_GLOBAL_SERVER); - while (-1 != (opt = getopt(argc, argv, "f:l:"))) + while (-1 != (opt = getopt(argc, argv, "f:p:l:"))) { switch (opt) { case 'f': - fuzz_input = optarg; + fuzz_hq_framing_input = optarg; + break; + case 'p': + fuzz_pwritev_input = optarg; break; case 'l': lsquic_log_to_fstream(stderr, 0); @@ -1194,10 +1619,14 @@ main (int argc, char **argv) init_test_ctl_settings(&g_ctl_settings); - if (fuzz_input) - fuzz_guided_testing(fuzz_input); + if (fuzz_hq_framing_input) + fuzz_guided_hq_framing_testing(fuzz_hq_framing_input); + else if (fuzz_pwritev_input) + fuzz_guided_pwritev_testing(fuzz_pwritev_input); else { + main_test_pwritev(); + return 0; main_test_hq_framing(); for (n_packets = 1; n_packets <= 2; ++n_packets) for (extra_sz = 0; extra_sz <= 2; ++extra_sz) diff --git a/tests/test_send_headers.c b/tests/test_send_headers.c index 6ea74b2..9fc938f 100644 --- a/tests/test_send_headers.c +++ b/tests/test_send_headers.c @@ -340,6 +340,9 @@ test_flushes_and_closes (void) assert(s == 0); s = lsquic_stream_close(stream); assert(s == 0); + /* OK, we did not read FIN, expect these flags: */ + assert((stream->sm_qflags & (SMQF_SEND_STOP_SENDING|SMQF_WAIT_FIN_OFF)) == (SMQF_SEND_STOP_SENDING|SMQF_WAIT_FIN_OFF)); + lsquic_stream_ss_frame_sent(stream); assert(stream->sm_hblock_sz == test_vals.prefix_sz + test_vals.headers_sz); assert(0 == stream->sm_n_buffered); assert(stream->sm_qflags & SMQF_WANT_WRITE); /* Still set */ @@ -348,6 +351,9 @@ test_flushes_and_closes (void) assert(stream->sm_qflags & SMQF_CALL_ONCLOSE); lsquic_stream_acked(stream, QUIC_FRAME_STREAM); lsquic_stream_call_on_close(stream); + assert(!(stream->sm_qflags & SMQF_FREE_STREAM)); /* Not yet */ + lsquic_stream_rst_in(stream, 0, 0); + assert(!(stream->sm_qflags & (SMQF_SEND_STOP_SENDING|SMQF_WAIT_FIN_OFF))); assert(stream->sm_qflags & SMQF_FREE_STREAM); lsquic_stream_destroy(stream); @@ -386,6 +392,7 @@ test_headers_wantwrite_restoration (const int want_write) assert(s == 0); hset = lsquic_stream_get_hset(stream); assert(hset == (void *) 12345); + stream->stream_flags |= STREAM_FIN_RECVD; /* Pretend we received FIN */ s = lsquic_stream_shutdown(stream, 0); assert(0 == s); test_vals.status = QWH_PARTIAL; diff --git a/tests/test_stream.c b/tests/test_stream.c index 7eba330..4459320 100644 --- a/tests/test_stream.c +++ b/tests/test_stream.c @@ -745,7 +745,7 @@ test_rem_FIN_loc_FIN (struct test_objs *tobjs) /* Server: we read data and close the read side before reading FIN, which - * DOES NOT result in stream being reset. + * results in stream being reset. */ static void test_rem_data_loc_close_and_rst_in (struct test_objs *tobjs) @@ -765,6 +765,13 @@ test_rem_data_loc_close_and_rst_in (struct test_objs *tobjs) s = lsquic_stream_shutdown(stream, 0); assert(0 == s); + /* Early read shutdown results in different frames on different QUIC + * transports: + */ + if (stream->sm_bflags & SMBF_IETF) + assert(stream->sm_qflags & SMQF_SEND_STOP_SENDING); + else + assert(stream->sm_qflags & SMQF_SEND_RST); assert(TAILQ_EMPTY(&tobjs->conn_pub.service_streams)); assert(!((stream->sm_qflags & (SMQF_SERVICE_FLAGS)) == SMQF_CALL_ONCLOSE)); @@ -776,6 +783,7 @@ test_rem_data_loc_close_and_rst_in (struct test_objs *tobjs) assert(0 == s); assert(1 == lsquic_send_ctl_n_scheduled(&tobjs->send_ctl)); /* Shutdown performs a flush */ + assert(stream->n_unacked == 1); assert(!TAILQ_EMPTY(&tobjs->conn_pub.service_streams)); assert((stream->sm_qflags & (SMQF_SERVICE_FLAGS)) == SMQF_CALL_ONCLOSE); @@ -783,7 +791,17 @@ test_rem_data_loc_close_and_rst_in (struct test_objs *tobjs) s = lsquic_stream_rst_in(stream, 100, 1); assert(0 == s); - assert(stream->sm_qflags & SMQF_FREE_STREAM); + assert(!(stream->sm_qflags & SMQF_FREE_STREAM)); /* Not yet */ + assert(stream->sm_qflags & SMQF_CALL_ONCLOSE); + + lsquic_stream_rst_frame_sent(stream); + stream->n_unacked++; /* RESET frame take a reference */ + assert(!(stream->sm_qflags & SMQF_FREE_STREAM)); /* Not yet, + because: */ assert(stream->n_unacked == 2); + + lsquic_stream_acked(stream, QUIC_FRAME_STREAM); + lsquic_stream_acked(stream, QUIC_FRAME_RST_STREAM); + assert(stream->sm_qflags & SMQF_FREE_STREAM); /* OK, now */ lsquic_stream_destroy(stream); /* This simply checks that the stream got removed from the queue: */ @@ -795,8 +813,8 @@ test_rem_data_loc_close_and_rst_in (struct test_objs *tobjs) /* Server: we read data and close the read side before reading FIN. No - * FIN or RST arrive from peer. This should still place the stream on - * the "streams to be freed" list. + * FIN or RST arrive from peer. This should schedule RST_STREAM to be + * sent (this is gQUIC) and add "wait for known FIN" flag. */ static void test_rem_data_loc_close (struct test_objs *tobjs) @@ -833,7 +851,16 @@ test_rem_data_loc_close (struct test_objs *tobjs) assert(!(stream->sm_qflags & SMQF_FREE_STREAM)); lsquic_stream_acked(stream, QUIC_FRAME_STREAM); - assert(stream->sm_qflags & SMQF_FREE_STREAM); + + lsquic_stream_rst_frame_sent(stream); + stream->n_unacked++; /* RESET frame take a reference */ + assert(!(stream->sm_qflags & SMQF_FREE_STREAM)); /* No */ + + lsquic_stream_acked(stream, QUIC_FRAME_RST_STREAM); + assert(!(stream->sm_qflags & SMQF_FREE_STREAM)); /* Still no */ + + /* Stream will linger until we have the offset: */ + assert(stream->sm_qflags & SMQF_WAIT_FIN_OFF); lsquic_stream_destroy(stream); /* This simply checks that the stream got removed from the queue: */ @@ -918,6 +945,13 @@ test_loc_FIN_rem_RST (struct test_objs *tobjs) ack_packet(&tobjs->send_ctl, 1); ack_packet(&tobjs->send_ctl, 2); +#if 0 + /* OK, here we pretend that we sent a RESET and it was acked */ + assert(stream->sm_qflags & SMQF_SEND_RST); + stream->sm_qflags |= SMQF_SEND_RST; + stream->stream_flags +#endif + assert(!TAILQ_EMPTY(&tobjs->conn_pub.service_streams)); assert((stream->sm_qflags & (SMQF_SERVICE_FLAGS)) == (SMQF_CALL_ONCLOSE|SMQF_FREE_STREAM)); @@ -1044,7 +1078,7 @@ test_loc_RST_rem_FIN (struct test_objs *tobjs) assert(!TAILQ_EMPTY(&tobjs->conn_pub.sending_streams)); assert((stream->sm_qflags & SMQF_SENDING_FLAGS) == SMQF_SEND_RST); sss = lsquic_stream_sending_state(stream); - assert(SSS_SEND == sss); /* Reset hasn't been packetized yet */ + assert(SSS_DATA_SENT == sss); /* FIN was packetized */ s = lsquic_stream_frame_in(stream, new_frame_in(tobjs, 0, 90, 1)); assert(s == 0); @@ -1061,9 +1095,12 @@ test_loc_RST_rem_FIN (struct test_objs *tobjs) sss = lsquic_stream_sending_state(stream); assert(SSS_RESET_RECVD == sss); - lsquic_stream_call_on_close(stream); - assert(TAILQ_EMPTY(&tobjs->conn_pub.sending_streams)); + + lsquic_stream_call_on_close(stream); + assert(TAILQ_EMPTY(&tobjs->conn_pub.service_streams)); /* Not acked yet */ + lsquic_stream_acked(stream, QUIC_FRAME_STREAM); + assert(!TAILQ_EMPTY(&tobjs->conn_pub.service_streams)); assert((stream->sm_qflags & SMQF_SERVICE_FLAGS) == SMQF_FREE_STREAM); @@ -1317,7 +1354,12 @@ test_data_flush_on_close (struct test_objs *tobjs) assert(0 == lsquic_send_ctl_n_scheduled(&tobjs->send_ctl)); lsquic_stream_close(stream); - assert(1 == lsquic_send_ctl_n_scheduled(&tobjs->send_ctl)); + /* Nothing is scheduled because STREAM frames are elided */ + assert(0 == lsquic_send_ctl_n_scheduled(&tobjs->send_ctl)); + + assert(stream->sm_qflags & SMQF_SEND_RST); + assert(!(stream->sm_qflags & SMQF_FREE_STREAM)); + assert(stream->sm_qflags & SMQF_WAIT_FIN_OFF); /* We take connection cap hit after stream is flushed: */ assert(0x4000 - 100 == lsquic_conn_cap_avail(cap)); /* Conn cap hit */