Add concurrent streams feature for http_client

This commit is contained in:
Stephen Petrides 2019-01-09 17:17:38 -05:00
parent 229fce07a3
commit f00cde3959

View file

@ -45,6 +45,8 @@
#include "../src/liblsquic/lsquic_logger.h" #include "../src/liblsquic/lsquic_logger.h"
#define MIN(a, b) ((a) < (b) ? (a) : (b))
/* This is used to exercise generating and sending of priority frames */ /* This is used to exercise generating and sending of priority frames */
static int randomly_reprioritize_streams; static int randomly_reprioritize_streams;
@ -85,6 +87,7 @@ struct http_client_ctx {
unsigned hcc_total_n_reqs; unsigned hcc_total_n_reqs;
unsigned hcc_reqs_per_conn; unsigned hcc_reqs_per_conn;
unsigned hcc_concurrency; unsigned hcc_concurrency;
unsigned hcc_cc_reqs_per_conn;
unsigned hcc_n_open_conns; unsigned hcc_n_open_conns;
enum { enum {
@ -103,6 +106,10 @@ struct lsquic_conn_ctx {
unsigned ch_n_reqs; /* This number gets decremented as streams are closed and unsigned ch_n_reqs; /* This number gets decremented as streams are closed and
* incremented as push promises are accepted. * incremented as push promises are accepted.
*/ */
unsigned ch_n_cc_streams; /* This number is incremented as streams are opened
* and decremented as streams are closed. It should
* never exceed hcc_cc_reqs_per_conn in client_ctx.
*/
}; };
@ -136,6 +143,18 @@ create_connections (struct http_client_ctx *client_ctx)
} }
static void
create_streams (struct http_client_ctx *client_ctx, lsquic_conn_ctx_t *conn_h)
{
while (conn_h->ch_n_reqs - conn_h->ch_n_cc_streams &&
conn_h->ch_n_cc_streams < client_ctx->hcc_cc_reqs_per_conn)
{
lsquic_conn_make_stream(conn_h->conn);
conn_h->ch_n_cc_streams++;
}
}
static lsquic_conn_ctx_t * static lsquic_conn_ctx_t *
http_client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn) http_client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn)
{ {
@ -143,13 +162,12 @@ http_client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn)
lsquic_conn_ctx_t *conn_h = calloc(1, sizeof(*conn_h)); lsquic_conn_ctx_t *conn_h = calloc(1, sizeof(*conn_h));
conn_h->conn = conn; conn_h->conn = conn;
conn_h->client_ctx = client_ctx; conn_h->client_ctx = client_ctx;
conn_h->ch_n_reqs = client_ctx->hcc_total_n_reqs < conn_h->ch_n_reqs = MIN(client_ctx->hcc_total_n_reqs,
client_ctx->hcc_reqs_per_conn ? client_ctx->hcc_reqs_per_conn);
client_ctx->hcc_total_n_reqs : client_ctx->hcc_reqs_per_conn;
client_ctx->hcc_total_n_reqs -= conn_h->ch_n_reqs; client_ctx->hcc_total_n_reqs -= conn_h->ch_n_reqs;
TAILQ_INSERT_TAIL(&client_ctx->conn_ctxs, conn_h, next_ch); TAILQ_INSERT_TAIL(&client_ctx->conn_ctxs, conn_h, next_ch);
++conn_h->client_ctx->hcc_n_open_conns; ++conn_h->client_ctx->hcc_n_open_conns;
lsquic_conn_make_stream(conn); create_streams(client_ctx, conn_h);
return conn_h; return conn_h;
} }
@ -509,20 +527,29 @@ http_client_on_close (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
} }
LSQ_INFO("%s called", __func__); LSQ_INFO("%s called", __func__);
struct http_client_ctx *const client_ctx = st_h->client_ctx;
lsquic_conn_t *conn = lsquic_stream_conn(stream); lsquic_conn_t *conn = lsquic_stream_conn(stream);
lsquic_conn_ctx_t *conn_h; lsquic_conn_ctx_t *conn_h;
TAILQ_FOREACH(conn_h, &st_h->client_ctx->conn_ctxs, next_ch) TAILQ_FOREACH(conn_h, &client_ctx->conn_ctxs, next_ch)
if (conn_h->conn == conn) if (conn_h->conn == conn)
break; break;
assert(conn_h); assert(conn_h);
--conn_h->ch_n_reqs; --conn_h->ch_n_reqs;
--conn_h->ch_n_cc_streams;
if (0 == conn_h->ch_n_reqs) if (0 == conn_h->ch_n_reqs)
{ {
LSQ_INFO("all requests completed, closing connection"); LSQ_INFO("all requests completed, closing connection");
lsquic_conn_close(conn_h->conn); lsquic_conn_close(conn_h->conn);
} }
else else
lsquic_conn_make_stream(conn); {
LSQ_INFO("%u active stream, %u request remain, creating %u new stream",
conn_h->ch_n_cc_streams,
conn_h->ch_n_reqs - conn_h->ch_n_cc_streams,
MIN((conn_h->ch_n_reqs - conn_h->ch_n_cc_streams),
(client_ctx->hcc_cc_reqs_per_conn - conn_h->ch_n_cc_streams)));
create_streams(client_ctx, conn_h);
}
if (st_h->reader.lsqr_ctx) if (st_h->reader.lsqr_ctx)
destroy_lsquic_reader_ctx(st_h->reader.lsqr_ctx); destroy_lsquic_reader_ctx(st_h->reader.lsqr_ctx);
free(st_h); free(st_h);
@ -553,8 +580,10 @@ usage (const char *prog)
" -p PATH Path to request. May be specified more than once.\n" " -p PATH Path to request. May be specified more than once.\n"
" -n CONNS Number of concurrent connections. Defaults to 1.\n" " -n CONNS Number of concurrent connections. Defaults to 1.\n"
" -r NREQS Total number of requests to send. Defaults to 1.\n" " -r NREQS Total number of requests to send. Defaults to 1.\n"
" -R NREQS Maximum number of requests per single connection. Some\n" " -R MAXREQS Maximum number of requests per single connection. Some\n"
" connections will have fewer requests than this.\n" " connections will have fewer requests than this.\n"
" -w CONCUR Number of concurrent requests per single connection.\n"
" Defaults to 1.\n"
" -m METHOD Method. Defaults to GET.\n" " -m METHOD Method. Defaults to GET.\n"
" -P PAYLOAD Name of the file that contains payload to be used in the\n" " -P PAYLOAD Name of the file that contains payload to be used in the\n"
" request. This adds two more headers to the request:\n" " request. This adds two more headers to the request:\n"
@ -566,8 +595,8 @@ usage (const char *prog)
" -6 Prefer IPv6 when resolving hostname\n" " -6 Prefer IPv6 when resolving hostname\n"
#ifndef WIN32 #ifndef WIN32
" -C DIR Certificate store. If specified, server certificate will\n" " -C DIR Certificate store. If specified, server certificate will\n"
#endif
" be verified.\n" " be verified.\n"
#endif
, prog); , prog);
} }
@ -779,11 +808,11 @@ main (int argc, char **argv)
TAILQ_INIT(&sports); TAILQ_INIT(&sports);
memset(&client_ctx, 0, sizeof(client_ctx)); memset(&client_ctx, 0, sizeof(client_ctx));
client_ctx.hcc_concurrency = 1;
TAILQ_INIT(&client_ctx.hcc_path_elems); TAILQ_INIT(&client_ctx.hcc_path_elems);
TAILQ_INIT(&client_ctx.conn_ctxs); TAILQ_INIT(&client_ctx.conn_ctxs);
client_ctx.method = "GET"; client_ctx.method = "GET";
client_ctx.hcc_concurrency = 1; client_ctx.hcc_concurrency = 1;
client_ctx.hcc_cc_reqs_per_conn = 1;
client_ctx.hcc_reqs_per_conn = 1; client_ctx.hcc_reqs_per_conn = 1;
client_ctx.hcc_total_n_reqs = 1; client_ctx.hcc_total_n_reqs = 1;
client_ctx.prog = &prog; client_ctx.prog = &prog;
@ -794,7 +823,7 @@ main (int argc, char **argv)
prog_init(&prog, LSENG_HTTP, &sports, &http_client_if, &client_ctx); prog_init(&prog, LSENG_HTTP, &sports, &http_client_if, &client_ctx);
while (-1 != (opt = getopt(argc, argv, PROG_OPTS "46Br:R:IKu:EP:M:n:H:p:h" while (-1 != (opt = getopt(argc, argv, PROG_OPTS "46Br:R:IKu:EP:M:n:w:H:p:h"
#ifndef WIN32 #ifndef WIN32
"C:" "C:"
#endif #endif
@ -833,6 +862,9 @@ main (int argc, char **argv)
case 'n': case 'n':
client_ctx.hcc_concurrency = atoi(optarg); client_ctx.hcc_concurrency = atoi(optarg);
break; break;
case 'w':
client_ctx.hcc_cc_reqs_per_conn = atoi(optarg);
break;
case 'P': case 'P':
client_ctx.payload = optarg; client_ctx.payload = optarg;
if (0 != stat(optarg, &st)) if (0 != stat(optarg, &st))