From f00cde395912d394fa6bbec5335e31aee687b106 Mon Sep 17 00:00:00 2001 From: Stephen Petrides Date: Wed, 9 Jan 2019 17:17:38 -0500 Subject: [PATCH] Add concurrent streams feature for http_client --- test/http_client.c | 52 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/test/http_client.c b/test/http_client.c index 2aefca3..32dd5ac 100644 --- a/test/http_client.c +++ b/test/http_client.c @@ -45,6 +45,8 @@ #include "../src/liblsquic/lsquic_logger.h" +#define MIN(a, b) ((a) < (b) ? (a) : (b)) + /* This is used to exercise generating and sending of priority frames */ static int randomly_reprioritize_streams; @@ -85,6 +87,7 @@ struct http_client_ctx { unsigned hcc_total_n_reqs; unsigned hcc_reqs_per_conn; unsigned hcc_concurrency; + unsigned hcc_cc_reqs_per_conn; unsigned hcc_n_open_conns; enum { @@ -103,6 +106,10 @@ struct lsquic_conn_ctx { unsigned ch_n_reqs; /* This number gets decremented as streams are closed and * incremented as push promises are accepted. */ + unsigned ch_n_cc_streams; /* This number is incremented as streams are opened + * and decremented as streams are closed. It should + * never exceed hcc_cc_reqs_per_conn in client_ctx. + */ }; @@ -136,6 +143,18 @@ create_connections (struct http_client_ctx *client_ctx) } +static void +create_streams (struct http_client_ctx *client_ctx, lsquic_conn_ctx_t *conn_h) +{ + while (conn_h->ch_n_reqs - conn_h->ch_n_cc_streams && + conn_h->ch_n_cc_streams < client_ctx->hcc_cc_reqs_per_conn) + { + lsquic_conn_make_stream(conn_h->conn); + conn_h->ch_n_cc_streams++; + } +} + + static lsquic_conn_ctx_t * http_client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn) { @@ -143,13 +162,12 @@ http_client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn) lsquic_conn_ctx_t *conn_h = calloc(1, sizeof(*conn_h)); conn_h->conn = conn; conn_h->client_ctx = client_ctx; - conn_h->ch_n_reqs = client_ctx->hcc_total_n_reqs < - client_ctx->hcc_reqs_per_conn ? - client_ctx->hcc_total_n_reqs : client_ctx->hcc_reqs_per_conn; + conn_h->ch_n_reqs = MIN(client_ctx->hcc_total_n_reqs, + client_ctx->hcc_reqs_per_conn); client_ctx->hcc_total_n_reqs -= conn_h->ch_n_reqs; TAILQ_INSERT_TAIL(&client_ctx->conn_ctxs, conn_h, next_ch); ++conn_h->client_ctx->hcc_n_open_conns; - lsquic_conn_make_stream(conn); + create_streams(client_ctx, conn_h); return conn_h; } @@ -509,20 +527,29 @@ http_client_on_close (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h) } LSQ_INFO("%s called", __func__); + struct http_client_ctx *const client_ctx = st_h->client_ctx; lsquic_conn_t *conn = lsquic_stream_conn(stream); lsquic_conn_ctx_t *conn_h; - TAILQ_FOREACH(conn_h, &st_h->client_ctx->conn_ctxs, next_ch) + TAILQ_FOREACH(conn_h, &client_ctx->conn_ctxs, next_ch) if (conn_h->conn == conn) break; assert(conn_h); --conn_h->ch_n_reqs; + --conn_h->ch_n_cc_streams; if (0 == conn_h->ch_n_reqs) { LSQ_INFO("all requests completed, closing connection"); lsquic_conn_close(conn_h->conn); } else - lsquic_conn_make_stream(conn); + { + LSQ_INFO("%u active stream, %u request remain, creating %u new stream", + conn_h->ch_n_cc_streams, + conn_h->ch_n_reqs - conn_h->ch_n_cc_streams, + MIN((conn_h->ch_n_reqs - conn_h->ch_n_cc_streams), + (client_ctx->hcc_cc_reqs_per_conn - conn_h->ch_n_cc_streams))); + create_streams(client_ctx, conn_h); + } if (st_h->reader.lsqr_ctx) destroy_lsquic_reader_ctx(st_h->reader.lsqr_ctx); free(st_h); @@ -553,8 +580,10 @@ usage (const char *prog) " -p PATH Path to request. May be specified more than once.\n" " -n CONNS Number of concurrent connections. Defaults to 1.\n" " -r NREQS Total number of requests to send. Defaults to 1.\n" -" -R NREQS Maximum number of requests per single connection. Some\n" +" -R MAXREQS Maximum number of requests per single connection. Some\n" " connections will have fewer requests than this.\n" +" -w CONCUR Number of concurrent requests per single connection.\n" +" Defaults to 1.\n" " -m METHOD Method. Defaults to GET.\n" " -P PAYLOAD Name of the file that contains payload to be used in the\n" " request. This adds two more headers to the request:\n" @@ -566,8 +595,8 @@ usage (const char *prog) " -6 Prefer IPv6 when resolving hostname\n" #ifndef WIN32 " -C DIR Certificate store. If specified, server certificate will\n" -#endif " be verified.\n" +#endif , prog); } @@ -779,11 +808,11 @@ main (int argc, char **argv) TAILQ_INIT(&sports); memset(&client_ctx, 0, sizeof(client_ctx)); - client_ctx.hcc_concurrency = 1; TAILQ_INIT(&client_ctx.hcc_path_elems); TAILQ_INIT(&client_ctx.conn_ctxs); client_ctx.method = "GET"; client_ctx.hcc_concurrency = 1; + client_ctx.hcc_cc_reqs_per_conn = 1; client_ctx.hcc_reqs_per_conn = 1; client_ctx.hcc_total_n_reqs = 1; client_ctx.prog = &prog; @@ -794,7 +823,7 @@ main (int argc, char **argv) prog_init(&prog, LSENG_HTTP, &sports, &http_client_if, &client_ctx); - while (-1 != (opt = getopt(argc, argv, PROG_OPTS "46Br:R:IKu:EP:M:n:H:p:h" + while (-1 != (opt = getopt(argc, argv, PROG_OPTS "46Br:R:IKu:EP:M:n:w:H:p:h" #ifndef WIN32 "C:" #endif @@ -833,6 +862,9 @@ main (int argc, char **argv) case 'n': client_ctx.hcc_concurrency = atoi(optarg); break; + case 'w': + client_ctx.hcc_cc_reqs_per_conn = atoi(optarg); + break; case 'P': client_ctx.payload = optarg; if (0 != stat(optarg, &st))