mirror of
https://gitea.invidious.io/iv-org/litespeed-quic.git
synced 2024-08-15 00:53:43 +00:00
Merge pull request #57 from sdpetrides/201901091655-concurrent-streams
Add concurrent streams feature for http_client
This commit is contained in:
commit
a07c3aaa16
1 changed files with 42 additions and 10 deletions
|
@ -45,6 +45,8 @@
|
||||||
|
|
||||||
#include "../src/liblsquic/lsquic_logger.h"
|
#include "../src/liblsquic/lsquic_logger.h"
|
||||||
|
|
||||||
|
#define MIN(a, b) ((a) < (b) ? (a) : (b))
|
||||||
|
|
||||||
/* This is used to exercise generating and sending of priority frames */
|
/* This is used to exercise generating and sending of priority frames */
|
||||||
static int randomly_reprioritize_streams;
|
static int randomly_reprioritize_streams;
|
||||||
|
|
||||||
|
@ -85,6 +87,7 @@ struct http_client_ctx {
|
||||||
unsigned hcc_total_n_reqs;
|
unsigned hcc_total_n_reqs;
|
||||||
unsigned hcc_reqs_per_conn;
|
unsigned hcc_reqs_per_conn;
|
||||||
unsigned hcc_concurrency;
|
unsigned hcc_concurrency;
|
||||||
|
unsigned hcc_cc_reqs_per_conn;
|
||||||
unsigned hcc_n_open_conns;
|
unsigned hcc_n_open_conns;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -103,6 +106,10 @@ struct lsquic_conn_ctx {
|
||||||
unsigned ch_n_reqs; /* This number gets decremented as streams are closed and
|
unsigned ch_n_reqs; /* This number gets decremented as streams are closed and
|
||||||
* incremented as push promises are accepted.
|
* incremented as push promises are accepted.
|
||||||
*/
|
*/
|
||||||
|
unsigned ch_n_cc_streams; /* This number is incremented as streams are opened
|
||||||
|
* and decremented as streams are closed. It should
|
||||||
|
* never exceed hcc_cc_reqs_per_conn in client_ctx.
|
||||||
|
*/
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -136,6 +143,18 @@ create_connections (struct http_client_ctx *client_ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
create_streams (struct http_client_ctx *client_ctx, lsquic_conn_ctx_t *conn_h)
|
||||||
|
{
|
||||||
|
while (conn_h->ch_n_reqs - conn_h->ch_n_cc_streams &&
|
||||||
|
conn_h->ch_n_cc_streams < client_ctx->hcc_cc_reqs_per_conn)
|
||||||
|
{
|
||||||
|
lsquic_conn_make_stream(conn_h->conn);
|
||||||
|
conn_h->ch_n_cc_streams++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static lsquic_conn_ctx_t *
|
static lsquic_conn_ctx_t *
|
||||||
http_client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn)
|
http_client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn)
|
||||||
{
|
{
|
||||||
|
@ -143,13 +162,12 @@ http_client_on_new_conn (void *stream_if_ctx, lsquic_conn_t *conn)
|
||||||
lsquic_conn_ctx_t *conn_h = calloc(1, sizeof(*conn_h));
|
lsquic_conn_ctx_t *conn_h = calloc(1, sizeof(*conn_h));
|
||||||
conn_h->conn = conn;
|
conn_h->conn = conn;
|
||||||
conn_h->client_ctx = client_ctx;
|
conn_h->client_ctx = client_ctx;
|
||||||
conn_h->ch_n_reqs = client_ctx->hcc_total_n_reqs <
|
conn_h->ch_n_reqs = MIN(client_ctx->hcc_total_n_reqs,
|
||||||
client_ctx->hcc_reqs_per_conn ?
|
client_ctx->hcc_reqs_per_conn);
|
||||||
client_ctx->hcc_total_n_reqs : client_ctx->hcc_reqs_per_conn;
|
|
||||||
client_ctx->hcc_total_n_reqs -= conn_h->ch_n_reqs;
|
client_ctx->hcc_total_n_reqs -= conn_h->ch_n_reqs;
|
||||||
TAILQ_INSERT_TAIL(&client_ctx->conn_ctxs, conn_h, next_ch);
|
TAILQ_INSERT_TAIL(&client_ctx->conn_ctxs, conn_h, next_ch);
|
||||||
++conn_h->client_ctx->hcc_n_open_conns;
|
++conn_h->client_ctx->hcc_n_open_conns;
|
||||||
lsquic_conn_make_stream(conn);
|
create_streams(client_ctx, conn_h);
|
||||||
return conn_h;
|
return conn_h;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -509,20 +527,29 @@ http_client_on_close (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
|
||||||
}
|
}
|
||||||
|
|
||||||
LSQ_INFO("%s called", __func__);
|
LSQ_INFO("%s called", __func__);
|
||||||
|
struct http_client_ctx *const client_ctx = st_h->client_ctx;
|
||||||
lsquic_conn_t *conn = lsquic_stream_conn(stream);
|
lsquic_conn_t *conn = lsquic_stream_conn(stream);
|
||||||
lsquic_conn_ctx_t *conn_h;
|
lsquic_conn_ctx_t *conn_h;
|
||||||
TAILQ_FOREACH(conn_h, &st_h->client_ctx->conn_ctxs, next_ch)
|
TAILQ_FOREACH(conn_h, &client_ctx->conn_ctxs, next_ch)
|
||||||
if (conn_h->conn == conn)
|
if (conn_h->conn == conn)
|
||||||
break;
|
break;
|
||||||
assert(conn_h);
|
assert(conn_h);
|
||||||
--conn_h->ch_n_reqs;
|
--conn_h->ch_n_reqs;
|
||||||
|
--conn_h->ch_n_cc_streams;
|
||||||
if (0 == conn_h->ch_n_reqs)
|
if (0 == conn_h->ch_n_reqs)
|
||||||
{
|
{
|
||||||
LSQ_INFO("all requests completed, closing connection");
|
LSQ_INFO("all requests completed, closing connection");
|
||||||
lsquic_conn_close(conn_h->conn);
|
lsquic_conn_close(conn_h->conn);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
lsquic_conn_make_stream(conn);
|
{
|
||||||
|
LSQ_INFO("%u active stream, %u request remain, creating %u new stream",
|
||||||
|
conn_h->ch_n_cc_streams,
|
||||||
|
conn_h->ch_n_reqs - conn_h->ch_n_cc_streams,
|
||||||
|
MIN((conn_h->ch_n_reqs - conn_h->ch_n_cc_streams),
|
||||||
|
(client_ctx->hcc_cc_reqs_per_conn - conn_h->ch_n_cc_streams)));
|
||||||
|
create_streams(client_ctx, conn_h);
|
||||||
|
}
|
||||||
if (st_h->reader.lsqr_ctx)
|
if (st_h->reader.lsqr_ctx)
|
||||||
destroy_lsquic_reader_ctx(st_h->reader.lsqr_ctx);
|
destroy_lsquic_reader_ctx(st_h->reader.lsqr_ctx);
|
||||||
free(st_h);
|
free(st_h);
|
||||||
|
@ -553,8 +580,10 @@ usage (const char *prog)
|
||||||
" -p PATH Path to request. May be specified more than once.\n"
|
" -p PATH Path to request. May be specified more than once.\n"
|
||||||
" -n CONNS Number of concurrent connections. Defaults to 1.\n"
|
" -n CONNS Number of concurrent connections. Defaults to 1.\n"
|
||||||
" -r NREQS Total number of requests to send. Defaults to 1.\n"
|
" -r NREQS Total number of requests to send. Defaults to 1.\n"
|
||||||
" -R NREQS Maximum number of requests per single connection. Some\n"
|
" -R MAXREQS Maximum number of requests per single connection. Some\n"
|
||||||
" connections will have fewer requests than this.\n"
|
" connections will have fewer requests than this.\n"
|
||||||
|
" -w CONCUR Number of concurrent requests per single connection.\n"
|
||||||
|
" Defaults to 1.\n"
|
||||||
" -m METHOD Method. Defaults to GET.\n"
|
" -m METHOD Method. Defaults to GET.\n"
|
||||||
" -P PAYLOAD Name of the file that contains payload to be used in the\n"
|
" -P PAYLOAD Name of the file that contains payload to be used in the\n"
|
||||||
" request. This adds two more headers to the request:\n"
|
" request. This adds two more headers to the request:\n"
|
||||||
|
@ -566,8 +595,8 @@ usage (const char *prog)
|
||||||
" -6 Prefer IPv6 when resolving hostname\n"
|
" -6 Prefer IPv6 when resolving hostname\n"
|
||||||
#ifndef WIN32
|
#ifndef WIN32
|
||||||
" -C DIR Certificate store. If specified, server certificate will\n"
|
" -C DIR Certificate store. If specified, server certificate will\n"
|
||||||
#endif
|
|
||||||
" be verified.\n"
|
" be verified.\n"
|
||||||
|
#endif
|
||||||
, prog);
|
, prog);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -779,11 +808,11 @@ main (int argc, char **argv)
|
||||||
|
|
||||||
TAILQ_INIT(&sports);
|
TAILQ_INIT(&sports);
|
||||||
memset(&client_ctx, 0, sizeof(client_ctx));
|
memset(&client_ctx, 0, sizeof(client_ctx));
|
||||||
client_ctx.hcc_concurrency = 1;
|
|
||||||
TAILQ_INIT(&client_ctx.hcc_path_elems);
|
TAILQ_INIT(&client_ctx.hcc_path_elems);
|
||||||
TAILQ_INIT(&client_ctx.conn_ctxs);
|
TAILQ_INIT(&client_ctx.conn_ctxs);
|
||||||
client_ctx.method = "GET";
|
client_ctx.method = "GET";
|
||||||
client_ctx.hcc_concurrency = 1;
|
client_ctx.hcc_concurrency = 1;
|
||||||
|
client_ctx.hcc_cc_reqs_per_conn = 1;
|
||||||
client_ctx.hcc_reqs_per_conn = 1;
|
client_ctx.hcc_reqs_per_conn = 1;
|
||||||
client_ctx.hcc_total_n_reqs = 1;
|
client_ctx.hcc_total_n_reqs = 1;
|
||||||
client_ctx.prog = &prog;
|
client_ctx.prog = &prog;
|
||||||
|
@ -794,7 +823,7 @@ main (int argc, char **argv)
|
||||||
|
|
||||||
prog_init(&prog, LSENG_HTTP, &sports, &http_client_if, &client_ctx);
|
prog_init(&prog, LSENG_HTTP, &sports, &http_client_if, &client_ctx);
|
||||||
|
|
||||||
while (-1 != (opt = getopt(argc, argv, PROG_OPTS "46Br:R:IKu:EP:M:n:H:p:h"
|
while (-1 != (opt = getopt(argc, argv, PROG_OPTS "46Br:R:IKu:EP:M:n:w:H:p:h"
|
||||||
#ifndef WIN32
|
#ifndef WIN32
|
||||||
"C:"
|
"C:"
|
||||||
#endif
|
#endif
|
||||||
|
@ -833,6 +862,9 @@ main (int argc, char **argv)
|
||||||
case 'n':
|
case 'n':
|
||||||
client_ctx.hcc_concurrency = atoi(optarg);
|
client_ctx.hcc_concurrency = atoi(optarg);
|
||||||
break;
|
break;
|
||||||
|
case 'w':
|
||||||
|
client_ctx.hcc_cc_reqs_per_conn = atoi(optarg);
|
||||||
|
break;
|
||||||
case 'P':
|
case 'P':
|
||||||
client_ctx.payload = optarg;
|
client_ctx.payload = optarg;
|
||||||
if (0 != stat(optarg, &st))
|
if (0 != stat(optarg, &st))
|
||||||
|
|
Loading…
Reference in a new issue