Release 2.19.9

- [FEATURE] Add lsquic_stream_pwritev().  This function allows one to
  reduce the number of system calls required to read a file from disk
  by using lsquic_stream_pwritev() together with preadv(2).
- [BUGFIX] When stream is reset, it is writeable -- let user collect
  the error.
- [BUGFIX] Calculate correct conn flow control if reading ends early.
- [BUGFIX] Remove stream from read and write queues on internal
  shutdown.  This is a regression introduced in 2.19.7.
- [BUGFIX] Swapped arguments in IETF RESET_FRAME generation.
- Turn off mini conn history when compiling with Visual Studio; this
  allows the project to compile on Windows again.
- http_client: Add -3 flag to stop reading from streams early; code
  cleanup.
- Don't use -Werror.
This commit is contained in:
Dmitri Tikhonov 2020-09-08 11:43:03 -04:00
parent 49f1f4f620
commit 2f2f436324
25 changed files with 1545 additions and 85 deletions

View file

@ -86,6 +86,11 @@ struct network_path
#define NP_PEER_SA(path_) ((struct sockaddr *) (path_)->np_peer_addr)
#define NP_IS_IPv6(path_) (AF_INET6 == NP_LOCAL_SA(path_)->sa_family)
struct ack_state
{
uint32_t arr[6];
};
struct conn_iface
{
enum tick_st
@ -259,6 +264,12 @@ struct conn_iface
/* Optional method. It is called when RTO occurs. */
void
(*ci_retx_timeout) (struct lsquic_conn *);
void
(*ci_ack_snapshot) (struct lsquic_conn *, struct ack_state *);
void
(*ci_ack_rollback) (struct lsquic_conn *, struct ack_state *);
};
#define LSCONN_CCE_BITS 3

View file

@ -58,12 +58,12 @@
#include "lsquic_bw_sampler.h"
#include "lsquic_minmax.h"
#include "lsquic_bbr.h"
#include "lsquic_send_ctl.h"
#include "lsquic_set.h"
#include "lsquic_conn_flow.h"
#include "lsquic_sfcw.h"
#include "lsquic_hash.h"
#include "lsquic_conn.h"
#include "lsquic_send_ctl.h"
#include "lsquic_full_conn.h"
#include "lsquic_util.h"
#include "lsquic_qtags.h"

View file

@ -42,7 +42,6 @@
#include "lsquic_bw_sampler.h"
#include "lsquic_minmax.h"
#include "lsquic_bbr.h"
#include "lsquic_send_ctl.h"
#include "lsquic_set.h"
#include "lsquic_malo.h"
#include "lsquic_chsk_stream.h"
@ -66,6 +65,7 @@
#include "lsquic_attq.h"
#include "lsquic_conn.h"
#include "lsquic_send_ctl.h"
#include "lsquic_conn_public.h"
#include "lsquic_ver_neg.h"
#include "lsquic_mini_conn.h"
@ -1492,6 +1492,18 @@ process_padding_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
}
static void
log_conn_flow_control (struct full_conn *conn)
{
LSQ_DEBUG("connection flow cap: wrote: %"PRIu64
"; max: %"PRIu64, conn->fc_pub.conn_cap.cc_sent,
conn->fc_pub.conn_cap.cc_max);
LSQ_DEBUG("connection flow control window: read: %"PRIu64
"; max: %"PRIu64, conn->fc_pub.cfcw.cf_max_recv_off,
conn->fc_pub.cfcw.cf_recv_off);
}
static unsigned
process_ping_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
const unsigned char *p, size_t len)
@ -1500,6 +1512,8 @@ process_ping_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
*/
EV_LOG_PING_FRAME_IN(LSQUIC_LOG_CONN_ID);
LSQ_DEBUG("received PING");
if (conn->fc_flags & FC_SERVER)
log_conn_flow_control(conn);
return 1;
}
@ -2763,6 +2777,8 @@ generate_ping_frame (struct full_conn *conn)
lsquic_send_ctl_incr_pack_sz(&conn->fc_send_ctl, packet_out, sz);
packet_out->po_frame_types |= 1 << QUIC_FRAME_PING;
LSQ_DEBUG("wrote PING frame");
if (!(conn->fc_flags & FC_SERVER))
log_conn_flow_control(conn);
}
@ -3271,6 +3287,53 @@ full_conn_ci_can_write_ack (struct lsquic_conn *lconn)
}
struct full_ack_state
{
enum full_conn_flags conn_flags;
enum alarm_id_bit armed_set;
unsigned n_slack_akbl;
unsigned n_stop_waiting;
};
typedef char ack_state_size[sizeof(struct full_ack_state)
<= sizeof(struct ack_state) ? 1 : - 1];
static void
full_conn_ci_ack_snapshot (struct lsquic_conn *lconn, struct ack_state *opaque)
{
struct full_conn *conn = (struct full_conn *) lconn;
struct full_ack_state *const ack_state = (struct full_ack_state *) opaque;
ack_state->conn_flags = conn->fc_flags;
ack_state->armed_set = conn->fc_alset.as_armed_set;
ack_state->n_slack_akbl = conn->fc_n_slack_akbl;
ack_state->n_stop_waiting
= lsquic_send_ctl_n_stop_waiting(&conn->fc_send_ctl);
LSQ_DEBUG("take ACK snapshot");
}
static void
full_conn_ci_ack_rollback (struct lsquic_conn *lconn, struct ack_state *opaque)
{
struct full_ack_state *const ack_state = (struct full_ack_state *) opaque;
struct full_conn *conn = (struct full_conn *) lconn;
conn->fc_flags &= ~(FC_ACK_HAD_MISS|FC_ACK_QUEUED);
conn->fc_flags |= (FC_ACK_HAD_MISS|FC_ACK_QUEUED)
& ack_state->conn_flags;
conn->fc_alset.as_armed_set &= ~ALBIT_ACK_APP;
conn->fc_alset.as_armed_set |= ALBIT_ACK_APP & ack_state->armed_set;
conn->fc_n_slack_akbl = ack_state->n_slack_akbl;
conn->fc_send_ctl.sc_n_stop_waiting = ack_state->n_stop_waiting;
LSQ_DEBUG("roll back ACK state");
}
/* This should be called before lsquic_alarmset_ring_expired() */
static void
maybe_set_noprogress_alarm (struct full_conn *conn, lsquic_time_t now)
@ -4404,6 +4467,8 @@ static const struct headers_stream_callbacks *headers_callbacks_ptr = &headers_c
static const struct conn_iface full_conn_iface = {
.ci_abort = full_conn_ci_abort,
.ci_abort_error = full_conn_ci_abort_error,
.ci_ack_rollback = full_conn_ci_ack_rollback,
.ci_ack_snapshot = full_conn_ci_ack_snapshot,
.ci_can_write_ack = full_conn_ci_can_write_ack,
.ci_cancel_pending_streams
= full_conn_ci_cancel_pending_streams,

View file

@ -1654,6 +1654,62 @@ generate_timestamp_frame (struct ietf_full_conn *conn,
}
struct ietf_ack_state
{
enum ifull_conn_flags conn_flags;
enum send_flags send_flags;
enum alarm_id_bit armed_set;
unsigned n_slack_akbl;
unsigned n_slack_all;
unsigned char unretx_thresh;
};
typedef char ack_state_size[sizeof(struct ietf_ack_state)
<= sizeof(struct ack_state) ? 1 : - 1];
static void
ietf_full_conn_ci_ack_snapshot (struct lsquic_conn *lconn,
struct ack_state *opaque)
{
struct ietf_full_conn *conn = (struct ietf_full_conn *) lconn;
struct ietf_ack_state *const ack_state = (struct ietf_ack_state *) opaque;
ack_state->conn_flags = conn->ifc_flags;
ack_state->send_flags = conn->ifc_send_flags;
ack_state->armed_set = conn->ifc_alset.as_armed_set;
ack_state->n_slack_akbl = conn->ifc_n_slack_akbl[PNS_APP];
ack_state->n_slack_all = conn->ifc_n_slack_all;
ack_state->unretx_thresh= conn->ifc_ping_unretx_thresh;
LSQ_DEBUG("take ACK snapshot");
}
static void
ietf_full_conn_ci_ack_rollback (struct lsquic_conn *lconn,
struct ack_state *opaque)
{
struct ietf_ack_state *const ack_state = (struct ietf_ack_state *) opaque;
struct ietf_full_conn *conn = (struct ietf_full_conn *) lconn;
conn->ifc_flags &= ~(IFC_ACK_HAD_MISS|IFC_ACK_QUED_APP);
conn->ifc_flags |= (IFC_ACK_HAD_MISS|IFC_ACK_QUED_APP)
& ack_state->conn_flags;
conn->ifc_send_flags &= ~SF_SEND_PING;
conn->ifc_send_flags |= SF_SEND_PING & ack_state->send_flags;
conn->ifc_alset.as_armed_set &= ~ALBIT_ACK_APP;
conn->ifc_alset.as_armed_set |= ALBIT_ACK_APP & ack_state->armed_set;
conn->ifc_n_slack_akbl[PNS_APP] = ack_state->n_slack_akbl;
conn->ifc_n_slack_all = ack_state->n_slack_all;
conn->ifc_ping_unretx_thresh = ack_state->unretx_thresh;
LSQ_DEBUG("roll back ACK state");
}
static int
generate_ack_frame_for_pns (struct ietf_full_conn *conn,
struct lsquic_packet_out *packet_out, enum packnum_space pns,
@ -2262,7 +2318,7 @@ generate_stream_blocked_frame (struct ietf_full_conn *conn,
static int
generate_stop_sending_frame (struct ietf_full_conn *conn,
generate_stop_sending_frame_by_id (struct ietf_full_conn *conn,
lsquic_stream_id_t stream_id, enum http_error_code error_code)
{
struct lsquic_packet_out *packet_out;
@ -2301,6 +2357,21 @@ generate_stop_sending_frame (struct ietf_full_conn *conn,
}
/* Return true if generated, false otherwise */
static int
generate_stop_sending_frame (struct ietf_full_conn *conn,
struct lsquic_stream *stream)
{
if (0 == generate_stop_sending_frame_by_id(conn, stream->id, HEC_NO_ERROR))
{
lsquic_stream_ss_frame_sent(stream);
return 1;
}
else
return 0;
}
static void
generate_stop_sending_frames (struct ietf_full_conn *conn, lsquic_time_t now)
{
@ -2311,7 +2382,7 @@ generate_stop_sending_frames (struct ietf_full_conn *conn, lsquic_time_t now)
while (!STAILQ_EMPTY(&conn->ifc_stream_ids_to_ss))
{
sits = STAILQ_FIRST(&conn->ifc_stream_ids_to_ss);
if (0 == generate_stop_sending_frame(conn, sits->sits_stream_id,
if (0 == generate_stop_sending_frame_by_id(conn, sits->sits_stream_id,
sits->sits_error_code))
{
STAILQ_REMOVE_HEAD(&conn->ifc_stream_ids_to_ss, sits_next);
@ -2614,6 +2685,8 @@ process_stream_ready_to_send (struct ietf_full_conn *conn,
r &= generate_stream_blocked_frame(conn, stream);
if (stream->sm_qflags & SMQF_SEND_RST)
r &= generate_rst_stream_frame(conn, stream);
if (stream->sm_qflags & SMQF_SEND_STOP_SENDING)
r &= generate_stop_sending_frame(conn, stream);
return r;
}
@ -3994,6 +4067,18 @@ generate_connection_close_packet (struct ietf_full_conn *conn)
}
static void
log_conn_flow_control (struct ietf_full_conn *conn)
{
LSQ_DEBUG("connection flow cap: wrote: %"PRIu64
"; max: %"PRIu64, conn->ifc_pub.conn_cap.cc_sent,
conn->ifc_pub.conn_cap.cc_max);
LSQ_DEBUG("connection flow control window: read: %"PRIu64
"; max: %"PRIu64, conn->ifc_pub.cfcw.cf_max_recv_off,
conn->ifc_pub.cfcw.cf_recv_off);
}
static void
generate_ping_frame (struct ietf_full_conn *conn, lsquic_time_t unused)
{
@ -4023,6 +4108,8 @@ generate_ping_frame (struct ietf_full_conn *conn, lsquic_time_t unused)
packet_out->po_frame_types |= 1 << QUIC_FRAME_PING;
LSQ_DEBUG("wrote PING frame");
conn->ifc_send_flags &= ~SF_SEND_PING;
if (!(conn->ifc_flags & IFC_SERVER))
log_conn_flow_control(conn);
}
@ -5268,6 +5355,8 @@ process_ping_frame (struct ietf_full_conn *conn,
*/
EV_LOG_PING_FRAME_IN(LSQUIC_LOG_CONN_ID);
LSQ_DEBUG("received PING");
if (conn->ifc_flags & IFC_SERVER)
log_conn_flow_control(conn);
return 1;
}
@ -7802,6 +7891,8 @@ ietf_full_conn_ci_count_garbage (struct lsquic_conn *lconn, size_t garbage_sz)
#define IETF_FULL_CONN_FUNCS \
.ci_abort = ietf_full_conn_ci_abort, \
.ci_abort_error = ietf_full_conn_ci_abort_error, \
.ci_ack_snapshot = ietf_full_conn_ci_ack_snapshot, \
.ci_ack_rollback = ietf_full_conn_ci_ack_rollback, \
.ci_retire_cid = ietf_full_conn_ci_retire_cid, \
.ci_can_write_ack = ietf_full_conn_ci_can_write_ack, \
.ci_cancel_pending_streams = ietf_full_conn_ci_cancel_pending_streams, \

View file

@ -20,7 +20,7 @@ struct lsquic_packet_out;
struct lsquic_engine_public;
#ifndef LSQUIC_KEEP_MINICONN_HISTORY
# ifndef NDEBUG
# if !defined(NDEBUG) && !defined(_MSC_VER)
# define LSQUIC_KEEP_MINICONN_HISTORY 1
# else
# define LSQUIC_KEEP_MINICONN_HISTORY 0

View file

@ -105,6 +105,11 @@ struct parse_funcs
int
(*pf_parse_crypto_frame) (const unsigned char *buf, size_t rem_packet_sz,
struct stream_frame *);
/* Return true if STREAM frame extends to the end of the packet and thus
* does not contain a Length field (no update).
*/
int
(*pf_dec_stream_frame_size) (unsigned char *buf, size_t new_size);
int
(*pf_parse_ack_frame) (const unsigned char *buf, size_t buf_len,
struct ack_info *ack_info, uint8_t exp);

View file

@ -318,6 +318,7 @@ const struct parse_funcs lsquic_parse_funcs_gquic_Q046 =
.pf_gen_stream_frame = lsquic_gquic_be_gen_stream_frame,
.pf_calc_stream_frame_header_sz = lsquic_calc_stream_frame_header_sz_gquic,
.pf_parse_stream_frame = lsquic_gquic_be_parse_stream_frame,
.pf_dec_stream_frame_size = lsquic_gquic_be_dec_stream_frame_size,
.pf_parse_ack_frame = lsquic_gquic_be_parse_ack_frame,
.pf_gen_ack_frame = lsquic_gquic_be_gen_ack_frame,
.pf_gen_stop_waiting_frame = lsquic_gquic_be_gen_stop_waiting_frame,

View file

@ -848,6 +848,7 @@ const struct parse_funcs lsquic_parse_funcs_gquic_Q050 =
.pf_gen_stream_frame = lsquic_gquic_be_gen_stream_frame,
.pf_calc_stream_frame_header_sz = lsquic_calc_stream_frame_header_sz_gquic,
.pf_parse_stream_frame = lsquic_gquic_be_parse_stream_frame,
.pf_dec_stream_frame_size = lsquic_gquic_be_dec_stream_frame_size,
.pf_parse_ack_frame = lsquic_gquic_be_parse_ack_frame,
.pf_gen_ack_frame = lsquic_gquic_be_gen_ack_frame,
.pf_gen_stop_waiting_frame = lsquic_gquic_be_gen_stop_waiting_frame,

View file

@ -293,6 +293,26 @@ lsquic_gquic_be_gen_stream_frame (unsigned char *buf, size_t buf_len,
return p - buf;
}
int
lsquic_gquic_be_dec_stream_frame_size (unsigned char *buf, size_t new_size)
{
/* 1fdoooss */
const unsigned char type = buf[0];
if (!(type & 0x20))
return 1;
const unsigned offset_len = ((type >> 2) & 7) + 1 - !((type >> 2) & 7);
const unsigned stream_id_len = 1 + (type & 3);
uint16_t len = new_size;
#if __BYTE_ORDER == __LITTLE_ENDIAN
len = bswap_16(len);
#endif
memcpy(buf + 1 + offset_len + stream_id_len, &len, 2);
return 0;
}
/* return parsed (used) buffer length */
int
@ -1032,6 +1052,7 @@ const struct parse_funcs lsquic_parse_funcs_gquic_Q043 =
.pf_calc_stream_frame_header_sz = lsquic_calc_stream_frame_header_sz_gquic,
.pf_parse_stream_frame = lsquic_gquic_be_parse_stream_frame,
.pf_parse_ack_frame = lsquic_gquic_be_parse_ack_frame,
.pf_dec_stream_frame_size = lsquic_gquic_be_dec_stream_frame_size,
.pf_gen_ack_frame = lsquic_gquic_be_gen_ack_frame,
.pf_gen_stop_waiting_frame = lsquic_gquic_be_gen_stop_waiting_frame,
.pf_parse_stop_waiting_frame = lsquic_gquic_be_parse_stop_waiting_frame,

View file

@ -111,4 +111,7 @@ lsquic_gquic_be_gen_ack_frame (unsigned char *outbuf, size_t outbuf_sz,
void *rechist, lsquic_time_t now, int *has_missing, lsquic_packno_t *,
const uint64_t *);
int
lsquic_gquic_be_dec_stream_frame_size (unsigned char *buf, size_t new_size);
#endif

View file

@ -472,6 +472,38 @@ ietf_v1_gen_crypto_frame (unsigned char *buf, size_t buf_len,
}
static int
ietf_v1_dec_stream_frame_size (unsigned char *p, size_t new_size)
{
/* 0b00001XXX
* 0x4 OFF
* 0x2 LEN
* 0x1 FIN
*/
unsigned bits;
const char type = *p++;
if (!(type & 0x2))
return 1;
/* Stream ID */
bits = *p >> 6;
p += 1 << bits;
if (type & 0x4)
{
/* Offset */
bits = *p >> 6;
p += 1 << bits;
}
/* Write new size */
bits = *p >> 6;
vint_write(p, new_size, bits, 1 << bits);
return 0;
}
/* return parsed (used) buffer length */
static int
ietf_v1_parse_stream_frame (const unsigned char *buf, size_t rem_packet_sz,
@ -697,7 +729,7 @@ ietf_v1_rst_frame_size (lsquic_stream_id_t stream_id, uint64_t error_code,
static int
ietf_v1_gen_rst_frame (unsigned char *buf, size_t buf_len,
lsquic_stream_id_t stream_id, uint64_t error_code, uint64_t final_size)
lsquic_stream_id_t stream_id, uint64_t final_size, uint64_t error_code)
{
unsigned vbits;
unsigned char *p;
@ -2122,6 +2154,7 @@ const struct parse_funcs lsquic_parse_funcs_ietf_v1 =
.pf_gen_stream_frame = ietf_v1_gen_stream_frame,
.pf_calc_stream_frame_header_sz = ietf_v1_calc_stream_frame_header_sz,
.pf_parse_stream_frame = ietf_v1_parse_stream_frame,
.pf_dec_stream_frame_size = ietf_v1_dec_stream_frame_size,
.pf_parse_ack_frame = ietf_v1_parse_ack_frame,
.pf_gen_ack_frame = ietf_v1_gen_ack_frame,
.pf_gen_blocked_frame = ietf_v1_gen_blocked_frame,

View file

@ -30,7 +30,6 @@
#include "lsquic_bw_sampler.h"
#include "lsquic_minmax.h"
#include "lsquic_bbr.h"
#include "lsquic_send_ctl.h"
#include "lsquic_util.h"
#include "lsquic_sfcw.h"
#include "lsquic_varint.h"
@ -40,6 +39,7 @@
#include "lsquic_ver_neg.h"
#include "lsquic_ev_log.h"
#include "lsquic_conn.h"
#include "lsquic_send_ctl.h"
#include "lsquic_conn_flow.h"
#include "lsquic_conn_public.h"
#include "lsquic_cong_ctl.h"
@ -482,6 +482,9 @@ set_retx_alarm (struct lsquic_send_ctl *ctl, enum packnum_space pns,
#define SC_PACK_SIZE(ctl_) (+(ctl_)->sc_conn_pub->path->np_pack_size)
/* XXX can we optimize this by caching the value of this function? It should
* not change within one tick.
*/
static lsquic_time_t
send_ctl_transfer_time (void *ctx)
{
@ -3522,3 +3525,186 @@ lsquic_send_ctl_disable_ecn (struct lsquic_send_ctl *ctl)
TAILQ_FOREACH(packet_out, &ctl->sc_scheduled_packets, po_next)
lsquic_packet_out_set_ecn(packet_out, ECN_NOT_ECT);
}
void
lsquic_send_ctl_snapshot (struct lsquic_send_ctl *ctl,
struct send_ctl_state *ctl_state)
{
struct lsquic_conn *const lconn = ctl->sc_conn_pub->lconn;
int buffered, repace;
buffered = !lsquic_send_ctl_schedule_stream_packets_immediately(ctl);
repace = !buffered && (ctl->sc_flags & SC_PACE);
if (repace)
ctl_state->pacer = ctl->sc_pacer;
if (buffered)
{
lconn->cn_if->ci_ack_snapshot(lconn, &ctl_state->ack_state);
ctl_state->buf_counts[BPT_OTHER_PRIO]
= ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_count;
ctl_state->buf_counts[BPT_HIGHEST_PRIO]
= ctl->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_count;
}
}
static void
send_ctl_repace (struct lsquic_send_ctl *ctl, const struct pacer *pacer,
unsigned count)
{
unsigned n;
int in_rec;
LSQ_DEBUG("repace, count: %u", count);
ctl->sc_pacer = *pacer;
in_rec = send_ctl_in_recovery(ctl);
for (n = 0; n < count; ++n)
lsquic_pacer_packet_scheduled(&ctl->sc_pacer,
ctl->sc_n_in_flight_retx + ctl->sc_n_scheduled + n, in_rec,
send_ctl_transfer_time, ctl);
}
void
lsquic_send_ctl_rollback (struct lsquic_send_ctl *ctl,
struct send_ctl_state *ctl_state, const struct iovec *last_iov,
size_t shortfall)
{
struct lsquic_conn *const lconn = ctl->sc_conn_pub->lconn;
struct lsquic_packet_out *packet_out, *next;
struct lsquic_packets_tailq *packets;
struct stream_frame stream_frame;
struct packet_out_frec_iter pofi;
enum buf_packet_type packet_type;
unsigned orig_count, new_count;
enum quic_ft_bit lost_types;
int buffered, repace, len, to_end;
unsigned short prev_frec_len;
struct frame_rec *frec;
buffered = !lsquic_send_ctl_schedule_stream_packets_immediately(ctl);
repace = !buffered && (ctl->sc_flags & SC_PACE);
if (!buffered)
{
orig_count = ctl->sc_n_scheduled;
packets = &ctl->sc_scheduled_packets;
packet_type = 0; /* Not necessary, but compiler complains */
}
else if (ctl_state->buf_counts[BPT_HIGHEST_PRIO]
< ctl->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_count)
{
packets = &ctl->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_packets;
orig_count = ctl->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_count;
packet_type = BPT_HIGHEST_PRIO;
}
else
{
packets = &ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets;
orig_count = ctl->sc_buffered_packets[BPT_OTHER_PRIO].bpq_count;
packet_type = BPT_OTHER_PRIO;
}
/* Now find last packet: */
TAILQ_FOREACH(packet_out, packets, po_next)
if ((unsigned char *) last_iov->iov_base >= packet_out->po_data
&& (unsigned char *) last_iov->iov_base
< packet_out->po_data + packet_out->po_data_sz)
break;
if (!packet_out)
{
lconn->cn_if->ci_internal_error(lconn,
"rollback failed: cannot find packet");
return;
}
for (frec = lsquic_pofi_first(&pofi, packet_out); frec;
frec = lsquic_pofi_next(&pofi))
if (frec->fe_frame_type == QUIC_FRAME_STREAM
/* At the time of this writing, pwritev() generates a single STREAM
* frame per packet. To keep code future-proof, we use an extra
* check.
*/
&& (unsigned char *) last_iov->iov_base
> packet_out->po_data + frec->fe_off
&& (unsigned char *) last_iov->iov_base
< packet_out->po_data + frec->fe_off + frec->fe_len)
break;
if (!frec)
{
lconn->cn_if->ci_internal_error(lconn,
"rollback failed: cannot find frame record");
return;
}
/* Strictly less because of the STREAM frame header */
assert(last_iov->iov_len < frec->fe_len);
len = lconn->cn_pf->pf_parse_stream_frame(
packet_out->po_data + frec->fe_off, frec->fe_len, &stream_frame);
if (len < 0)
{
lconn->cn_if->ci_internal_error(lconn,
"error parsing own STREAM frame");
return;
}
if (stream_frame.data_frame.df_size > last_iov->iov_len - shortfall)
{
packet_out->po_data_sz = (unsigned char *) last_iov->iov_base
+ last_iov->iov_len - shortfall - packet_out->po_data;
prev_frec_len = frec->fe_len;
frec->fe_len = packet_out->po_data_sz - frec->fe_off;
to_end = lconn->cn_pf->pf_dec_stream_frame_size(
packet_out->po_data + frec->fe_off,
stream_frame.data_frame.df_size - (prev_frec_len - frec->fe_len));
if (to_end)
{ /* A frame that's too short may be generated when pwritev runs out
* of iovecs. In that case, we adjust it here.
*/
if (!(packet_out->po_flags & PO_STREAM_END))
LSQ_DEBUG("set stream-end flag on truncated packet");
packet_out->po_flags |= PO_STREAM_END;
}
if (!buffered)
ctl->sc_bytes_scheduled -= prev_frec_len - frec->fe_len;
}
else
assert(stream_frame.data_frame.df_size
== last_iov->iov_len - shortfall);
/* Drop any frames that follow */
for (frec = lsquic_pofi_next(&pofi); frec; frec = lsquic_pofi_next(&pofi))
frec->fe_frame_type = 0;
/* Return unused packets */
new_count = orig_count;
lost_types = 0;
for (packet_out = TAILQ_NEXT(packet_out, po_next); packet_out != NULL;
packet_out = next)
{
next = TAILQ_NEXT(packet_out, po_next);
--new_count;
lost_types |= packet_out->po_frame_types;
/* Undo lsquic_send_ctl_get_packet_for_stream() */
if (!buffered)
send_ctl_sched_remove(ctl, packet_out);
else
{
TAILQ_REMOVE(packets, packet_out, po_next);
--ctl->sc_buffered_packets[packet_type].bpq_count;
}
send_ctl_destroy_packet(ctl, packet_out);
}
if (new_count < orig_count && repace)
send_ctl_repace(ctl, &ctl_state->pacer, new_count);
if (buffered && (lost_types & QUIC_FTBIT_ACK))
lconn->cn_if->ci_ack_rollback(lconn, &ctl_state->ack_state);
}

View file

@ -412,4 +412,18 @@ lsquic_send_ctl_can_send_probe (const struct lsquic_send_ctl *,
void
lsquic_send_ctl_disable_ecn (struct lsquic_send_ctl *);
struct send_ctl_state
{
struct pacer pacer;
struct ack_state ack_state;
unsigned buf_counts[BPT_OTHER_PRIO + 1];
};
void
lsquic_send_ctl_snapshot (struct lsquic_send_ctl *, struct send_ctl_state *);
void
lsquic_send_ctl_rollback (struct lsquic_send_ctl *, struct send_ctl_state *,
const struct iovec *, size_t);
#endif

View file

@ -93,8 +93,15 @@ stream_wantread (lsquic_stream_t *stream, int is_want);
static int
stream_wantwrite (lsquic_stream_t *stream, int is_want);
enum stream_write_options
{
SWO_BUFFER = 1 << 0, /* Allow buffering in sm_buf */
};
static ssize_t
stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t,
enum stream_write_options);
static ssize_t
save_to_buffer (lsquic_stream_t *, struct lsquic_reader *, size_t len);
@ -181,6 +188,7 @@ enum stream_history_event
SHE_EMPTY = '\0', /* Special entry. No init besides memset required */
SHE_PLUS = '+', /* Special entry: previous event occured more than once */
SHE_REACH_FIN = 'a',
SHE_EARLY_READ_STOP = 'A',
SHE_BLOCKED_OUT = 'b',
SHE_CREATED = 'C',
SHE_FRAME_IN = 'd',
@ -197,13 +205,15 @@ enum stream_history_event
SHE_ONCLOSE_CALL = 'L',
SHE_ONNEW = 'N',
SHE_SET_PRIO = 'p',
SHE_SHORT_WRITE = 'q',
SHE_USER_READ = 'r',
SHE_SHUTDOWN_READ = 'R',
SHE_RST_IN = 's',
SHE_SS_IN = 'S',
SHE_STOP_SENDIG_IN = 'S',
SHE_RST_OUT = 't',
SHE_RST_ACKED = 'T',
SHE_FLUSH = 'u',
SHE_STOP_SENDIG_OUT = 'U',
SHE_USER_WRITE_DATA = 'w',
SHE_SHUTDOWN_WRITE = 'W',
SHE_CLOSE = 'X',
@ -643,10 +653,13 @@ stream_is_finished (const lsquic_stream_t *stream)
* stream are outstanding:
*/
&& 0 == stream->n_unacked
&& 0 == (stream->sm_qflags & (
/* This checks that no packets that reference this stream will
* become outstanding:
*/
&& 0 == (stream->sm_qflags & SMQF_SEND_RST)
SMQF_SEND_RST
/* Can't finish stream until all "self" flags are unset: */
| SMQF_SELF_FLAGS))
&& ((stream->stream_flags & STREAM_FORCE_FINISH)
|| (stream->stream_flags & (STREAM_FIN_SENT |STREAM_RST_SENT)));
}
@ -822,6 +835,22 @@ lsquic_stream_readable (struct lsquic_stream *stream)
}
static int
stream_writeable (struct lsquic_stream *stream)
{
/* A stream is writeable if one of the following is true: */
return
/* - The stream is reset, by either side. In this case,
* lsquic_stream_write() will return -1 (we want the user to be
* able to collect the error).
*/
lsquic_stream_is_reset(stream)
/* - Data can be written to stream: */
|| lsquic_stream_write_avail(stream)
;
}
static size_t
stream_write_avail_no_frames (struct lsquic_stream *stream)
{
@ -1008,6 +1037,7 @@ lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
{
SM_HISTORY_APPEND(stream, SHE_FIN_IN);
stream->stream_flags |= STREAM_FIN_RECVD;
stream->sm_qflags &= ~SMQF_WAIT_FIN_OFF;
stream->sm_fin_off = DF_END(frame);
maybe_finish_stream(stream);
}
@ -1127,10 +1157,22 @@ lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
lsquic_sfcw_consume_rem(&stream->fc);
drop_frames_in(stream);
drop_buffered_data(stream);
maybe_elide_stream_frames(stream);
if (!(stream->stream_flags & (STREAM_RST_SENT|STREAM_FIN_SENT))
if (!(stream->sm_bflags & SMBF_IETF))
{
drop_buffered_data(stream);
maybe_elide_stream_frames(stream);
}
if (stream->sm_qflags & SMQF_WAIT_FIN_OFF)
{
stream->sm_qflags &= ~SMQF_WAIT_FIN_OFF;
LSQ_DEBUG("final offset is now known: %"PRIu64, offset);
}
if (!(stream->stream_flags &
(STREAM_RST_SENT|STREAM_SS_SENT|STREAM_FIN_SENT))
&& !(stream->sm_bflags & SMBF_IETF)
&& !(stream->sm_qflags & SMQF_SEND_RST))
lsquic_stream_reset_ext(stream, 7 /* QUIC_RST_ACKNOWLEDGEMENT */, 0);
@ -1153,7 +1195,7 @@ lsquic_stream_stop_sending_in (struct lsquic_stream *stream,
return;
}
SM_HISTORY_APPEND(stream, SHE_SS_IN);
SM_HISTORY_APPEND(stream, SHE_STOP_SENDIG_IN);
stream->stream_flags |= STREAM_SS_RECVD;
/* Let user collect error: */
@ -1577,11 +1619,62 @@ lsquic_stream_read (lsquic_stream_t *stream, void *buf, size_t len)
}
void
lsquic_stream_ss_frame_sent (struct lsquic_stream *stream)
{
assert(stream->sm_qflags & SMQF_SEND_STOP_SENDING);
SM_HISTORY_APPEND(stream, SHE_STOP_SENDIG_OUT);
stream->sm_qflags &= ~SMQF_SEND_STOP_SENDING;
stream->stream_flags |= STREAM_SS_SENT;
if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
TAILQ_REMOVE(&stream->conn_pub->sending_streams, stream, next_send_stream);
}
static void
handle_early_read_shutdown_ietf (struct lsquic_stream *stream)
{
if (!(stream->sm_qflags & SMQF_SENDING_FLAGS))
TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream,
next_send_stream);
stream->sm_qflags |= SMQF_SEND_STOP_SENDING|SMQF_WAIT_FIN_OFF;
}
static void
handle_early_read_shutdown_gquic (struct lsquic_stream *stream)
{
if (!(stream->stream_flags & STREAM_RST_SENT))
{
lsquic_stream_reset_ext(stream, 7 /* QUIC_STREAM_CANCELLED */, 0);
stream->sm_qflags |= SMQF_WAIT_FIN_OFF;
}
}
static void
handle_early_read_shutdown (struct lsquic_stream *stream)
{
if (stream->sm_bflags & SMBF_IETF)
handle_early_read_shutdown_ietf(stream);
else
handle_early_read_shutdown_gquic(stream);
}
static void
stream_shutdown_read (lsquic_stream_t *stream)
{
if (!(stream->stream_flags & STREAM_U_READ_DONE))
{
if (!(stream->stream_flags & STREAM_FIN_REACHED))
{
LSQ_DEBUG("read shut down before reading FIN. (FIN received: %d)",
!!(stream->stream_flags & STREAM_FIN_RECVD));
SM_HISTORY_APPEND(stream, SHE_EARLY_READ_STOP);
if (!(stream->stream_flags & (STREAM_FIN_RECVD|STREAM_RST_RECVD)))
handle_early_read_shutdown(stream);
}
SM_HISTORY_APPEND(stream, SHE_SHUTDOWN_READ);
stream->stream_flags |= STREAM_U_READ_DONE;
stream->sm_readable = stream_readable_discard;
@ -1625,7 +1718,9 @@ stream_shutdown_write (lsquic_stream_t *stream)
if (!(stream->sm_bflags & SMBF_CRYPTO)
&& !(stream->stream_flags & (STREAM_FIN_SENT|STREAM_RST_SENT))
&& !stream_is_incoming_unidir(stream)
&& !(stream->sm_qflags & SMQF_SEND_RST))
/* In gQUIC, receiving a RESET means "stop sending" */
&& !(!(stream->sm_qflags & SMBF_IETF)
&& (stream->stream_flags & STREAM_RST_RECVD)))
{
if ((stream->sm_bflags & SMBF_USE_HEADERS)
&& !(stream->stream_flags & STREAM_HEADERS_SENT))
@ -1705,6 +1800,8 @@ lsquic_stream_shutdown_internal (lsquic_stream_t *stream)
{
LSQ_DEBUG("internal shutdown");
stream->stream_flags |= STREAM_U_READ_DONE|STREAM_U_WRITE_DONE;
stream_wantwrite(stream, 0);
stream_wantread(stream, 0);
if (lsquic_stream_is_critical(stream))
{
LSQ_DEBUG("add flag to force-finish special stream");
@ -2054,7 +2151,7 @@ stream_dispatch_write_events_loop (lsquic_stream_t *stream)
stream->stream_flags |= STREAM_LAST_WRITE_OK;
while ((stream->sm_qflags & SMQF_WANT_WRITE)
&& (stream->stream_flags & STREAM_LAST_WRITE_OK)
&& lsquic_stream_write_avail(stream))
&& stream_writeable(stream))
{
progress = stream_progress(stream);
@ -2173,7 +2270,7 @@ lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
if (stream->sm_bflags & SMBF_RW_ONCE)
{
if ((stream->sm_qflags & SMQF_WANT_WRITE)
&& lsquic_stream_write_avail(stream))
&& stream_writeable(stream))
{
on_write = select_on_write(stream);
on_write(stream, stream->st_ctx);
@ -2229,7 +2326,7 @@ stream_flush (lsquic_stream_t *stream)
empty_reader.lsqr_size = inner_reader_empty_size;
empty_reader.lsqr_read = inner_reader_empty_read;
empty_reader.lsqr_ctx = NULL; /* pro forma */
nw = stream_write_to_packets(stream, &empty_reader, 0);
nw = stream_write_to_packets(stream, &empty_reader, 0, SWO_BUFFER);
if (nw >= 0)
{
@ -2655,6 +2752,27 @@ stream_activate_hq_frame (struct lsquic_stream *stream, uint64_t off,
}
struct hq_arr
{
unsigned char **p;
unsigned count;
unsigned max;
};
static int
save_hq_ptr (struct hq_arr *hq_arr, void *p)
{
if (hq_arr->count < hq_arr->max)
{
hq_arr->p[hq_arr->count++] = p;
return 0;
}
else
return -1;
}
static size_t
frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
{
@ -2708,6 +2826,11 @@ frame_hq_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
if (0 == (shf->shf_flags & (SHF_FIXED_SIZE|SHF_PHANTOM)))
{
shf->shf_frame_ptr = p;
if (stream->sm_hq_arr && 0 != save_hq_ptr(stream->sm_hq_arr, p))
{
stream_hq_frame_put(stream, shf);
break;
}
memset(p, 0, frame_sz);
p += frame_sz;
}
@ -3079,7 +3202,7 @@ maybe_close_varsize_hq_frame (struct lsquic_stream *stream)
static ssize_t
stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
size_t thresh)
size_t thresh, enum stream_write_options swo)
{
size_t size;
ssize_t nw;
@ -3146,7 +3269,7 @@ stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
if (use_framing && seen_ok)
maybe_close_varsize_hq_frame(stream);
if (thresh)
if (thresh && (swo & SWO_BUFFER))
{
assert(size < thresh);
assert(size >= stream->sm_n_buffered);
@ -3159,7 +3282,8 @@ stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
fg_ctx.fgc_nread_from_reader += nw; /* Make this cleaner? */
}
}
else
#ifndef NDEBUG
else if (swo & SWO_BUFFER)
{
/* We count flushed data towards both stream and connection limits,
* so we should have been able to packetize all of it:
@ -3167,6 +3291,7 @@ stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
assert(0 == stream->sm_n_buffered);
assert(size == 0);
}
#endif
maybe_mark_as_blocked(stream);
@ -3333,7 +3458,8 @@ save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
static ssize_t
stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader,
enum stream_write_options swo)
{
const struct stream_hq_frame *shf;
size_t thresh, len, frames, total_len, n_allowed, nwritten;
@ -3354,6 +3480,8 @@ stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
n_allowed = stream_get_n_allowed(stream);
if (total_len <= n_allowed && total_len < thresh)
{
if (!(swo & SWO_BUFFER))
return 0;
nwritten = 0;
do
{
@ -3370,7 +3498,7 @@ stream_write (lsquic_stream_t *stream, struct lsquic_reader *reader)
return nwritten;
}
else
return stream_write_to_packets(stream, reader, thresh);
return stream_write_to_packets(stream, reader, thresh, swo);
}
@ -3450,7 +3578,7 @@ lsquic_stream_writev (lsquic_stream_t *stream, const struct iovec *iov,
.lsqr_ctx = &iro,
};
return stream_write(stream, &reader);
return stream_write(stream, &reader, SWO_BUFFER);
}
@ -3459,7 +3587,221 @@ lsquic_stream_writef (lsquic_stream_t *stream, struct lsquic_reader *reader)
{
COMMON_WRITE_CHECKS();
SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
return stream_write(stream, reader);
return stream_write(stream, reader, SWO_BUFFER);
}
/* Configuration for lsquic_stream_pwritev: */
#ifndef LSQUIC_PWRITEV_DEF_IOVECS
#define LSQUIC_PWRITEV_DEF_IOVECS 16
#endif
/* This is an overkill, this limit should only be reached during testing: */
#ifndef LSQUIC_PWRITEV_DEF_FRAMES
#define LSQUIC_PWRITEV_DEF_FRAMES (LSQUIC_PWRITEV_DEF_IOVECS * 2)
#endif
#ifdef NDEBUG
#define PWRITEV_IOVECS LSQUIC_PWRITEV_DEF_IOVECS
#define PWRITEV_FRAMES LSQUIC_PWRITEV_DEF_FRAMES
#else
static unsigned
PWRITEV_IOVECS = LSQUIC_PWRITEV_DEF_IOVECS,
PWRITEV_FRAMES = LSQUIC_PWRITEV_DEF_FRAMES;
void
lsquic_stream_set_pwritev_params (unsigned iovecs, unsigned frames)
{
PWRITEV_IOVECS = iovecs;
PWRITEV_FRAMES = frames;
}
#endif
struct pwritev_ctx
{
struct iovec *iov;
const struct hq_arr *hq_arr;
size_t total_bytes;
size_t n_to_write;
unsigned n_iovecs, max_iovecs;
};
static size_t
pwritev_size (void *lsqr_ctx)
{
struct pwritev_ctx *const ctx = lsqr_ctx;
if (ctx->n_iovecs < ctx->max_iovecs
&& ctx->hq_arr->count < ctx->hq_arr->max)
return ctx->n_to_write - ctx->total_bytes;
else
return 0;
}
static size_t
pwritev_read (void *lsqr_ctx, void *buf, size_t count)
{
struct pwritev_ctx *const ctx = lsqr_ctx;
assert(ctx->n_iovecs < ctx->max_iovecs);
ctx->iov[ctx->n_iovecs].iov_base = buf;
ctx->iov[ctx->n_iovecs].iov_len = count;
++ctx->n_iovecs;
ctx->total_bytes += count;
return count;
}
/* pwritev works as follows: allocate packets via lsquic_stream_writef() call
* and record pointers and sizes into an iovec array. Then issue a single call
* to user-supplied preadv() to populate all packets in one shot.
*
* Unwinding state changes due to a short write is by far the most complicated
* part of the machinery that follows. We optimize the normal path: it should
* be cheap to be prepared for the unwinding; unwinding itself can be more
* expensive, as we do not expect it to happen often.
*/
ssize_t
lsquic_stream_pwritev (struct lsquic_stream *stream,
ssize_t (*preadv)(void *user_data, const struct iovec *iov, int iovcnt),
void *user_data, size_t n_to_write)
{
struct lsquic_send_ctl *const ctl = stream->conn_pub->send_ctl;
struct iovec iovecs[PWRITEV_IOVECS], *last_iov;
unsigned char *hq_frames[PWRITEV_FRAMES];
struct pwritev_ctx ctx;
struct lsquic_reader reader;
struct send_ctl_state ctl_state;
struct hq_arr hq_arr;
ssize_t nw;
size_t n_allocated, sum;
#ifndef NDEBUG
const unsigned short n_buffered = stream->sm_n_buffered;
#endif
COMMON_WRITE_CHECKS();
SM_HISTORY_APPEND(stream, SHE_USER_WRITE_DATA);
lsquic_send_ctl_snapshot(ctl, &ctl_state);
ctx.total_bytes = 0;
ctx.n_to_write = n_to_write;
ctx.n_iovecs = 0;
ctx.max_iovecs = sizeof(iovecs) / sizeof(iovecs[0]);
ctx.iov = iovecs;
ctx.hq_arr = &hq_arr;
hq_arr.p = hq_frames;
hq_arr.count = 0;
hq_arr.max = sizeof(hq_frames) / sizeof(hq_frames[0]);
stream->sm_hq_arr = &hq_arr;
reader.lsqr_ctx = &ctx;
reader.lsqr_size = pwritev_size;
reader.lsqr_read = pwritev_read;
nw = stream_write(stream, &reader, 0);
LSQ_DEBUG("pwritev: stream_write returned %zd, n_iovecs: %d", nw,
ctx.n_iovecs);
if (nw > 0)
{
/* Amount of buffered data shouldn't have increased */
assert(n_buffered >= stream->sm_n_buffered);
n_allocated = (size_t) nw;
nw = preadv(user_data, ctx.iov, ctx.n_iovecs);
LSQ_DEBUG("pwritev: preadv returned %zd", nw);
if (nw >= 0 && (size_t) nw < n_allocated)
goto unwind_short_write;
}
cleanup:
stream->sm_hq_arr = NULL;
return nw;
unwind_short_write:
/* What follows is not the most efficient process. The emphasis here is
* on being simple instead. We expect short writes to be rare, so being
* slower than possible is a good tradeoff for being correct.
*/
LSQ_DEBUG("short write occurred, unwind");
SM_HISTORY_APPEND(stream, SHE_SHORT_WRITE);
/* First, adjust connection cap and stream offsets, and HTTP/3 framing,
* if necessary.
*/
if ((stream->sm_bflags & (SMBF_USE_HEADERS|SMBF_IETF))
== (SMBF_USE_HEADERS|SMBF_IETF))
{
size_t shortfall, payload_sz, decr;
unsigned char *p;
unsigned bits;
assert(hq_arr.count > 0);
shortfall = n_allocated - (size_t) nw;
do
{
const unsigned count = hq_arr.count;
(void) count;
p = hq_frames[--hq_arr.count];
assert(p[0] == HQFT_DATA);
assert(!(p[1] & 0x80)); /* Only one- and two-byte frame sizes */
if (p[1] & 0x40)
{
payload_sz = (p[1] & 0x3F) << 8;
payload_sz |= p[2];
}
else
payload_sz = p[1];
if (payload_sz > shortfall)
{
bits = p[1] >> 6;
vint_write(p + 1, payload_sz - shortfall, bits, 1 << bits);
decr = shortfall;
if (stream->sm_bflags & SMBF_CONN_LIMITED)
stream->conn_pub->conn_cap.cc_sent -= decr;
stream->sm_payload -= decr;
stream->tosend_off -= decr;
shortfall = 0;
}
else
{
decr = payload_sz + 2 + (p[1] >> 6);
if (stream->sm_bflags & SMBF_CONN_LIMITED)
stream->conn_pub->conn_cap.cc_sent -= decr;
stream->sm_payload -= payload_sz;
stream->tosend_off -= decr;
shortfall -= payload_sz;
}
}
while (hq_arr.count);
assert(shortfall == 0);
}
else
{
const size_t shortfall = n_allocated - (size_t) nw;
if (stream->sm_bflags & SMBF_CONN_LIMITED)
stream->conn_pub->conn_cap.cc_sent -= shortfall;
stream->sm_payload -= shortfall;
stream->tosend_off -= shortfall;
}
/* Find last iovec: */
sum = 0;
for (last_iov = iovecs; last_iov
< iovecs + sizeof(iovecs)/sizeof(iovecs[0]); ++last_iov)
{
sum += last_iov->iov_len;
if ((last_iov == iovecs || (size_t) nw > sum - last_iov->iov_len)
&& (size_t) nw <= sum)
break;
}
assert(last_iov < iovecs + sizeof(iovecs)/sizeof(iovecs[0]));
lsquic_send_ctl_rollback(ctl, &ctl_state, last_iov, sum - nw);
goto cleanup;
}
@ -3478,7 +3820,7 @@ stream_write_buf (struct lsquic_stream *stream, const void *buf, size_t sz)
.lsqr_size = inner_reader_iovec_size,
.lsqr_ctx = &iro,
};
return stream_write(stream, &reader);
return stream_write(stream, &reader, SWO_BUFFER);
}
@ -4747,7 +5089,7 @@ on_write_pp_wrapper (struct lsquic_stream *stream, lsquic_stream_ctx_t *h)
promise = SLIST_FIRST(&stream->sm_promises);
init_pp_reader(promise, &pp_reader);
nw = stream_write(stream, &pp_reader);
nw = stream_write(stream, &pp_reader, SWO_BUFFER);
if (nw > 0)
{
LSQ_DEBUG("wrote %zd bytes more of push promise (%s)",
@ -4801,7 +5143,7 @@ lsquic_stream_push_promise (struct lsquic_stream *stream,
stream->stream_flags |= STREAM_PUSHING;
init_pp_reader(promise, &pp_reader);
nw = stream_write(stream, &pp_reader);
nw = stream_write(stream, &pp_reader, SWO_BUFFER);
if (nw > 0)
{
SLIST_INSERT_HEAD(&stream->sm_promises, promise, pp_next);

View file

@ -22,11 +22,7 @@ TAILQ_HEAD(lsquic_streams_tailq, lsquic_stream);
#ifndef LSQUIC_KEEP_STREAM_HISTORY
# ifdef NDEBUG
# define LSQUIC_KEEP_STREAM_HISTORY 0
# else
# define LSQUIC_KEEP_STREAM_HISTORY 1
# endif
# define LSQUIC_KEEP_STREAM_HISTORY 1
#endif
@ -149,11 +145,13 @@ enum stream_q_flags
* connections's sending_streams queue. Note that writing STREAM
* frames is done separately.
*/
#define SMQF_SENDING_FLAGS (SMQF_SEND_WUF|SMQF_SEND_RST|SMQF_SEND_BLOCKED)
#define SMQF_SENDING_FLAGS (SMQF_SEND_WUF|SMQF_SEND_RST|SMQF_SEND_BLOCKED\
|SMQF_SEND_STOP_SENDING)
/* sending_streams: */
SMQF_SEND_WUF = 1 << 3, /* WUF: Window Update Frame */
SMQF_SEND_BLOCKED = 1 << 4,
SMQF_SEND_RST = 1 << 5, /* Error: want to send RST_STREAM */
SMQF_SEND_STOP_SENDING = 1 << 10,
/* The equivalent of WINDOW_UPDATE frame for streams in IETF QUIC is
* the MAX_STREAM_DATA frame. Define an alias for use in the IETF
@ -167,6 +165,10 @@ enum stream_q_flags
SMQF_ABORT_CONN = 1 << 8, /* Unrecoverable error occurred */
SMQF_QPACK_DEC = 1 << 9, /* QPACK decoder handler is holding a reference to this stream */
/* The stream can reference itself, preventing its own destruction: */
#define SMQF_SELF_FLAGS SMQF_WAIT_FIN_OFF
SMQF_WAIT_FIN_OFF = 1 << 11, /* Waiting for final offset: FIN or RST */
};
@ -209,7 +211,7 @@ enum stream_flags {
STREAM_PUSHING = 1 << 18,
STREAM_NOPUSH = 1 << 19, /* Disallow further push promises */
STREAM_GOAWAY_IN = 1 << 20, /* Incoming GOAWAY has been processed */
STREAM_UNUSED21 = 1 << 21, /* Unused */
STREAM_SS_SENT = 1 << 21, /* STOP_SENDING sent */
STREAM_RST_ACKED = 1 << 22, /* Packet containing RST has been acked */
STREAM_BLOCKED_SENT = 1 << 23, /* Stays set once a STREAM_BLOCKED frame is sent */
STREAM_RST_READ = 1 << 24, /* User code collected the error */
@ -266,6 +268,9 @@ struct lsquic_stream
struct hq_filter sm_hq_filter;
/* Optional tap for pwritev undo */
struct hq_arr *sm_hq_arr;
/* We can safely use sm_hq_filter */
#define sm_uni_type_state sm_hq_filter.hqfi_vint2_state.vr2s_varint_state
@ -602,4 +607,12 @@ lsquic_stream_verify_len (struct lsquic_stream *, unsigned long long);
#define lsquic_stream_is_blocked(stream_) ((stream_)->blocked_off && \
(stream_)->blocked_off == (stream_)->max_send_off)
void
lsquic_stream_ss_frame_sent (struct lsquic_stream *);
#ifndef NDEBUG
void
lsquic_stream_set_pwritev_params (unsigned iovecs, unsigned frames);
#endif
#endif