Release 2.25.0

- [API, FEATURE] Add es_delay_onclose option to delay on_close until all
  data is ACKed.  Use new function lsquic_stream_has_unacked_data() to
  learn whether peer acknowledged all data written to stream.
- [API] Add optional on_reset() stream callback to get notifications
  when RESET or STOP_SENDING frames are received.
- [BUGFIX] On STOP_SENDING, make conn tickable is _writeable_, not
  readable.
This commit is contained in:
Dmitri Tikhonov 2020-12-04 11:29:14 -05:00
parent 57fe5a13ac
commit 7f96c7c7f3
13 changed files with 300 additions and 58 deletions

View file

@ -390,6 +390,7 @@ lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
settings->es_ptpc_int_gain = LSQUIC_DF_PTPC_INT_GAIN;
settings->es_ptpc_err_thresh = LSQUIC_DF_PTPC_ERR_THRESH;
settings->es_ptpc_err_divisor= LSQUIC_DF_PTPC_ERR_DIVISOR;
settings->es_delay_onclose = LSQUIC_DF_DELAY_ONCLOSE;
}

View file

@ -1330,6 +1330,8 @@ new_stream (struct full_conn *conn, lsquic_stream_id_t stream_id,
flags |= SCF_HTTP;
if (conn->fc_enpub->enp_settings.es_rw_once)
flags |= SCF_DISP_RW_ONCE;
if (conn->fc_enpub->enp_settings.es_delay_onclose)
flags |= SCF_DELAY_ONCLOSE;
return new_stream_ext(conn, stream_id, STREAM_IF_STD, flags);
}
@ -4025,6 +4027,7 @@ headers_stream_on_push_promise (void *ctx, struct uncompressed_headers *uh)
}
stream = new_stream_ext(conn, uh->uh_oth_stream_id, STREAM_IF_STD,
(conn->fc_enpub->enp_settings.es_delay_onclose?SCF_DELAY_ONCLOSE:0)|
SCF_DI_AUTOSWITCH|(conn->fc_enpub->enp_settings.es_rw_once ?
SCF_DISP_RW_ONCE : 0));
if (!stream)

View file

@ -1079,6 +1079,8 @@ create_bidi_stream_out (struct ietf_full_conn *conn)
flags = SCF_IETF|SCF_DI_AUTOSWITCH;
if (conn->ifc_enpub->enp_settings.es_rw_once)
flags |= SCF_DISP_RW_ONCE;
if (conn->ifc_enpub->enp_settings.es_delay_onclose)
flags |= SCF_DELAY_ONCLOSE;
if (conn->ifc_flags & IFC_HTTP)
{
flags |= SCF_HTTP;
@ -1117,6 +1119,8 @@ create_push_stream (struct ietf_full_conn *conn)
flags = SCF_IETF|SCF_HTTP;
if (conn->ifc_enpub->enp_settings.es_rw_once)
flags |= SCF_DISP_RW_ONCE;
if (conn->ifc_enpub->enp_settings.es_delay_onclose)
flags |= SCF_DELAY_ONCLOSE;
stream_id = generate_stream_id(conn, SD_UNI);
stream = lsquic_stream_new(stream_id, &conn->ifc_pub,
@ -5267,6 +5271,8 @@ new_stream (struct ietf_full_conn *conn, lsquic_stream_id_t stream_id,
stream_ctx = conn->ifc_enpub->enp_stream_if_ctx;
if (conn->ifc_enpub->enp_settings.es_rw_once)
flags |= SCF_DISP_RW_ONCE;
if (conn->ifc_enpub->enp_settings.es_delay_onclose)
flags |= SCF_DELAY_ONCLOSE;
if (conn->ifc_flags & IFC_HTTP)
{
flags |= SCF_HTTP;

View file

@ -1,20 +1,6 @@
/* Copyright (c) 2017 - 2020 LiteSpeed Technologies Inc. See LICENSE. */
/*
* lsquic_stream.c -- stream processing
*
* To clear up terminology, here are some of our stream states (in order).
* They are not codified, but they are referred to in both code and comments.
*
* CLOSED STREAM_U_READ_DONE and STREAM_U_WRITE_DONE are set. At this
* point, on_close() gets called.
* FINISHED FIN or RST has been sent to peer. Stream is scheduled to be
* finished (freed): it gets put onto the `service_streams'
* list for connection to clean it up.
* DESTROYED All remaining memory associated with the stream is released.
* If on_close() has not been called yet, it is called now.
* The stream pointer is now invalid.
*
* When connection is aborted, a stream may go directly to DESTROYED state.
*/
#include <assert.h>
@ -476,7 +462,7 @@ lsquic_stream_new (lsquic_stream_id_t id,
stream->sm_readable = stream_readable_non_http;
if ((ctor_flags & (SCF_HTTP|SCF_HTTP_PRIO))
== (SCF_HTTP|SCF_HTTP_PRIO))
lsquic_stream_set_priority_internal(stream, LSQUIC_DEF_HTTP_URGENCY);
lsquic_stream_set_priority_internal(stream, LSQUIC_DEF_HTTP_URGENCY);
else
lsquic_stream_set_priority_internal(stream,
LSQUIC_STREAM_DEFAULT_PRIO);
@ -708,13 +694,16 @@ lsquic_stream_destroy (lsquic_stream_t *stream)
static int
stream_is_finished (const lsquic_stream_t *stream)
stream_is_finished (struct lsquic_stream *stream)
{
return lsquic_stream_is_closed(stream)
&& (stream->sm_bflags & SMBF_DELAY_ONCLOSE ?
/* Need a stricter check when on_close() is delayed: */
!lsquic_stream_has_unacked_data(stream) :
/* n_unacked checks that no outgoing packets that reference this
* stream are outstanding:
*/
&& 0 == stream->n_unacked
0 == stream->n_unacked)
&& 0 == (stream->sm_qflags & (
/* This checks that no packets that reference this stream will
* become outstanding:
@ -756,6 +745,8 @@ maybe_schedule_call_on_close (lsquic_stream_t *stream)
if ((stream->stream_flags & (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|
STREAM_ONNEW_DONE|STREAM_ONCLOSE_DONE))
== (STREAM_U_READ_DONE|STREAM_U_WRITE_DONE|STREAM_ONNEW_DONE)
&& (!(stream->sm_bflags & SMBF_DELAY_ONCLOSE)
|| !lsquic_stream_has_unacked_data(stream))
&& !(stream->sm_qflags & SMQF_CALL_ONCLOSE))
{
if (0 == (stream->sm_qflags & SMQF_SERVICE_FLAGS))
@ -1224,6 +1215,28 @@ lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
return -1;
}
if (stream->stream_if->on_reset
&& !(stream->stream_flags & STREAM_ONCLOSE_DONE))
{
if (stream->sm_bflags & SMBF_IETF)
{
if (!(stream->sm_dflags & SMDF_ONRESET0))
{
stream->stream_if->on_reset(stream, stream->st_ctx, 0);
stream->sm_dflags |= SMDF_ONRESET0;
}
}
else
{
if ((stream->sm_dflags & (SMDF_ONRESET0|SMDF_ONRESET1))
!= (SMDF_ONRESET0|SMDF_ONRESET1))
{
stream->stream_if->on_reset(stream, stream->st_ctx, 2);
stream->sm_dflags |= SMDF_ONRESET0|SMDF_ONRESET1;
}
}
}
/* Let user collect error: */
maybe_conn_to_tickable_if_readable(stream);
@ -1270,8 +1283,15 @@ lsquic_stream_stop_sending_in (struct lsquic_stream *stream,
SM_HISTORY_APPEND(stream, SHE_STOP_SENDIG_IN);
stream->stream_flags |= STREAM_SS_RECVD;
if (stream->stream_if->on_reset && !(stream->sm_dflags & SMDF_ONRESET1)
&& !(stream->stream_flags & STREAM_ONCLOSE_DONE))
{
stream->stream_if->on_reset(stream, stream->st_ctx, 1);
stream->sm_dflags |= SMDF_ONRESET1;
}
/* Let user collect error: */
maybe_conn_to_tickable_if_readable(stream);
maybe_conn_to_tickable_if_writeable(stream, 0);
lsquic_sfcw_consume_rem(&stream->fc);
drop_frames_in(stream);
@ -4283,7 +4303,10 @@ lsquic_stream_acked (struct lsquic_stream *stream,
stream->stream_flags |= STREAM_RST_ACKED;
}
if (0 == stream->n_unacked)
{
maybe_schedule_call_on_close(stream);
maybe_finish_stream(stream);
}
}
@ -5411,3 +5434,10 @@ lsquic_stream_set_http_prio (struct lsquic_stream *stream,
else
return -1;
}
int
lsquic_stream_has_unacked_data (struct lsquic_stream *stream)
{
return stream->n_unacked > 0 || stream->sm_n_buffered > 0;
}

View file

@ -191,7 +191,17 @@ enum stream_b_flags
SMBF_HTTP_PRIO = 1 <<10, /* Extensible HTTP Priorities are used */
SMBF_INCREMENTAL = 1 <<11, /* Value of the "incremental" HTTP Priority parameter */
SMBF_HPRIO_SET = 1 <<12, /* Extensible HTTP Priorities have been set once */
#define N_SMBF_FLAGS 13
SMBF_DELAY_ONCLOSE= 1 <<13, /* Delay calling on_close() until peer ACKs everything */
#define N_SMBF_FLAGS 14
};
/* Stream "callback done" flags */
/* TODO: move STREAM.*DONE flags from stream_flags here */
enum stream_d_flags
{
SMDF_ONRESET0 = 1 << 0, /* Called on_reset(0) */
SMDF_ONRESET1 = 1 << 1, /* Called on_reset(1) */
};
@ -364,6 +374,7 @@ struct lsquic_stream
SSHS_ENC_SENDING, /* Sending encoder stream data */
SSHS_HBLOCK_SENDING,/* Sending header block data */
} sm_send_headers_state:8;
enum stream_d_flags sm_dflags:8;
signed char sm_saved_want_write;
signed char sm_has_frame;
@ -396,6 +407,7 @@ enum stream_ctor_flags
SCF_CRYPTO = SMBF_CRYPTO,
SCF_HEADERS = SMBF_HEADERS,
SCF_HTTP_PRIO = SMBF_HTTP_PRIO,
SCF_DELAY_ONCLOSE = SMBF_DELAY_ONCLOSE,
};