Release 1.17.15

- [BUGFIX] http_client: make sure only one read per on_read() callback
  is performed in the header conversion bypass (-B) mode.
- http_client: with -E, assign random priority when stream is created.
- [OPTIMIZATION] On immediate write, place an ACK frame into the first
  buffered packet if an ACK is queued.  This reduces the number of
  standalone ACK packets.
- [OPTIMIZATION] Allow placing more than one STREAM frame from the same
  stream into an outgoing packet.  This change minimizes the number of
  buffered packets required to store several small HTTP messages by
  virtue of allowing more than one STREAM frame from HEADERS stream in
  the same packet.
- [OPTIMIZATION] Flush headers when writing to buffered packets.  This
  causes the headers to be written to the same buffered packet queue,
  thereby improving packet utilization, especially for small HTTP
  messages.
This commit is contained in:
Dmitri Tikhonov 2019-01-21 09:07:02 -05:00
parent 046163b8ab
commit 4d83f5bd45
14 changed files with 401 additions and 241 deletions

View file

@ -1,3 +1,21 @@
2019-01-17
- 1.17.15
- [BUGFIX] http_client: make sure only one read per on_read() callback
is performed in the header conversion bypass (-B) mode.
- http_client: with -E, assign random priority when stream is created.
- [OPTIMIZATION] On immediate write, place an ACK frame into the first
buffered packet if an ACK is queued. This reduces the number of
standalone ACK packets.
- [OPTIMIZATION] Allow placing more than one STREAM frame from the same
stream into an outgoing packet. This change minimizes the number of
buffered packets required to store several small HTTP messages by
virtue of allowing more than one STREAM frame from HEADERS stream in
the same packet.
- [OPTIMIZATION] Flush headers when writing to buffered packets. This
causes the headers to be written to the same buffered packet queue,
thereby improving packet utilization, especially for small HTTP
messages.
2019-01-16
- 1.17.14
- [FEATURE] http_client can now collect stats: time to connect, TTFB,

View file

@ -25,7 +25,7 @@ extern "C" {
#define LSQUIC_MAJOR_VERSION 1
#define LSQUIC_MINOR_VERSION 17
#define LSQUIC_PATCH_VERSION 14
#define LSQUIC_PATCH_VERSION 15
/**
* Engine flags:

View file

@ -87,6 +87,13 @@ struct conn_iface
lsquic_time_t
(*ci_next_tick_time) (struct lsquic_conn *);
int
(*ci_can_write_ack) (struct lsquic_conn *);
/* No return status: best effort */
void
(*ci_write_ack) (struct lsquic_conn *, struct lsquic_packet_out *);
#if LSQUIC_CONN_STATS
const struct conn_stats *
(*ci_get_stats) (struct lsquic_conn *);

View file

@ -903,6 +903,93 @@ reset_ack_state (struct full_conn *conn)
}
#if 1
# define verify_ack_frame(a, b, c)
#else
static void
verify_ack_frame (struct full_conn *conn, const unsigned char *buf, int bufsz)
{
unsigned i;
int parsed_len;
struct ack_info *ack_info;
const struct lsquic_packno_range *range;
char ack_buf[512];
unsigned buf_off = 0;
int nw;
ack_info = conn->fc_pub.mm->acki;
parsed_len = parse_ack_frame(buf, bufsz, ack_info);
assert(parsed_len == bufsz);
for (range = lsquic_rechist_first(&conn->fc_rechist), i = 0; range;
range = lsquic_rechist_next(&conn->fc_rechist), ++i)
{
assert(i < ack_info->n_ranges);
assert(range->high == ack_info->ranges[i].high);
assert(range->low == ack_info->ranges[i].low);
if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG))
{
nw = snprintf(ack_buf + buf_off, sizeof(ack_buf) - buf_off,
"[%"PRIu64"-%"PRIu64"]", range->high, range->low);
assert(nw >= 0);
buf_off += nw;
}
}
assert(i == ack_info->n_ranges);
LSQ_DEBUG("Sent ACK frame %s", ack_buf);
}
#endif
static void
full_conn_ci_write_ack (struct lsquic_conn *lconn,
struct lsquic_packet_out *packet_out)
{
struct full_conn *conn = (struct full_conn *) lconn;
lsquic_time_t now;
int has_missing, w;
now = lsquic_time_now();
w = conn->fc_conn.cn_pf->pf_gen_ack_frame(
packet_out->po_data + packet_out->po_data_sz,
lsquic_packet_out_avail(packet_out),
(gaf_rechist_first_f) lsquic_rechist_first,
(gaf_rechist_next_f) lsquic_rechist_next,
(gaf_rechist_largest_recv_f) lsquic_rechist_largest_recv,
&conn->fc_rechist, now, &has_missing, &packet_out->po_ack2ed);
if (w < 0) {
ABORT_ERROR("generating ACK frame failed: %d", errno);
return;
}
#if LSQUIC_CONN_STATS
++conn->fc_stats.out.acks;
#endif
EV_LOG_GENERATED_ACK_FRAME(LSQUIC_LOG_CONN_ID, conn->fc_conn.cn_pf,
packet_out->po_data + packet_out->po_data_sz, w);
verify_ack_frame(conn, packet_out->po_data + packet_out->po_data_sz, w);
lsquic_send_ctl_scheduled_ack(&conn->fc_send_ctl);
packet_out->po_frame_types |= 1 << QUIC_FRAME_ACK;
lsquic_send_ctl_incr_pack_sz(&conn->fc_send_ctl, packet_out, w);
packet_out->po_regen_sz += w;
if (has_missing)
conn->fc_flags |= FC_ACK_HAD_MISS;
else
conn->fc_flags &= ~FC_ACK_HAD_MISS;
LSQ_DEBUG("Put %d bytes of ACK frame into packet on outgoing queue", w);
if (conn->fc_conn.cn_version >= LSQVER_039 &&
conn->fc_n_cons_unretx >= 20 &&
!lsquic_send_ctl_have_outgoing_retx_frames(&conn->fc_send_ctl))
{
LSQ_DEBUG("schedule WINDOW_UPDATE frame after %u non-retx "
"packets sent", conn->fc_n_cons_unretx);
conn->fc_flags |= FC_SEND_WUF;
}
reset_ack_state(conn);
}
static lsquic_stream_t *
new_stream_ext (struct full_conn *conn, uint32_t stream_id, int if_idx,
enum stream_ctor_flags stream_ctor_flags)
@ -2232,7 +2319,7 @@ generate_rst_stream_frame (struct full_conn *conn, lsquic_stream_t *stream)
lsquic_send_ctl_incr_pack_sz(&conn->fc_send_ctl, packet_out, sz);
packet_out->po_frame_types |= 1 << QUIC_FRAME_RST_STREAM;
s = lsquic_packet_out_add_stream(packet_out, conn->fc_pub.mm, stream,
QUIC_FRAME_RST_STREAM, 0, 0);
QUIC_FRAME_RST_STREAM, packet_out->po_data_sz, sz);
if (s != 0)
{
ABORT_ERROR("adding stream to packet failed: %s", strerror(errno));
@ -2578,96 +2665,19 @@ process_hsk_stream_write_events (struct full_conn *conn)
}
#if 1
# define verify_ack_frame(a, b, c)
#else
static void
verify_ack_frame (struct full_conn *conn, const unsigned char *buf, int bufsz)
{
unsigned i;
int parsed_len;
struct ack_info *ack_info;
const struct lsquic_packno_range *range;
char ack_buf[512];
unsigned buf_off = 0;
int nw;
ack_info = conn->fc_pub.mm->acki;
parsed_len = parse_ack_frame(buf, bufsz, ack_info);
assert(parsed_len == bufsz);
for (range = lsquic_rechist_first(&conn->fc_rechist), i = 0; range;
range = lsquic_rechist_next(&conn->fc_rechist), ++i)
{
assert(i < ack_info->n_ranges);
assert(range->high == ack_info->ranges[i].high);
assert(range->low == ack_info->ranges[i].low);
if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG))
{
nw = snprintf(ack_buf + buf_off, sizeof(ack_buf) - buf_off,
"[%"PRIu64"-%"PRIu64"]", range->high, range->low);
assert(nw >= 0);
buf_off += nw;
}
}
assert(i == ack_info->n_ranges);
LSQ_DEBUG("Sent ACK frame %s", ack_buf);
}
#endif
static void
generate_ack_frame (struct full_conn *conn)
{
lsquic_packet_out_t *packet_out;
lsquic_time_t now;
int has_missing, w;
packet_out = lsquic_send_ctl_new_packet_out(&conn->fc_send_ctl, 0);
if (!packet_out)
if (packet_out)
{
ABORT_ERROR("cannot allocate packet: %s", strerror(errno));
return;
lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out);
full_conn_ci_write_ack(&conn->fc_conn, packet_out);
}
lsquic_send_ctl_scheduled_one(&conn->fc_send_ctl, packet_out);
now = lsquic_time_now();
w = conn->fc_conn.cn_pf->pf_gen_ack_frame(
packet_out->po_data + packet_out->po_data_sz,
lsquic_packet_out_avail(packet_out),
(gaf_rechist_first_f) lsquic_rechist_first,
(gaf_rechist_next_f) lsquic_rechist_next,
(gaf_rechist_largest_recv_f) lsquic_rechist_largest_recv,
&conn->fc_rechist, now, &has_missing, &packet_out->po_ack2ed);
if (w < 0) {
ABORT_ERROR("generating ACK frame failed: %d", errno);
return;
}
#if LSQUIC_CONN_STATS
++conn->fc_stats.out.acks;
#endif
EV_LOG_GENERATED_ACK_FRAME(LSQUIC_LOG_CONN_ID, conn->fc_conn.cn_pf,
packet_out->po_data + packet_out->po_data_sz, w);
verify_ack_frame(conn, packet_out->po_data + packet_out->po_data_sz, w);
lsquic_send_ctl_scheduled_ack(&conn->fc_send_ctl);
packet_out->po_frame_types |= 1 << QUIC_FRAME_ACK;
lsquic_send_ctl_incr_pack_sz(&conn->fc_send_ctl, packet_out, w);
packet_out->po_regen_sz += w;
if (has_missing)
conn->fc_flags |= FC_ACK_HAD_MISS;
else
conn->fc_flags &= ~FC_ACK_HAD_MISS;
LSQ_DEBUG("Put %d bytes of ACK frame into packet on outgoing queue", w);
if (conn->fc_conn.cn_version >= LSQVER_039 &&
conn->fc_n_cons_unretx >= 20 &&
!lsquic_send_ctl_have_outgoing_retx_frames(&conn->fc_send_ctl))
{
LSQ_DEBUG("schedule WINDOW_UPDATE frame after %u non-retx "
"packets sent", conn->fc_n_cons_unretx);
conn->fc_flags |= FC_SEND_WUF;
}
ABORT_ERROR("cannot allocate packet: %s", strerror(errno));
}
@ -2775,6 +2785,14 @@ should_generate_ack (const struct full_conn *conn)
}
static int
full_conn_ci_can_write_ack (struct lsquic_conn *lconn)
{
struct full_conn *conn = (struct full_conn *) lconn;
return should_generate_ack(conn);
}
static enum tick_st
full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
{
@ -2874,7 +2892,6 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
*/
generate_ack_frame(conn);
CLOSE_IF_NECESSARY();
reset_ack_state(conn);
/* Try to send STOP_WAITING frame at the same time we send an ACK
* This follows reference implementation.
@ -3575,6 +3592,7 @@ static const struct headers_stream_callbacks headers_callbacks =
static const struct headers_stream_callbacks *headers_callbacks_ptr = &headers_callbacks;
static const struct conn_iface full_conn_iface = {
.ci_can_write_ack = full_conn_ci_can_write_ack,
.ci_destroy = full_conn_ci_destroy,
#if LSQUIC_CONN_STATS
.ci_get_stats = full_conn_ci_get_stats,
@ -3588,6 +3606,7 @@ static const struct conn_iface full_conn_iface = {
.ci_packet_not_sent = full_conn_ci_packet_not_sent,
.ci_packet_sent = full_conn_ci_packet_sent,
.ci_tick = full_conn_ci_tick,
.ci_write_ack = full_conn_ci_write_ack,
};
static const struct conn_iface *full_conn_iface_ptr = &full_conn_iface;

View file

@ -31,7 +31,7 @@ static struct stream_rec *
srec_one_posi_first (struct packet_out_srec_iter *posi,
struct lsquic_packet_out *packet_out)
{
if (packet_out->po_srecs.one.sr_frame_types)
if (packet_out->po_srecs.one.sr_frame_type)
return &packet_out->po_srecs.one;
else
return NULL;
@ -53,7 +53,7 @@ srec_arr_posi_next (struct packet_out_srec_iter *posi)
for (; posi->srec_idx < sizeof(posi->cur_srec_arr->srecs) / sizeof(posi->cur_srec_arr->srecs[0]);
++posi->srec_idx)
{
if (posi->cur_srec_arr->srecs[ posi->srec_idx ].sr_frame_types)
if (posi->cur_srec_arr->srecs[ posi->srec_idx ].sr_frame_type)
return &posi->cur_srec_arr->srecs[ posi->srec_idx++ ];
}
posi->cur_srec_arr = TAILQ_NEXT(posi->cur_srec_arr, next_stream_rec_arr);
@ -106,15 +106,9 @@ posi_next (struct packet_out_srec_iter *posi)
}
/* Assumption: there can only be one STREAM and only one RST_STREAM frame
* for a particular stream per packet. The latter is true because a stream
* will only send out one RST_STREAM frame. The former is true because we
* make sure only to place one STREAM frame from a particular stream into a
* packet.
*
/*
* Assumption: frames are added to the packet_out in order of their placement
* in packet_out->po_data. There is an assertion in this function that guards
* for this.
* in packet_out->po_data. There is no assertion to guard for for this.
*/
int
lsquic_packet_out_add_stream (lsquic_packet_out_t *packet_out,
@ -123,41 +117,17 @@ lsquic_packet_out_add_stream (lsquic_packet_out_t *packet_out,
enum QUIC_FRAME_TYPE frame_type,
unsigned short off, unsigned short len)
{
struct packet_out_srec_iter posi;
struct stream_rec_arr *srec_arr;
struct stream_rec *srec;
int last_taken;
unsigned i;
assert(!(new_stream->stream_flags & STREAM_FINISHED));
for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi))
if (srec->sr_stream == new_stream)
{
switch (frame_type)
{
case QUIC_FRAME_STREAM:
assert(!(srec->sr_frame_types & (1 << QUIC_FRAME_STREAM)));
srec->sr_frame_types |= (1 << QUIC_FRAME_STREAM);
srec->sr_off = off;
srec->sr_len = len;
break;
default:
assert(QUIC_FRAME_RST_STREAM == frame_type);
assert(!(srec->sr_frame_types & (1 << QUIC_FRAME_RST_STREAM)));
srec->sr_frame_types |= (1 << QUIC_FRAME_RST_STREAM);
break;
}
return 0; /* Update existing record */
}
else if (srec->sr_frame_types & (1 << QUIC_FRAME_STREAM) & (1 << frame_type))
assert(srec->sr_off < off); /* Check that STREAM frames are added in order */
if (!(packet_out->po_flags & PO_SREC_ARR))
{
if (!srec_taken(&packet_out->po_srecs.one))
{
packet_out->po_srecs.one.sr_frame_types = (1 << frame_type);
packet_out->po_srecs.one.sr_frame_type = frame_type;
packet_out->po_srecs.one.sr_stream = new_stream;
packet_out->po_srecs.one.sr_off = off;
packet_out->po_srecs.one.sr_len = len;
@ -188,7 +158,7 @@ lsquic_packet_out_add_stream (lsquic_packet_out_t *packet_out,
if (i < sizeof(srec_arr->srecs) / sizeof(srec_arr->srecs[0]))
{
set_elem:
srec_arr->srecs[i].sr_frame_types = (1 << frame_type);
srec_arr->srecs[i].sr_frame_type = frame_type;
srec_arr->srecs[i].sr_stream = new_stream;
srec_arr->srecs[i].sr_off = off;
srec_arr->srecs[i].sr_len = len;
@ -201,7 +171,7 @@ lsquic_packet_out_add_stream (lsquic_packet_out_t *packet_out,
return -1;
memset(srec_arr, 0, sizeof(*srec_arr));
srec_arr->srecs[0].sr_frame_types = (1 << frame_type);
srec_arr->srecs[0].sr_frame_type = frame_type;
srec_arr->srecs[0].sr_stream = new_stream;
srec_arr->srecs[0].sr_off = off;
srec_arr->srecs[0].sr_len = len;
@ -315,7 +285,7 @@ lsquic_packet_out_elide_reset_stream_frames (lsquic_packet_out_t *packet_out,
for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi))
{
if (srec->sr_frame_types & (1 << QUIC_FRAME_STREAM))
if (srec->sr_frame_type == QUIC_FRAME_STREAM)
{
++n_stream_frames;
@ -345,9 +315,8 @@ lsquic_packet_out_elide_reset_stream_frames (lsquic_packet_out_t *packet_out,
packet_out->po_data_sz -= srec->sr_len;
/* See what we can do with the stream */
srec->sr_frame_types &= ~(1 << QUIC_FRAME_STREAM);
if (!srec_taken(srec))
lsquic_stream_acked(srec->sr_stream);
srec->sr_frame_type = 0;
lsquic_stream_acked(srec->sr_stream);
}
}
}
@ -374,28 +343,11 @@ lsquic_packet_out_chop_regen (lsquic_packet_out_t *packet_out)
packet_out->po_regen_sz = 0;
for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi))
if (srec->sr_frame_types & (1 << QUIC_FRAME_STREAM))
if (srec->sr_frame_type == QUIC_FRAME_STREAM)
srec->sr_off -= delta;
}
int
lsquic_packet_out_has_frame (struct lsquic_packet_out *packet_out,
const struct lsquic_stream *stream,
enum QUIC_FRAME_TYPE frame_type)
{
struct packet_out_srec_iter posi;
struct stream_rec *srec;
for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi))
if (srec->sr_stream == stream &&
srec->sr_frame_types & (1 << frame_type))
return 1;
return 0;
}
int
lsquic_packet_out_has_hsk_frames (struct lsquic_packet_out *packet_out)
{
@ -403,7 +355,7 @@ lsquic_packet_out_has_hsk_frames (struct lsquic_packet_out *packet_out)
struct stream_rec *srec;
for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi))
if ((srec->sr_frame_types & (1 << QUIC_FRAME_STREAM))
if (srec->sr_frame_type == QUIC_FRAME_STREAM
&& LSQUIC_STREAM_HANDSHAKE == srec->sr_stream->id)
{
return 1;
@ -439,7 +391,7 @@ split_off_last_frames (struct lsquic_mm *mm, lsquic_packet_out_t *packet_out,
srec->sr_stream, QUIC_FRAME_STREAM,
new_packet_out->po_data_sz, srec->sr_len))
return -1;
srec->sr_frame_types &= ~(1 << QUIC_FRAME_STREAM);
srec->sr_frame_type = 0;
assert(srec->sr_stream->n_unacked > 1);
--srec->sr_stream->n_unacked;
new_packet_out->po_data_sz += srec->sr_len;
@ -469,7 +421,7 @@ move_largest_frame (struct lsquic_mm *mm, lsquic_packet_out_t *packet_out,
new_packet_out->po_data_sz, max_srec->sr_len))
return -1;
max_srec->sr_frame_types &= ~(1 << QUIC_FRAME_STREAM);
max_srec->sr_frame_type = 0;
assert(max_srec->sr_stream->n_unacked > 1);
--max_srec->sr_stream->n_unacked;
new_packet_out->po_data_sz += max_srec->sr_len;
@ -613,7 +565,7 @@ verify_srecs (lsquic_packet_out_t *packet_out)
for ( ; srec; srec = posi_next(&posi))
{
assert(srec->sr_off == off);
assert(srec->sr_frame_types & (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
off += srec->sr_len;
}
@ -652,7 +604,7 @@ lsquic_packet_out_split_in_two (struct lsquic_mm *mm,
for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi))
{
/* We only expect references to STREAM frames (buffered packets): */
assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
if (n_srecs >= n_srecs_alloced)
{
n_srecs_alloced *= 2;
@ -787,7 +739,7 @@ lsquic_packet_out_turn_on_fin (struct lsquic_packet_out *packet_out,
int len;
for (srec = posi_first(&posi, packet_out); srec; srec = posi_next(&posi))
if ((srec->sr_frame_types & (1 << QUIC_FRAME_STREAM))
if (srec->sr_frame_type == QUIC_FRAME_STREAM
&& srec->sr_stream == stream)
{
len = pf->pf_parse_stream_frame(packet_out->po_data + srec->sr_off,

View file

@ -17,12 +17,10 @@ struct parse_funcs;
/* Each stream_rec is associated with one packet_out. packet_out can have
* zero or more stream_rec structures. stream_rec keeps a pointer to a stream
* that has STREAM or RST_STREAM frames inside packet_out. `sr_frame_types'
* is a bitmask that records which of these two frames are in the packet.
* If this value is zero, values of the other struct members are not valid.
* `sr_off' indicates where inside packet_out->po_data STREAM frame begins
* and `sr_len' is its length. These values are not kept for RST_STREAM
* frames.
* that has STREAM or RST_STREAM frames inside packet_out. `sr_frame_type'
* specifies the type of the frame; if this value is zero, values of the
* other struct members are not valid. `sr_off' indicates where inside
* packet_out->po_data the frame begins and `sr_len' is its length.
*
* We need this information for three reasons:
* 1. A stream is not destroyed until all of its STREAM and RST_STREAM
@ -40,10 +38,10 @@ struct stream_rec {
struct lsquic_stream *sr_stream;
unsigned short sr_off,
sr_len;
enum quic_ft_bit sr_frame_types:16;
enum QUIC_FRAME_TYPE sr_frame_type:16;
};
#define srec_taken(srec) ((srec)->sr_frame_types)
#define srec_taken(srec) ((srec)->sr_frame_type)
struct stream_rec_arr {
TAILQ_ENTRY(stream_rec_arr) next_stream_rec_arr;
@ -229,10 +227,6 @@ lsquic_packet_out_split_in_two (struct lsquic_mm *, lsquic_packet_out_t *,
void
lsquic_packet_out_chop_regen (lsquic_packet_out_t *);
int
lsquic_packet_out_has_frame (struct lsquic_packet_out *,
const struct lsquic_stream *, enum QUIC_FRAME_TYPE);
int
lsquic_packet_out_has_hsk_frames (struct lsquic_packet_out *);

View file

@ -1228,8 +1228,6 @@ lsquic_send_ctl_new_packet_out (lsquic_send_ctl_t *ctl, unsigned need_at_least)
}
/* Do not use for STREAM frames
*/
lsquic_packet_out_t *
lsquic_send_ctl_get_writeable_packet (lsquic_send_ctl_t *ctl,
unsigned need_at_least, int *is_err)
@ -1248,48 +1246,20 @@ lsquic_send_ctl_get_writeable_packet (lsquic_send_ctl_t *ctl,
if (!lsquic_send_ctl_can_send(ctl))
{
*is_err = 0;
if (is_err)
*is_err = 0;
return NULL;
}
packet_out = lsquic_send_ctl_new_packet_out(ctl, need_at_least);
if (packet_out)
lsquic_send_ctl_scheduled_one(ctl, packet_out);
else
else if (is_err)
*is_err = 1;
return packet_out;
}
static lsquic_packet_out_t *
send_ctl_get_packet_for_stream (lsquic_send_ctl_t *ctl,
unsigned need_at_least, const lsquic_stream_t *stream)
{
lsquic_packet_out_t *packet_out;
assert(need_at_least > 0);
packet_out = lsquic_send_ctl_last_scheduled(ctl);
if (packet_out
&& !(packet_out->po_flags & PO_STREAM_END)
&& lsquic_packet_out_avail(packet_out) >= need_at_least
&& !lsquic_packet_out_has_frame(packet_out, stream, QUIC_FRAME_STREAM))
{
return packet_out;
}
if (!lsquic_send_ctl_can_send(ctl))
return NULL;
packet_out = lsquic_send_ctl_new_packet_out(ctl, need_at_least);
if (!packet_out)
return NULL;
lsquic_send_ctl_scheduled_one(ctl, packet_out);
return packet_out;
}
static void
update_for_resending (lsquic_send_ctl_t *ctl, lsquic_packet_out_t *packet_out)
{
@ -1670,6 +1640,24 @@ send_ctl_max_bpq_count (const lsquic_send_ctl_t *ctl,
}
static void
send_ctl_move_ack (struct lsquic_send_ctl *ctl, struct lsquic_packet_out *dst,
struct lsquic_packet_out *src)
{
assert(dst->po_data_sz == 0);
if (lsquic_packet_out_avail(dst) >= src->po_regen_sz)
{
memcpy(dst->po_data, src->po_data, src->po_regen_sz);
dst->po_data_sz = src->po_regen_sz;
dst->po_regen_sz = src->po_regen_sz;
dst->po_frame_types |= (QFRAME_REGEN_MASK & src->po_frame_types);
src->po_frame_types &= ~QFRAME_REGEN_MASK;
lsquic_packet_out_chop_regen(src);
}
}
static lsquic_packet_out_t *
send_ctl_get_buffered_packet (lsquic_send_ctl_t *ctl,
enum buf_packet_type packet_type, unsigned need_at_least,
@ -1677,14 +1665,15 @@ send_ctl_get_buffered_packet (lsquic_send_ctl_t *ctl,
{
struct buf_packet_q *const packet_q =
&ctl->sc_buffered_packets[packet_type];
struct lsquic_conn *const lconn = ctl->sc_conn_pub->lconn;
lsquic_packet_out_t *packet_out;
enum lsquic_packno_bits bits;
enum { AA_STEAL, AA_GENERATE, AA_NONE, } ack_action;
packet_out = TAILQ_LAST(&packet_q->bpq_packets, lsquic_packets_tailq);
if (packet_out
&& !(packet_out->po_flags & PO_STREAM_END)
&& lsquic_packet_out_avail(packet_out) >= need_at_least
&& !lsquic_packet_out_has_frame(packet_out, stream, QUIC_FRAME_STREAM))
&& lsquic_packet_out_avail(packet_out) >= need_at_least)
{
return packet_out;
}
@ -1693,10 +1682,56 @@ send_ctl_get_buffered_packet (lsquic_send_ctl_t *ctl,
return NULL;
bits = lsquic_send_ctl_guess_packno_bits(ctl);
if (packet_q->bpq_count == 0)
{
/* If ACK was written to the low-priority queue first, steal it */
if (packet_q == &ctl->sc_buffered_packets[BPT_HIGHEST_PRIO]
&& !TAILQ_EMPTY(&ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets)
&& (TAILQ_FIRST(&ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets)
->po_frame_types & QUIC_FTBIT_ACK))
{
LSQ_DEBUG("steal ACK frame from low-priority buffered queue");
ack_action = AA_STEAL;
bits = PACKNO_LEN_6;
}
/* If ACK can be generated, write it to the first buffered packet. */
else if (lconn->cn_if->ci_can_write_ack(lconn))
{
LSQ_DEBUG("generate ACK frame for first buffered packet in "
"queue #%u", packet_type);
ack_action = AA_GENERATE;
/* Packet length is set to the largest possible size to guarantee
* that buffered packet with the ACK will not need to be split.
*/
bits = PACKNO_LEN_6;
}
else
goto no_ack_action;
}
else
{
no_ack_action:
ack_action = AA_NONE;
bits = lsquic_send_ctl_guess_packno_bits(ctl);
}
packet_out = send_ctl_allocate_packet(ctl, bits, need_at_least);
if (!packet_out)
return NULL;
switch (ack_action)
{
case AA_STEAL:
send_ctl_move_ack(ctl, packet_out,
TAILQ_FIRST(&ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets));
break;
case AA_GENERATE:
lconn->cn_if->ci_write_ack(lconn, packet_out);
break;
case AA_NONE:
break;
}
TAILQ_INSERT_TAIL(&packet_q->bpq_packets, packet_out, po_next);
++packet_q->bpq_count;
LSQ_DEBUG("Add new packet to buffered queue #%u; count: %u",
@ -1712,7 +1747,7 @@ lsquic_send_ctl_get_packet_for_stream (lsquic_send_ctl_t *ctl,
enum buf_packet_type packet_type;
if (lsquic_send_ctl_schedule_stream_packets_immediately(ctl))
return send_ctl_get_packet_for_stream(ctl, need_at_least, stream);
return lsquic_send_ctl_get_writeable_packet(ctl, need_at_least, NULL);
else
{
packet_type = send_ctl_lookup_bpt(ctl, stream);
@ -1722,6 +1757,15 @@ lsquic_send_ctl_get_packet_for_stream (lsquic_send_ctl_t *ctl,
}
int
lsquic_send_ctl_buffered_and_same_prio_as_headers (struct lsquic_send_ctl *ctl,
const struct lsquic_stream *stream)
{
return !lsquic_send_ctl_schedule_stream_packets_immediately(ctl)
&& BPT_HIGHEST_PRIO == send_ctl_lookup_bpt(ctl, stream);
}
#ifdef NDEBUG
static
#elif __GNUC__
@ -1817,9 +1861,10 @@ lsquic_send_ctl_schedule_buffered (lsquic_send_ctl_t *ctl,
}
TAILQ_REMOVE(&packet_q->bpq_packets, packet_out, po_next);
--packet_q->bpq_count;
LSQ_DEBUG("Remove packet from buffered queue #%u; count: %u",
packet_type, packet_q->bpq_count);
packet_out->po_packno = send_ctl_next_packno(ctl);
LSQ_DEBUG("Remove packet from buffered queue #%u; count: %u. "
"It becomes packet %"PRIu64, packet_type, packet_q->bpq_count,
packet_out->po_packno);
lsquic_send_ctl_scheduled_one(ctl, packet_out);
}

View file

@ -279,4 +279,8 @@ lsquic_send_ctl_pacer_blocked (struct lsquic_send_ctl *);
int
lsquic_send_ctl_sched_is_blocked (const struct lsquic_send_ctl *);
int
lsquic_send_ctl_buffered_and_same_prio_as_headers (struct lsquic_send_ctl *,
const struct lsquic_stream *);
#endif

View file

@ -1564,6 +1564,25 @@ stream_write_to_packet (struct frame_gen_ctx *fg_ctx, const size_t size)
lsquic_packet_out_t *packet_out;
int len, s, hsk;
if ((stream->stream_flags & (STREAM_HEADERS_SENT|STREAM_HDRS_FLUSHED))
== STREAM_HEADERS_SENT
&& lsquic_send_ctl_buffered_and_same_prio_as_headers(send_ctl, stream))
{
struct lsquic_stream *const headers_stream
= lsquic_headers_stream_get_stream(stream->conn_pub->hs);
if (lsquic_stream_has_data_to_flush(headers_stream))
{
LSQ_DEBUG("flushing headers stream before potential write to a "
"buffered packet");
(void) lsquic_stream_flush(headers_stream);
}
else
/* Some other stream must have flushed it: this means our headers
* are flushed.
*/
stream->stream_flags |= STREAM_HDRS_FLUSHED;
}
stream_header_sz = pf->pf_calc_stream_frame_header_sz(stream->id,
stream->tosend_off);
need_at_least = stream_header_sz + (size > 0);

View file

@ -63,6 +63,7 @@ struct lsquic_stream
STREAM_AUTOSWITCH = (1 <<27),
STREAM_RW_ONCE = (1 <<28), /* When set, read/write events are dispatched once per call */
STREAM_CRITICAL = (1 <<29),
STREAM_HDRS_FLUSHED = (1 <<30), /* Only used in buffered packets mode */
} stream_flags;
/* There are more than one reason that a stream may be put onto

View file

@ -391,6 +391,8 @@ http_client_on_new_stream (void *stream_if_ctx, lsquic_stream_t *stream)
st_h->reader.lsqr_ctx = NULL;
LSQ_INFO("created new stream, path: %s", st_h->path);
lsquic_stream_wantwrite(stream, 1);
if (randomly_reprioritize_streams)
lsquic_stream_set_priority(stream, 1 + (random() & 0xFF));
return st_h;
}
@ -542,28 +544,26 @@ http_client_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
srand(GetTickCount());
#endif
if (g_header_bypass && !(st_h->sh_flags & PROCESSED_HEADERS))
{
hset = lsquic_stream_get_hset(stream);
if (!hset)
{
LSQ_ERROR("could not get header set from stream");
exit(2);
}
st_h->sh_ttfb = lsquic_time_now();
update_sample_stats(&s_stat_ttfb, st_h->sh_ttfb - st_h->sh_created);
if (s_discard_response)
LSQ_DEBUG("discard response: do not dump headers");
else
hset_dump(hset, stdout);
hset_destroy(hset);
st_h->sh_flags |= PROCESSED_HEADERS;
}
do
{
nread = lsquic_stream_read(stream, buf, sizeof(buf));
if (nread > 0)
if (g_header_bypass && !(st_h->sh_flags & PROCESSED_HEADERS))
{
hset = lsquic_stream_get_hset(stream);
if (!hset)
{
LSQ_ERROR("could not get header set from stream");
exit(2);
}
st_h->sh_ttfb = lsquic_time_now();
update_sample_stats(&s_stat_ttfb, st_h->sh_ttfb - st_h->sh_created);
if (s_discard_response)
LSQ_DEBUG("discard response: do not dump headers");
else
hset_dump(hset, stdout);
hset_destroy(hset);
st_h->sh_flags |= PROCESSED_HEADERS;
}
else if (nread = lsquic_stream_read(stream, buf, sizeof(buf)), nread > 0)
{
s_stat_downloaded_bytes += nread;
if (!g_header_bypass && !(st_h->sh_flags & PROCESSED_HEADERS))
@ -585,7 +585,7 @@ http_client_on_read (lsquic_stream_t *stream, lsquic_stream_ctx_t *st_h)
#endif
lsquic_stream_set_priority(stream, new_prio);
assert(s == 0);
LSQ_NOTICE("changed stream %u priority from %u to %u",
LSQ_DEBUG("changed stream %u priority from %u to %u",
lsquic_stream_id(stream), old_prio, new_prio);
}
}

View file

@ -25,7 +25,7 @@
static const struct parse_funcs *const pf = select_pf_by_ver(LSQVER_035);
static struct {
char buf[0x100];
unsigned char buf[0x1000];
size_t bufsz;
uint64_t off;
} stream_contents;
@ -40,6 +40,15 @@ setup_stream_contents (uint64_t off, const char *str)
}
void
setup_stream_contents_n (uint64_t off, const unsigned char *buf, size_t size)
{
stream_contents.bufsz = size;
stream_contents.off = off;
memcpy(stream_contents.buf, buf, size);
}
int
lsquic_stream_tosend_fin (const lsquic_stream_t *stream)
{
@ -121,6 +130,78 @@ elide_single_stream_frame (void)
}
/* In this test, we check that if the last STREAM frame is moved due to
* elision and PO_STREAM_END is set, the packet size is adjusted. This
* is needed to prevent data corruption for STREAM frames that have
* implicit length.
*/
static void
shrink_packet_post_elision (void)
{
struct packet_out_srec_iter posi;
struct lsquic_engine_public enpub;
lsquic_stream_t streams[2];
lsquic_packet_out_t *packet_out;
const struct stream_rec *srec;
int len, off = 0;
unsigned char stream2_data[0x1000];
memset(stream2_data, '2', sizeof(stream2_data));
memset(streams, 0, sizeof(streams));
memset(&enpub, 0, sizeof(enpub));
lsquic_mm_init(&enpub.enp_mm);
packet_out = lsquic_mm_get_packet_out(&enpub.enp_mm, NULL, QUIC_MAX_PAYLOAD_SZ);
setup_stream_contents(123, "Dude, where is my car?");
len = pf->pf_gen_stream_frame(packet_out->po_data + packet_out->po_data_sz,
lsquic_packet_out_avail(packet_out),
streams[0].id, lsquic_stream_tosend_offset(&streams[0]),
lsquic_stream_tosend_fin(&streams[0]),
lsquic_stream_tosend_sz(&streams[0]),
(gsf_read_f) lsquic_stream_tosend_read,
&streams[0]);
packet_out->po_data_sz += len;
packet_out->po_frame_types |= (1 << QUIC_FRAME_STREAM);
lsquic_packet_out_add_stream(packet_out, &enpub.enp_mm, &streams[0],
QUIC_FRAME_STREAM, off, len);
/* We want to fill the packet just right so that PO_STREAM_END gets set */
const int exp = lsquic_packet_out_avail(packet_out);
setup_stream_contents_n(0, stream2_data, exp - 2);
len = pf->pf_gen_stream_frame(packet_out->po_data + packet_out->po_data_sz,
lsquic_packet_out_avail(packet_out),
streams[1].id, lsquic_stream_tosend_offset(&streams[1]),
lsquic_stream_tosend_fin(&streams[1]),
lsquic_stream_tosend_sz(&streams[1]),
(gsf_read_f) lsquic_stream_tosend_read,
&streams[1]);
assert(len == exp);
packet_out->po_data_sz += len;
packet_out->po_frame_types |= (1 << QUIC_FRAME_STREAM);
lsquic_packet_out_add_stream(packet_out, &enpub.enp_mm, &streams[1],
QUIC_FRAME_STREAM, off, len);
assert(0 == lsquic_packet_out_avail(packet_out)); /* Same as len == exp check really */
packet_out->po_flags |= PO_STREAM_END;
assert(1 == streams[0].n_unacked);
assert(1 == streams[1].n_unacked);
assert(posi_first(&posi, packet_out));
streams[0].stream_flags |= STREAM_RST_SENT;
lsquic_packet_out_elide_reset_stream_frames(packet_out, 0);
assert(0 == streams[0].n_unacked);
assert(QUIC_FTBIT_STREAM == packet_out->po_frame_types);
srec = posi_first(&posi, packet_out);
assert(srec->sr_stream == &streams[1]);
assert(packet_out->po_data_sz == exp);
lsquic_packet_out_destroy(packet_out, &enpub, NULL);
lsquic_mm_cleanup(&enpub.enp_mm);
}
/* This test is more involved. We will construct the following packet:
*
* | ACK | STREAM A | STREAM B | STREAM C | RST A | STREAM D | STREAM E
@ -237,7 +318,7 @@ elide_three_stream_frames (int chop_regen)
len = pf->pf_gen_rst_frame(packet_out->po_data + packet_out->po_data_sz,
lsquic_packet_out_avail(packet_out), 'A', 133, 0);
lsquic_packet_out_add_stream(packet_out, &enpub.enp_mm, &streams[0],
QUIC_FRAME_RST_STREAM, 0, 0);
QUIC_FRAME_RST_STREAM, packet_out->po_data_sz, len);
packet_out->po_data_sz += len;
/* STREAM D */
setup_stream_contents(123, "DDDDDDDDDD");
@ -293,17 +374,17 @@ elide_three_stream_frames (int chop_regen)
assert(packet_out->po_frame_types == ((1 << QUIC_FRAME_STREAM) | (1 << QUIC_FRAME_RST_STREAM)));
srec = posi_first(&posi, packet_out);
assert(srec->sr_stream == &streams[0]);
assert(srec->sr_frame_types == (1 << QUIC_FRAME_RST_STREAM));
srec = posi_next(&posi);
assert(srec->sr_stream == &streams[1]);
assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
assert(srec->sr_off == b_off - (chop_regen ? 5 : 0));
srec = posi_next(&posi);
assert(srec->sr_stream == &streams[0]);
assert(srec->sr_frame_type == QUIC_FRAME_RST_STREAM);
srec = posi_next(&posi);
assert(srec->sr_stream == &streams[3]);
assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
assert(srec->sr_off == d_off - (chop_regen ? 5 : 0));
srec = posi_next(&posi);
@ -320,6 +401,7 @@ main (void)
{
/* TODO-ENDIAN: test with every PF */
elide_single_stream_frame();
shrink_packet_post_elision();
elide_three_stream_frames(0);
elide_three_stream_frames(1);

View file

@ -54,32 +54,37 @@ main (void)
srec = posi_first(&posi, packet_out);
assert(srec->sr_stream == &streams[0]);
assert(srec->sr_off == 7);
assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
srec = posi_next(&posi);
assert(srec->sr_stream == &streams[1]);
assert(srec->sr_off == 8);
assert(srec->sr_frame_types == ((1 << QUIC_FRAME_STREAM)|(1 << QUIC_FRAME_RST_STREAM)));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
srec = posi_next(&posi);
assert(srec->sr_stream == &streams[2]);
assert(srec->sr_off == 9);
assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
srec = posi_next(&posi);
assert(srec->sr_stream == &streams[1]);
assert(srec->sr_off == 10);
assert(srec->sr_frame_type == QUIC_FRAME_RST_STREAM);
srec = posi_next(&posi);
assert(srec->sr_stream == &streams[3]);
assert(srec->sr_off == 11);
assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
srec = posi_next(&posi);
assert(srec->sr_stream == &streams[4]);
assert(srec->sr_off == 12);
assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
srec = posi_next(&posi);
assert(srec->sr_stream == &streams[5]);
assert(srec->sr_off == 13);
assert(srec->sr_frame_types == (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
assert((void *) 0 == posi_next(&posi));

View file

@ -202,7 +202,7 @@ read_from_scheduled_packets (lsquic_send_ctl_t *send_ctl, uint32_t stream_id,
{
if (fullcheck)
{
assert(srec->sr_frame_types & (1 << QUIC_FRAME_STREAM));
assert(srec->sr_frame_type == QUIC_FRAME_STREAM);
if (packet_out->po_packno != 1)
{
/* First packet may contain two stream frames, do not
@ -216,7 +216,7 @@ read_from_scheduled_packets (lsquic_send_ctl_t *send_ctl, uint32_t stream_id,
}
}
}
if ((srec->sr_frame_types & (1 << QUIC_FRAME_STREAM)) &&
if (srec->sr_frame_type == QUIC_FRAME_STREAM &&
srec->sr_stream->id == stream_id)
{
assert(!fin);
@ -264,6 +264,19 @@ struct test_objs {
};
static int
unit_test_doesnt_write_ack (struct lsquic_conn *lconn)
{
return 0;
}
static const struct conn_iface our_conn_if =
{
.ci_can_write_ack = unit_test_doesnt_write_ack,
};
static void
init_test_objs (struct test_objs *tobjs, unsigned initial_conn_window,
unsigned initial_stream_window, const struct parse_funcs *pf)
@ -271,6 +284,7 @@ init_test_objs (struct test_objs *tobjs, unsigned initial_conn_window,
memset(tobjs, 0, sizeof(*tobjs));
tobjs->lconn.cn_pf = pf ? pf : g_pf;
tobjs->lconn.cn_pack_size = 1370;
tobjs->lconn.cn_if = &our_conn_if;
lsquic_mm_init(&tobjs->eng_pub.enp_mm);
TAILQ_INIT(&tobjs->conn_pub.sending_streams);
TAILQ_INIT(&tobjs->conn_pub.read_streams);
@ -1871,8 +1885,8 @@ test_writing_to_stream_schedule_stream_packets_immediately (void)
assert(("9 bytes written correctly", nw == 9));
s = lsquic_stream_flush(stream);
assert(0 == s);
assert(("packetized -- 2 packets now",
2 == lsquic_send_ctl_n_scheduled(&tobjs.send_ctl)));
assert(("packetized -- still 1 packet",
1 == lsquic_send_ctl_n_scheduled(&tobjs.send_ctl)));
assert(("connection cap is reduced by 23 bytes",
lsquic_conn_cap_avail(conn_cap) == 0x4000 - 23));
@ -1934,7 +1948,7 @@ test_writing_to_stream_outside_callback (void)
assert(("9 bytes written correctly", nw == 9));
s = lsquic_stream_flush(stream);
assert(0 == s);
assert(("packetized -- 2 packets now", 2 == bpq->bpq_count));
assert(("packetized -- still 1 packet", 1 == bpq->bpq_count));
assert(("connection cap is reduced by 23 bytes",
lsquic_conn_cap_avail(conn_cap) == 0x4000 - 23));
@ -1943,8 +1957,8 @@ test_writing_to_stream_outside_callback (void)
g_ctl_settings.tcs_schedule_stream_packets_immediately = 1;
lsquic_send_ctl_schedule_buffered(&tobjs.send_ctl,
g_ctl_settings.tcs_bp_type);
assert(("packetized -- 2 packets now",
2 == lsquic_send_ctl_n_scheduled(&tobjs.send_ctl)));
assert(("packetized -- 1 packet",
1 == lsquic_send_ctl_n_scheduled(&tobjs.send_ctl)));
nw = read_from_scheduled_packets(&tobjs.send_ctl, stream->id, buf,
sizeof(buf), 0, NULL, 0);