Latest changes

- [BUGFIX] Add connection to Tickable Queue on stream write
- cmake: use MSVC variable instead of trying to detect
- engine: improve connection incref/decref logging
- stream: don't ignore errors that may occur on triggered flush
- connection: remove obsolete method
- engine: indicate connection as tickable if previous call went
  over threshold
This commit is contained in:
Dmitri Tikhonov 2018-04-19 13:47:37 -04:00
parent 1279d5bfcd
commit 82f3bcef32
6 changed files with 63 additions and 42 deletions

View file

@ -1,3 +1,13 @@
2018-04-19
- [BUGFIX] Add connection to Tickable Queue on stream write
- cmake: use MSVC variable instead of trying to detect
- engine: improve connection incref/decref logging
- stream: don't ignore errors that may occur on triggered flush
- connection: remove obsolete method
- engine: indicate connection as tickable if previous call went
over threshold
2018-04-09
[API Change, OPTIMIZATION] Only process conns that need to be processed

View file

@ -344,12 +344,14 @@ struct lsquic_engine_settings {
/**
* If set, this value specifies that number of microseconds that
* @ref lsquic_engine_process_conns() is allowed to spend before
* returning.
* @ref lsquic_engine_process_conns() and
* @ref lsquic_engine_send_unsent_packets() are allowed to spend
* before returning.
*
* This is not an exact science and the connections must make
* progress, so the deadline is checked after all connections get
* a chance to tick and at least one batch of packets is sent out.
* a chance to tick (in the case of @ref lsquic_engine_process_conns())
* and at least one batch of packets is sent out.
*
* When processing function runs out of its time slice, immediate
* calls to @ref lsquic_engine_has_unsent_packets() return false.

View file

@ -74,9 +74,6 @@ struct conn_iface
void
(*ci_handshake_failed) (struct lsquic_conn *);
int
(*ci_user_wants_read) (struct lsquic_conn *);
void
(*ci_destroy) (struct lsquic_conn *);

View file

@ -525,14 +525,6 @@ process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
}
int
lsquic_engine_has_tickable (lsquic_engine_t *engine)
{
return !(engine->flags & ENG_PAST_DEADLINE)
&& lsquic_mh_count(&engine->conns_tickable) > 0;
}
void
lsquic_engine_destroy (lsquic_engine_t *engine)
{
@ -634,12 +626,13 @@ refflags2str (enum lsquic_conn_flags flags, char s[6])
static void
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
{
char str[6];
char str[2][6];
assert(flag & CONN_REF_FLAGS);
assert(!(conn->cn_flags & flag));
conn->cn_flags |= flag;
LSQ_DEBUG("incref conn %"PRIu64", now '%s'", conn->cn_cid,
(refflags2str(conn->cn_flags, str), str));
LSQ_DEBUG("incref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
(refflags2str(conn->cn_flags & ~flag, str[0]), str[0]),
(refflags2str(conn->cn_flags, str[1]), str[1]));
}
@ -647,7 +640,7 @@ static lsquic_conn_t *
engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
enum lsquic_conn_flags flags)
{
char str[6];
char str[2][6];
assert(flags & CONN_REF_FLAGS);
assert(conn->cn_flags & flags);
#ifndef NDEBUG
@ -655,8 +648,9 @@ engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
assert(0 == (conn->cn_flags & LSCONN_HASHED));
#endif
conn->cn_flags &= ~flags;
LSQ_DEBUG("decref conn %"PRIu64", now '%s'", conn->cn_cid,
(refflags2str(conn->cn_flags, str), str));
LSQ_DEBUG("decref conn %"PRIu64", '%s' -> '%s'", conn->cn_cid,
(refflags2str(conn->cn_flags | flags, str[0]), str[0]),
(refflags2str(conn->cn_flags, str[1]), str[1]));
if (0 == (conn->cn_flags & CONN_REF_FLAGS))
{
eng_hist_inc(&engine->history, 0, sl_del_full_conns);
@ -1127,9 +1121,7 @@ send_packets_out (struct lsquic_engine *engine,
int
lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
{
return !(engine->flags & ENG_PAST_DEADLINE)
&& ( lsquic_mh_count(&engine->conns_out) > 0
)
return lsquic_mh_count(&engine->conns_out) > 0
;
}
@ -1300,7 +1292,9 @@ lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
const lsquic_time_t *next_time;
lsquic_time_t now;
if (lsquic_mh_count(&engine->conns_tickable))
if (((engine->flags & ENG_PAST_DEADLINE)
&& lsquic_mh_count(&engine->conns_out))
|| lsquic_mh_count(&engine->conns_tickable))
{
*diff = 0;
return 1;

View file

@ -2982,14 +2982,6 @@ full_conn_ci_handshake_failed (lsquic_conn_t *lconn)
}
static int
full_conn_ci_user_wants_read (lsquic_conn_t *lconn)
{
struct full_conn *conn = (struct full_conn *) lconn;
return !TAILQ_EMPTY(&conn->fc_pub.read_streams);
}
void
lsquic_conn_abort (lsquic_conn_t *lconn)
{
@ -3400,7 +3392,6 @@ static const struct conn_iface full_conn_iface = {
.ci_packet_not_sent = full_conn_ci_packet_not_sent,
.ci_packet_sent = full_conn_ci_packet_sent,
.ci_tick = full_conn_ci_tick,
.ci_user_wants_read = full_conn_ci_user_wants_read,
};
static const struct conn_iface *full_conn_iface_ptr = &full_conn_iface;

View file

@ -169,6 +169,15 @@ stream_inside_callback (const lsquic_stream_t *stream)
}
static void
maybe_conn_to_tickable (lsquic_stream_t *stream)
{
if (!stream_inside_callback(stream))
lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
stream->conn_pub->lconn);
}
/* Here, "readable" means that the user is able to read from the stream. */
static void
maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
@ -183,12 +192,18 @@ maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
/* Here, "writeable" means that data can be put into packets to be
* scheduled to be sent out.
*
* If `check_can_send' is false, it means that we do not need to check
* whether packets can be sent. This check was already performed when
* we packetized stream data.
*/
static void
maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream)
maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream,
int check_can_send)
{
if (!stream_inside_callback(stream) &&
lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) &&
(!check_can_send
|| lsquic_send_ctl_can_send(stream->conn_pub->send_ctl)) &&
! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
{
lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
@ -788,7 +803,7 @@ lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
stream->stream_flags |= STREAM_SEND_WUF;
maybe_conn_to_tickable_if_writeable(stream);
maybe_conn_to_tickable_if_writeable(stream, 1);
}
}
@ -889,7 +904,7 @@ lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
maybe_finish_stream(stream);
maybe_schedule_call_on_close(stream);
if (how)
maybe_conn_to_tickable_if_writeable(stream);
maybe_conn_to_tickable_if_writeable(stream, 1);
return 0;
}
@ -1055,7 +1070,11 @@ int
lsquic_stream_wantwrite (lsquic_stream_t *stream, int is_want)
{
if (0 == (stream->stream_flags & STREAM_U_WRITE_DONE))
{
if (is_want)
maybe_conn_to_tickable_if_writeable(stream, 1);
return stream_wantwrite(stream, is_want);
}
else
{
errno = EBADF;
@ -1558,6 +1577,7 @@ abort_connection (struct lsquic_stream *stream)
next_service_stream);
stream->stream_flags |= STREAM_ABORT_CONN;
LSQ_WARN("connection will be aborted");
maybe_conn_to_tickable(stream);
}
@ -1567,18 +1587,22 @@ stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
{
size_t size;
ssize_t nw;
unsigned seen_ok;
struct frame_gen_ctx fg_ctx = {
.fgc_stream = stream,
.fgc_reader = reader,
.fgc_nread_from_reader = 0,
};
seen_ok = 0;
while ((size = frame_gen_size(&fg_ctx), thresh ? size >= thresh : size > 0)
|| frame_gen_fin(&fg_ctx))
{
switch (stream_write_to_packet(&fg_ctx, size))
{
case SWTP_OK:
if (!seen_ok++)
maybe_conn_to_tickable_if_writeable(stream, 0);
if (frame_gen_fin(&fg_ctx))
{
stream->stream_flags |= STREAM_FIN_SENT;
@ -1633,13 +1657,15 @@ stream_write_to_packets (lsquic_stream_t *stream, struct lsquic_reader *reader,
* None of them flushes, which means that data is not sent and connection
* WINDOW_UPDATE frame never arrives from peer. Stall.
*/
static void
static int
maybe_flush_stream (struct lsquic_stream *stream)
{
if (stream->sm_n_buffered > 0
&& (stream->stream_flags & STREAM_CONN_LIMITED)
&& lsquic_conn_cap_avail(&stream->conn_pub->conn_cap) == 0)
stream_flush_nocheck(stream);
return stream_flush_nocheck(stream);
else
return 0;
}
@ -1668,7 +1694,8 @@ save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
incr_conn_cap(stream, n_written);
LSQ_DEBUG("buffered %zd bytes; %hu bytes are now in buffer",
n_written, stream->sm_n_buffered);
maybe_flush_stream(stream);
if (0 != maybe_flush_stream(stream))
return -1;
return n_written;
}
@ -1887,7 +1914,7 @@ lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
if (do_close)
lsquic_stream_close(stream);
else
maybe_conn_to_tickable_if_writeable(stream);
maybe_conn_to_tickable_if_writeable(stream, 1);
}
@ -1920,7 +1947,7 @@ lsquic_stream_close (lsquic_stream_t *stream)
stream_shutdown_read(stream);
maybe_schedule_call_on_close(stream);
maybe_finish_stream(stream);
maybe_conn_to_tickable_if_writeable(stream);
maybe_conn_to_tickable_if_writeable(stream, 1);
return 0;
}