mirror of
https://gitea.invidious.io/iv-org/litespeed-quic.git
synced 2024-08-15 00:53:43 +00:00
[API Change, OPTIMIZATION] Only process conns that need to be processed
The API is simplified: do not expose the user code to several queues. A "connection queue" is now an internal concept. The user processes connections using the single function lsquic_engine_process_conns(). When this function is called, only those connections are processed that need to be processed. A connection needs to be processed when: 1. New incoming packets have been fed to the connection. 2. User wants to read from a stream that is readable. 3. User wants to write to a stream that is writeable. 4. There are buffered packets that can be sent out. (This means that the user wrote to a stream outside of the lsquic library callback.) 5. A control frame (such as BLOCKED) needs to be sent out. 6. A stream needs to be serviced or delayed stream needs to be created. 7. An alarm rings. 8. Pacer timer expires. To achieve this, the library places the connections into two priority queues (min heaps): 1. Tickable Queue; and 2. Advisory Tick Time queue (ATTQ). Each time lsquic_engine_process_conns() is called, the Tickable Queue is emptied. After the connections have been ticked, they are queried again: if a connection is not being closed, it is placed either in the Tickable Queue if it is ready to be ticked again or it is placed in the Advisory Tick Time Queue. It is assumed that a connection always has at least one timer set (the idle alarm). The connections in the Tickable Queue are arranged in the least recently ticked order. This lets connections that have been quiet longer to get their packets scheduled first. This change means that the library no longer needs to be ticked periodically. The user code can query the library when is the next tick event and schedule it exactly. When connections are processed, only the tickable connections are processed, not *all* the connections. When there are no tick events, it means that no timer event is necessary -- only the file descriptor READ event is active. The following are improvements and simplifications that have been triggered: - Queue of connections with incoming packets is gone. - "Pending Read/Write Events" Queue is gone (along with its history and progress checks). This queue has become the Tickable Queue. - The connection hash no longer needs to track the connection insertion order.
This commit is contained in:
parent
eef4f2fcfb
commit
e8bd737db4
27 changed files with 621 additions and 890 deletions
62
APIs.txt
62
APIs.txt
|
@ -127,44 +127,35 @@ drive QUIC connections:
|
|||
ea_packets_out callback.
|
||||
|
||||
|
||||
Connection Queues
|
||||
-----------------
|
||||
Connection Management
|
||||
---------------------
|
||||
|
||||
Each connection lives in one or more queues. These are:
|
||||
A connections needs to be processed once in a while. It needs to be
|
||||
processed when one of the following is true:
|
||||
|
||||
- "All" queue. This is not really a queue, but rather a hash where
|
||||
connections can be looked up by ID. It is possible for a connection
|
||||
to exist *outside* of this queue: this happens when the connection is
|
||||
closed, but still has packets to send. In this case, the connection
|
||||
is present in
|
||||
- Outgoing queue. This queue contains connections which have packets
|
||||
to send. The connection in this queue are ordered by priority: the
|
||||
connection that has gone longest without sending is first.
|
||||
- Incoming queue. This queue contains connections that have incoming
|
||||
packets.
|
||||
- Pending RW Events queue. (See Connection Read-Write Events).
|
||||
- Advisory Tick Time queue. This queue is used when packet pacing is
|
||||
turned on (see es_pace_packets option).
|
||||
- There are incoming packets;
|
||||
- A stream is both readable by the user code and the user code wants
|
||||
to read from it;
|
||||
- A stream is both writeable by the user code and the user code wants
|
||||
to write to it;
|
||||
- User has written to stream outside of on_write() callbacks (that is
|
||||
allowed) and now there are packets ready to be sent;
|
||||
- A timer (pacer, retransmission, idle, etc) has expired;
|
||||
- A control frame needs to be sent out;
|
||||
- A stream needs to be serviced or created.
|
||||
|
||||
Each of these queues can be processed by a specialized function. They are,
|
||||
respectively:
|
||||
Each of these use cases is handled by a single function:
|
||||
|
||||
- lsquic_engine_proc_all()
|
||||
- lsquic_engine_send_unsent_packets()
|
||||
- lsquic_engine_process_conns_with_incoming()
|
||||
- lsquic_engine_process_conns_with_pend_rw()
|
||||
- lsquic_engine_process_conns_to_tick()
|
||||
lsquic_engine_process_conns()
|
||||
|
||||
Processing, or "ticking," a connection removes it from Incoming, Pending
|
||||
RW Events, and Advisory Tick Time queues. The connection gets placed
|
||||
onto these queues as necessary.
|
||||
The connections to which the conditions above apply are processed (or
|
||||
"ticked") in the least recently ticked order. After calling this function,
|
||||
you can see when is the next time a connection needs to be processed using
|
||||
|
||||
A simple approach is to
|
||||
- Read packets from socket, give it to the engine using
|
||||
lsquic_engine_packet_in(), and call
|
||||
lsquic_engine_process_conns_with_incoming(); and
|
||||
- Call lsquic_engine_proc_all() every few dozen milliseconds.
|
||||
lsquic_engine_earliest_adv_tick()
|
||||
|
||||
Based on this value, next event can be scheduled (in the event loop of
|
||||
your choice).
|
||||
|
||||
Connection
|
||||
----------
|
||||
|
@ -235,12 +226,3 @@ Versions
|
|||
|
||||
QUIC version are listed in enum lsquic_version. To specify a list of
|
||||
versions, they are usually placed in a bitmask, e.g. es_versions.
|
||||
|
||||
|
||||
Connection Read-Write Events
|
||||
----------------------------
|
||||
|
||||
TODO.
|
||||
|
||||
(Do not worry about it if you are not writing to streams outside
|
||||
of on_write() callback.)
|
||||
|
|
58
CHANGELOG
58
CHANGELOG
|
@ -1,3 +1,61 @@
|
|||
2018-04-09
|
||||
|
||||
[API Change, OPTIMIZATION] Only process conns that need to be processed
|
||||
|
||||
The API is simplified: do not expose the user code to several
|
||||
queues. A "connection queue" is now an internal concept.
|
||||
The user processes connections using the single function
|
||||
lsquic_engine_process_conns(). When this function is called,
|
||||
only those connections are processed that need to be processed.
|
||||
A connection needs to be processed when:
|
||||
|
||||
1. New incoming packets have been fed to the connection.
|
||||
2. User wants to read from a stream that is readable.
|
||||
3. User wants to write to a stream that is writeable.
|
||||
4. There are buffered packets that can be sent out. (This
|
||||
means that the user wrote to a stream outside of the
|
||||
lsquic library callback.)
|
||||
5. A control frame (such as BLOCKED) needs to be sent out.
|
||||
6. A stream needs to be serviced or delayed stream needs to
|
||||
be created.
|
||||
7. An alarm rings.
|
||||
8. Pacer timer expires.
|
||||
|
||||
To achieve this, the library places the connections into two
|
||||
priority queues (min heaps):
|
||||
|
||||
1. Tickable Queue; and
|
||||
2. Advisory Tick Time queue (ATTQ).
|
||||
|
||||
Each time lsquic_engine_process_conns() is called, the Tickable
|
||||
Queue is emptied. After the connections have been ticked, they are
|
||||
queried again: if a connection is not being closed, it is placed
|
||||
either in the Tickable Queue if it is ready to be ticked again or
|
||||
it is placed in the Advisory Tick Time Queue. It is assumed that
|
||||
a connection always has at least one timer set (the idle alarm).
|
||||
|
||||
The connections in the Tickable Queue are arranged in the least
|
||||
recently ticked order. This lets connections that have been quiet
|
||||
longer to get their packets scheduled first.
|
||||
|
||||
This change means that the library no longer needs to be ticked
|
||||
periodically. The user code can query the library when is the
|
||||
next tick event and schedule it exactly. When connections are
|
||||
processed, only the tickable connections are processed, not *all*
|
||||
the connections. When there are no tick events, it means that no
|
||||
timer event is necessary -- only the file descriptor READ event
|
||||
is active.
|
||||
|
||||
The following are improvements and simplifications that have
|
||||
been triggered:
|
||||
|
||||
- Queue of connections with incoming packets is gone.
|
||||
- "Pending Read/Write Events" Queue is gone (along with its
|
||||
history and progress checks). This queue has become the
|
||||
Tickable Queue.
|
||||
- The connection hash no longer needs to track the connection
|
||||
insertion order.
|
||||
|
||||
2018-04-02
|
||||
|
||||
- [FEATURE] Windows support
|
||||
|
|
|
@ -178,9 +178,6 @@ struct lsquic_stream_if {
|
|||
/** By default, infinite loop checks are turned on */
|
||||
#define LSQUIC_DF_PROGRESS_CHECK 1000
|
||||
|
||||
/** By default, Pending RW Queue infinite loop checks are turned on: */
|
||||
#define LSQUIC_DF_PENDRW_CHECK 10
|
||||
|
||||
/** By default, read/write events are dispatched in a loop */
|
||||
#define LSQUIC_DF_RW_ONCE 0
|
||||
|
||||
|
@ -331,23 +328,6 @@ struct lsquic_engine_settings {
|
|||
*/
|
||||
unsigned es_progress_check;
|
||||
|
||||
/**
|
||||
* A non-zero value enables internal checks to identify suspected
|
||||
* infinite loops in Pending RW Queue logic. The value of this
|
||||
* setting is the number of times a connection on Pending RW Queue
|
||||
* is allowed to be processed without making progress before it is
|
||||
* banished from Pending RW Queue.
|
||||
*
|
||||
* Progress is considered to have happened if any of the following
|
||||
* occurs:
|
||||
* - User reads data, FIN, or new error (due to a reset) from a
|
||||
* stream.
|
||||
* - A new stream-related frame is packetized.
|
||||
*
|
||||
* The defaut value is @ref LSQUIC_DF_PENDRW_CHECK.
|
||||
*/
|
||||
unsigned es_pendrw_check;
|
||||
|
||||
/**
|
||||
* A non-zero value make stream dispatch its read-write events once
|
||||
* per call.
|
||||
|
@ -364,19 +344,15 @@ struct lsquic_engine_settings {
|
|||
|
||||
/**
|
||||
* If set, this value specifies that number of microseconds that
|
||||
* functions @ref lsquic_engine_proc_all(),
|
||||
* @ref lsquic_engine_process_conns_with_incoming(),
|
||||
* @ref lsquic_engine_process_conns_to_tick(), and
|
||||
* @ref lsquic_engine_process_conns_with_pend_rw() are allowed
|
||||
* to spend before returning.
|
||||
* @ref lsquic_engine_process_conns() is allowed to spend before
|
||||
* returning.
|
||||
*
|
||||
* This is not an exact science and the connections must make
|
||||
* progress, so the deadline is checked after all connections get
|
||||
* a chance to tick and at least one batch of packets is sent out.
|
||||
*
|
||||
* When processing function runs out of its time slice, immediate
|
||||
* calls to @ref lsquic_engine_has_pend_rw() and
|
||||
* @ref lsquic_engine_has_unsent_packets() return false.
|
||||
* calls to @ref lsquic_engine_has_unsent_packets() return false.
|
||||
*
|
||||
* The default value is @ref LSQUIC_DF_PROC_TIME_THRESH.
|
||||
*/
|
||||
|
@ -503,50 +479,11 @@ lsquic_engine_packet_in (lsquic_engine_t *,
|
|||
void *peer_ctx);
|
||||
|
||||
/**
|
||||
* Process all connections. This function must be called often enough so
|
||||
* Process tickable connections. This function must be called often enough so
|
||||
* that packets and connections do not expire.
|
||||
*/
|
||||
void
|
||||
lsquic_engine_proc_all (lsquic_engine_t *engine);
|
||||
|
||||
/**
|
||||
* Process connections that have incoming packets. Call this after adding
|
||||
* one or more incoming packets using lsquic_engine_packet_in().
|
||||
*/
|
||||
void
|
||||
lsquic_engine_process_conns_with_incoming (lsquic_engine_t *);
|
||||
|
||||
/**
|
||||
* Process connections in Advisory Tick Time queue whose tick times are in
|
||||
* the past.
|
||||
*/
|
||||
void
|
||||
lsquic_engine_process_conns_to_tick (lsquic_engine_t *);
|
||||
|
||||
/**
|
||||
* Returns true if engine has connections that have pending read or write
|
||||
* events.
|
||||
*
|
||||
* Connections with pending read or write events are those that have at
|
||||
* least one stream whose state changed outside of the regular callback
|
||||
* mechanism. The simplest example is writing directly to the stream
|
||||
* object when data comes in.
|
||||
*
|
||||
* A call to @ref lsquic_engine_proc_all,
|
||||
* @ref lsquic_engine_process_conns_with_incoming,
|
||||
* @ref lsquic_engine_process_conns_to_tick, or
|
||||
* @ref lsquic_engine_process_conns_with_pend_rw removes processed connection
|
||||
* from Pending RW queue.
|
||||
*/
|
||||
int
|
||||
lsquic_engine_has_pend_rw (lsquic_engine_t *);
|
||||
|
||||
/**
|
||||
* Process connections that have pending read or write events (@see
|
||||
* lsquic_engine_has_pend_rw for description).
|
||||
*/
|
||||
void
|
||||
lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *);
|
||||
lsquic_engine_process_conns (lsquic_engine_t *engine);
|
||||
|
||||
/**
|
||||
* Returns true if engine has some unsent packets. This happens if
|
||||
|
@ -895,10 +832,10 @@ void
|
|||
lsquic_conn_abort (lsquic_conn_t *c);
|
||||
|
||||
/**
|
||||
* Returns true if there is a connection on the Advisory Tick Time queue,
|
||||
* false otherwise. If true, `diff' is set to the difference between
|
||||
* the earliest advisory tick time and now. If the former is in the past,
|
||||
* the value of `diff' is negative.
|
||||
* Returns true if there are connections to be processed, false otherwise.
|
||||
* If true, `diff' is set to the difference between the earliest advisory
|
||||
* tick time and now. If the former is in the past, the value of `diff'
|
||||
* is negative.
|
||||
*/
|
||||
int
|
||||
lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff);
|
||||
|
|
|
@ -52,6 +52,7 @@ SET(lsquic_STAT_SRCS
|
|||
lsquic_hpack_enc.c
|
||||
lsquic_xxhash.c
|
||||
lsquic_buf.c
|
||||
lsquic_min_heap.c
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -52,3 +52,25 @@ lsquic_alarmset_ring_expired (lsquic_alarmset_t *alset, lsquic_time_t now)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
lsquic_time_t
|
||||
lsquic_alarmset_mintime (const lsquic_alarmset_t *alset)
|
||||
{
|
||||
lsquic_time_t expiry;
|
||||
enum alarm_id al_id;
|
||||
|
||||
if (alset->as_armed_set)
|
||||
{
|
||||
expiry = UINT64_MAX;
|
||||
for (al_id = 0; al_id < MAX_LSQUIC_ALARMS; ++al_id)
|
||||
if ((alset->as_armed_set & (1 << al_id))
|
||||
&& alset->as_expiry[al_id] < expiry)
|
||||
{
|
||||
expiry = alset->as_expiry[al_id];
|
||||
}
|
||||
return expiry;
|
||||
}
|
||||
else
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -67,4 +67,7 @@ lsquic_alarmset_init_alarm (lsquic_alarmset_t *, enum alarm_id,
|
|||
void
|
||||
lsquic_alarmset_ring_expired (lsquic_alarmset_t *, lsquic_time_t now);
|
||||
|
||||
lsquic_time_t
|
||||
lsquic_alarmset_mintime (const lsquic_alarmset_t *);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -26,7 +26,6 @@ struct attq
|
|||
{
|
||||
struct malo *aq_elem_malo;
|
||||
struct attq_elem **aq_heap;
|
||||
lsquic_time_t aq_min;
|
||||
unsigned aq_nelem;
|
||||
unsigned aq_nalloc;
|
||||
};
|
||||
|
@ -91,17 +90,6 @@ attq_verify (struct attq *q)
|
|||
#endif
|
||||
|
||||
|
||||
int
|
||||
attq_maybe_add (struct attq *q, struct lsquic_conn *conn,
|
||||
lsquic_time_t advisory_time)
|
||||
{
|
||||
if (advisory_time < q->aq_min)
|
||||
return 1;
|
||||
else
|
||||
return attq_add(q, conn, advisory_time);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
attq_swap (struct attq *q, unsigned a, unsigned b)
|
||||
{
|
||||
|
@ -273,13 +261,3 @@ attq_next_time (struct attq *q)
|
|||
else
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
lsquic_time_t
|
||||
attq_set_min (struct attq *q, lsquic_time_t new_min)
|
||||
{
|
||||
lsquic_time_t prev_value;
|
||||
prev_value = q->aq_min;
|
||||
q->aq_min = new_min;
|
||||
return prev_value;
|
||||
}
|
||||
|
|
|
@ -27,11 +27,6 @@ attq_create (void);
|
|||
void
|
||||
attq_destroy (struct attq *);
|
||||
|
||||
/* Return 1 if advisory_time is too small, 0 on success, -1 on failure */
|
||||
int
|
||||
attq_maybe_add (struct attq *, struct lsquic_conn *,
|
||||
lsquic_time_t advisory_time);
|
||||
|
||||
/* Return 0 on success, -1 on failure (malloc) */
|
||||
int
|
||||
attq_add (struct attq *, struct lsquic_conn *, lsquic_time_t advisory_time);
|
||||
|
@ -48,7 +43,4 @@ attq_count_before (struct attq *, lsquic_time_t cutoff);
|
|||
const lsquic_time_t *
|
||||
attq_next_time (struct attq *);
|
||||
|
||||
lsquic_time_t
|
||||
attq_set_min (struct attq *, lsquic_time_t new_min);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -24,7 +24,6 @@ struct parse_funcs;
|
|||
struct attq_elem;
|
||||
|
||||
enum lsquic_conn_flags {
|
||||
LSCONN_HAS_INCOMING = (1 << 0),
|
||||
LSCONN_HAS_OUTGOING = (1 << 1),
|
||||
LSCONN_HASHED = (1 << 2),
|
||||
LSCONN_HAS_PEER_SA = (1 << 4),
|
||||
|
@ -35,25 +34,19 @@ enum lsquic_conn_flags {
|
|||
LSCONN_TCID0 = (1 << 9),
|
||||
LSCONN_VER_SET = (1 <<10), /* cn_version is set */
|
||||
LSCONN_EVANESCENT = (1 <<11), /* evanescent connection */
|
||||
LSCONN_RW_PENDING = (1 <<12),
|
||||
LSCONN_TICKABLE = (1 <<12), /* Connection is in the Tickable Queue */
|
||||
LSCONN_COI_ACTIVE = (1 <<13),
|
||||
LSCONN_COI_INACTIVE = (1 <<14),
|
||||
LSCONN_SEND_BLOCKED = (1 <<15), /* Send connection blocked frame */
|
||||
LSCONN_NEVER_PEND_RW = (1 <<17), /* Do not put onto Pending RW queue */
|
||||
LSCONN_NEVER_TICKABLE = (1 <<17), /* Do not put onto the Tickable Queue */
|
||||
LSCONN_ATTQ = (1 <<19),
|
||||
};
|
||||
|
||||
#define TICK_BIT_PROGRESS 2
|
||||
|
||||
/* A connection may have things to send and be closed at the same time.
|
||||
*/
|
||||
enum tick_st {
|
||||
TICK_SEND = (1 << 0),
|
||||
TICK_CLOSE = (1 << 1),
|
||||
/* Progress was made (see @ref es_pendrw_check for definition of
|
||||
* "progress.")
|
||||
*/
|
||||
TICK_PROGRESS= (1 << TICK_BIT_PROGRESS),
|
||||
};
|
||||
|
||||
#define TICK_QUIET 0
|
||||
|
@ -86,10 +79,13 @@ struct conn_iface
|
|||
|
||||
void
|
||||
(*ci_destroy) (struct lsquic_conn *);
|
||||
};
|
||||
|
||||
#define RW_HIST_BITS 6
|
||||
typedef unsigned char rw_hist_idx_t;
|
||||
int
|
||||
(*ci_is_tickable) (struct lsquic_conn *);
|
||||
|
||||
lsquic_time_t
|
||||
(*ci_next_tick_time) (struct lsquic_conn *);
|
||||
};
|
||||
|
||||
struct lsquic_conn
|
||||
{
|
||||
|
@ -99,22 +95,18 @@ struct lsquic_conn
|
|||
*cn_esf;
|
||||
lsquic_cid_t cn_cid;
|
||||
STAILQ_ENTRY(lsquic_conn) cn_next_closed_conn;
|
||||
TAILQ_ENTRY(lsquic_conn) cn_next_all,
|
||||
cn_next_in,
|
||||
cn_next_pend_rw,
|
||||
cn_next_out,
|
||||
STAILQ_ENTRY(lsquic_conn) cn_next_ticked;
|
||||
TAILQ_ENTRY(lsquic_conn) cn_next_out,
|
||||
cn_next_hash;
|
||||
const struct conn_iface *cn_if;
|
||||
const struct parse_funcs *cn_pf;
|
||||
struct attq_elem *cn_attq_elem;
|
||||
lsquic_time_t cn_last_sent;
|
||||
lsquic_time_t cn_last_ticked;
|
||||
enum lsquic_conn_flags cn_flags;
|
||||
enum lsquic_version cn_version;
|
||||
unsigned cn_noprogress_count;
|
||||
unsigned cn_hash;
|
||||
unsigned short cn_pack_size;
|
||||
rw_hist_idx_t cn_rw_hist_idx;
|
||||
unsigned char cn_rw_hist_buf[ 1 << RW_HIST_BITS ];
|
||||
unsigned char cn_peer_addr[sizeof(struct sockaddr_in6)],
|
||||
cn_local_addr[sizeof(struct sockaddr_in6)];
|
||||
};
|
||||
|
|
|
@ -20,24 +20,19 @@
|
|||
|
||||
|
||||
int
|
||||
conn_hash_init (struct conn_hash *conn_hash, unsigned max_count)
|
||||
conn_hash_init (struct conn_hash *conn_hash)
|
||||
{
|
||||
unsigned n;
|
||||
|
||||
if (!max_count)
|
||||
max_count = 1000000;
|
||||
|
||||
memset(conn_hash, 0, sizeof(*conn_hash));
|
||||
conn_hash->ch_max_count = max_count;
|
||||
conn_hash->ch_nbits = 1; /* Start small */
|
||||
TAILQ_INIT(&conn_hash->ch_all);
|
||||
conn_hash->ch_buckets = malloc(sizeof(conn_hash->ch_buckets[0]) *
|
||||
n_buckets(conn_hash->ch_nbits));
|
||||
if (!conn_hash->ch_buckets)
|
||||
return -1;
|
||||
for (n = 0; n < n_buckets(conn_hash->ch_nbits); ++n)
|
||||
TAILQ_INIT(&conn_hash->ch_buckets[n]);
|
||||
LSQ_INFO("initialized: max_count: %u", conn_hash->ch_max_count);
|
||||
LSQ_INFO("initialized");
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -105,8 +100,6 @@ conn_hash_add (struct conn_hash *conn_hash, struct lsquic_conn *lconn)
|
|||
{
|
||||
const unsigned hash = XXH32(&lconn->cn_cid, sizeof(lconn->cn_cid),
|
||||
(uintptr_t) conn_hash);
|
||||
if (conn_hash->ch_count >= conn_hash->ch_max_count)
|
||||
return -1;
|
||||
if (conn_hash->ch_count >=
|
||||
n_buckets(conn_hash->ch_nbits) * CONN_HASH_MAX_PER_BUCKET &&
|
||||
conn_hash->ch_nbits < sizeof(hash) * 8 - 1 &&
|
||||
|
@ -116,7 +109,6 @@ conn_hash_add (struct conn_hash *conn_hash, struct lsquic_conn *lconn)
|
|||
}
|
||||
const unsigned buckno = conn_hash_bucket_no(conn_hash, hash);
|
||||
lconn->cn_hash = hash;
|
||||
TAILQ_INSERT_TAIL(&conn_hash->ch_all, lconn, cn_next_all);
|
||||
TAILQ_INSERT_TAIL(&conn_hash->ch_buckets[buckno], lconn, cn_next_hash);
|
||||
++conn_hash->ch_count;
|
||||
return 0;
|
||||
|
@ -127,7 +119,6 @@ void
|
|||
conn_hash_remove (struct conn_hash *conn_hash, struct lsquic_conn *lconn)
|
||||
{
|
||||
const unsigned buckno = conn_hash_bucket_no(conn_hash, lconn->cn_hash);
|
||||
TAILQ_REMOVE(&conn_hash->ch_all, lconn, cn_next_all);
|
||||
TAILQ_REMOVE(&conn_hash->ch_buckets[buckno], lconn, cn_next_hash);
|
||||
--conn_hash->ch_count;
|
||||
}
|
||||
|
@ -136,7 +127,8 @@ conn_hash_remove (struct conn_hash *conn_hash, struct lsquic_conn *lconn)
|
|||
void
|
||||
conn_hash_reset_iter (struct conn_hash *conn_hash)
|
||||
{
|
||||
conn_hash->ch_next = TAILQ_FIRST(&conn_hash->ch_all);
|
||||
conn_hash->ch_iter.cur_buckno = 0;
|
||||
conn_hash->ch_iter.next_conn = TAILQ_FIRST(&conn_hash->ch_buckets[0]);
|
||||
}
|
||||
|
||||
|
||||
|
@ -151,8 +143,16 @@ conn_hash_first (struct conn_hash *conn_hash)
|
|||
struct lsquic_conn *
|
||||
conn_hash_next (struct conn_hash *conn_hash)
|
||||
{
|
||||
struct lsquic_conn *lconn = conn_hash->ch_next;
|
||||
struct lsquic_conn *lconn = conn_hash->ch_iter.next_conn;
|
||||
while (!lconn)
|
||||
{
|
||||
++conn_hash->ch_iter.cur_buckno;
|
||||
if (conn_hash->ch_iter.cur_buckno >= n_buckets(conn_hash->ch_nbits))
|
||||
return NULL;
|
||||
lconn = TAILQ_FIRST(&conn_hash->ch_buckets[
|
||||
conn_hash->ch_iter.cur_buckno]);
|
||||
}
|
||||
if (lconn)
|
||||
conn_hash->ch_next = TAILQ_NEXT(lconn, cn_next_all);
|
||||
conn_hash->ch_iter.next_conn = TAILQ_NEXT(lconn, cn_next_hash);
|
||||
return lconn;
|
||||
}
|
||||
|
|
|
@ -21,19 +21,20 @@ TAILQ_HEAD(lsquic_conn_head, lsquic_conn);
|
|||
|
||||
struct conn_hash
|
||||
{
|
||||
struct lsquic_conn_head ch_all;
|
||||
struct lsquic_conn_head *ch_buckets;
|
||||
struct lsquic_conn *ch_next;
|
||||
struct {
|
||||
unsigned cur_buckno;
|
||||
struct lsquic_conn *next_conn;
|
||||
} ch_iter;
|
||||
unsigned ch_count;
|
||||
unsigned ch_nbits;
|
||||
unsigned ch_max_count;
|
||||
};
|
||||
|
||||
#define conn_hash_count(conn_hash) (+(conn_hash)->ch_count)
|
||||
|
||||
/* Returns -1 if malloc fails */
|
||||
int
|
||||
conn_hash_init (struct conn_hash *, unsigned max_count);
|
||||
conn_hash_init (struct conn_hash *);
|
||||
|
||||
void
|
||||
conn_hash_cleanup (struct conn_hash *);
|
||||
|
|
|
@ -53,6 +53,7 @@
|
|||
#include "lsquic_version.h"
|
||||
#include "lsquic_hash.h"
|
||||
#include "lsquic_attq.h"
|
||||
#include "lsquic_min_heap.h"
|
||||
|
||||
#define LSQUIC_LOGGER_MODULE LSQLM_ENGINE
|
||||
#include "lsquic_logger.h"
|
||||
|
@ -73,7 +74,8 @@ struct out_batch
|
|||
typedef struct lsquic_conn * (*conn_iter_f)(struct lsquic_engine *);
|
||||
|
||||
static void
|
||||
process_connections (struct lsquic_engine *engine, conn_iter_f iter);
|
||||
process_connections (struct lsquic_engine *engine, conn_iter_f iter,
|
||||
lsquic_time_t now);
|
||||
|
||||
static void
|
||||
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag);
|
||||
|
@ -96,26 +98,24 @@ force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
|
|||
(e)->pub.enp_flags &= ~ENPUB_PROC; \
|
||||
} while (0)
|
||||
|
||||
/* A connection can be referenced from one of six places:
|
||||
/* A connection can be referenced from one of five places:
|
||||
*
|
||||
* 1. Connection hash: a connection starts its life in one of those.
|
||||
*
|
||||
* 2. Outgoing queue.
|
||||
*
|
||||
* 3. Incoming queue.
|
||||
* 3. Tickable queue
|
||||
*
|
||||
* 4. Pending RW Events queue.
|
||||
* 4. Advisory Tick Time queue.
|
||||
*
|
||||
* 5. Advisory Tick Time queue.
|
||||
*
|
||||
* 6. Closing connections queue. This is a transient queue -- it only
|
||||
* 5. Closing connections queue. This is a transient queue -- it only
|
||||
* exists for the duration of process_connections() function call.
|
||||
*
|
||||
* The idea is to destroy the connection when it is no longer referenced.
|
||||
* For example, a connection tick may return TICK_SEND|TICK_CLOSE. In
|
||||
* that case, the connection is referenced from two places: (2) and (6).
|
||||
* After its packets are sent, it is only referenced in (6), and at the
|
||||
* end of the function call, when it is removed from (6), reference count
|
||||
* that case, the connection is referenced from two places: (2) and (5).
|
||||
* After its packets are sent, it is only referenced in (5), and at the
|
||||
* end of the function call, when it is removed from (5), reference count
|
||||
* goes to zero and the connection is destroyed. If not all packets can
|
||||
* be sent, at the end of the function call, the connection is referenced
|
||||
* by (2) and will only be removed once all outgoing packets have been
|
||||
|
@ -123,27 +123,11 @@ force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn);
|
|||
*/
|
||||
#define CONN_REF_FLAGS (LSCONN_HASHED \
|
||||
|LSCONN_HAS_OUTGOING \
|
||||
|LSCONN_HAS_INCOMING \
|
||||
|LSCONN_RW_PENDING \
|
||||
|LSCONN_TICKABLE \
|
||||
|LSCONN_CLOSING \
|
||||
|LSCONN_ATTQ)
|
||||
|
||||
|
||||
struct out_heap_elem
|
||||
{
|
||||
struct lsquic_conn *ohe_conn;
|
||||
lsquic_time_t ohe_last_sent;
|
||||
};
|
||||
|
||||
|
||||
struct out_heap
|
||||
{
|
||||
struct out_heap_elem *oh_elems;
|
||||
unsigned oh_nalloc,
|
||||
oh_nelem;
|
||||
};
|
||||
|
||||
|
||||
|
||||
|
||||
struct lsquic_engine
|
||||
|
@ -166,142 +150,22 @@ struct lsquic_engine
|
|||
lsquic_packets_out_f packets_out;
|
||||
void *packets_out_ctx;
|
||||
void *bad_handshake_ctx;
|
||||
struct conn_hash full_conns;
|
||||
TAILQ_HEAD(, lsquic_conn) conns_in, conns_pend_rw;
|
||||
struct out_heap conns_out;
|
||||
/* Use a union because only one iterator is being used at any one time */
|
||||
union {
|
||||
struct {
|
||||
/* This iterator does not have any state: it uses `conns_in' */
|
||||
int ignore;
|
||||
} conn_in;
|
||||
struct {
|
||||
/* This iterator does not have any state: it uses `conns_pend_rw' */
|
||||
int ignore;
|
||||
} rw_pend;
|
||||
struct {
|
||||
/* Iterator state to process connections in Advisory Tick Time
|
||||
* queue.
|
||||
*/
|
||||
lsquic_time_t cutoff;
|
||||
} attq;
|
||||
struct {
|
||||
/* Iterator state to process all connections */
|
||||
int ignore;
|
||||
} all;
|
||||
struct {
|
||||
lsquic_conn_t *conn;
|
||||
} one;
|
||||
} iter_state;
|
||||
struct conn_hash conns_hash;
|
||||
struct min_heap conns_tickable;
|
||||
struct min_heap conns_out;
|
||||
struct eng_hist history;
|
||||
unsigned batch_size;
|
||||
unsigned time_until_desired_tick;
|
||||
struct attq *attq;
|
||||
lsquic_time_t proc_time;
|
||||
/* Track time last time a packet was sent to give new connections
|
||||
* priority lower than that of existing connections.
|
||||
*/
|
||||
lsquic_time_t last_sent;
|
||||
unsigned n_conns;
|
||||
lsquic_time_t deadline;
|
||||
struct out_batch out_batch;
|
||||
};
|
||||
|
||||
|
||||
#define OHE_PARENT(i) ((i - 1) / 2)
|
||||
#define OHE_LCHILD(i) (2 * i + 1)
|
||||
#define OHE_RCHILD(i) (2 * i + 2)
|
||||
|
||||
|
||||
static void
|
||||
heapify_out_heap (struct out_heap *heap, unsigned i)
|
||||
{
|
||||
struct out_heap_elem el;
|
||||
unsigned smallest;
|
||||
|
||||
assert(i < heap->oh_nelem);
|
||||
|
||||
if (OHE_LCHILD(i) < heap->oh_nelem)
|
||||
{
|
||||
if (heap->oh_elems[ OHE_LCHILD(i) ].ohe_last_sent <
|
||||
heap->oh_elems[ i ].ohe_last_sent)
|
||||
smallest = OHE_LCHILD(i);
|
||||
else
|
||||
smallest = i;
|
||||
if (OHE_RCHILD(i) < heap->oh_nelem &&
|
||||
heap->oh_elems[ OHE_RCHILD(i) ].ohe_last_sent <
|
||||
heap->oh_elems[ smallest ].ohe_last_sent)
|
||||
smallest = OHE_RCHILD(i);
|
||||
}
|
||||
else
|
||||
smallest = i;
|
||||
|
||||
if (smallest != i)
|
||||
{
|
||||
el = heap->oh_elems[ smallest ];
|
||||
heap->oh_elems[ smallest ] = heap->oh_elems[ i ];
|
||||
heap->oh_elems[ i ] = el;
|
||||
heapify_out_heap(heap, smallest);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
oh_insert (struct out_heap *heap, lsquic_conn_t *conn)
|
||||
{
|
||||
struct out_heap_elem el;
|
||||
unsigned nalloc, i;
|
||||
|
||||
if (heap->oh_nelem == heap->oh_nalloc)
|
||||
{
|
||||
if (0 == heap->oh_nalloc)
|
||||
nalloc = 4;
|
||||
else
|
||||
nalloc = heap->oh_nalloc * 2;
|
||||
heap->oh_elems = realloc(heap->oh_elems,
|
||||
nalloc * sizeof(heap->oh_elems[0]));
|
||||
if (!heap->oh_elems)
|
||||
{ /* Not much we can do here */
|
||||
LSQ_ERROR("realloc failed");
|
||||
return;
|
||||
}
|
||||
heap->oh_nalloc = nalloc;
|
||||
}
|
||||
|
||||
heap->oh_elems[ heap->oh_nelem ].ohe_conn = conn;
|
||||
heap->oh_elems[ heap->oh_nelem ].ohe_last_sent = conn->cn_last_sent;
|
||||
++heap->oh_nelem;
|
||||
|
||||
i = heap->oh_nelem - 1;
|
||||
while (i > 0 && heap->oh_elems[ OHE_PARENT(i) ].ohe_last_sent >
|
||||
heap->oh_elems[ i ].ohe_last_sent)
|
||||
{
|
||||
el = heap->oh_elems[ OHE_PARENT(i) ];
|
||||
heap->oh_elems[ OHE_PARENT(i) ] = heap->oh_elems[ i ];
|
||||
heap->oh_elems[ i ] = el;
|
||||
i = OHE_PARENT(i);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static struct lsquic_conn *
|
||||
oh_pop (struct out_heap *heap)
|
||||
{
|
||||
struct lsquic_conn *conn;
|
||||
|
||||
assert(heap->oh_nelem);
|
||||
|
||||
conn = heap->oh_elems[0].ohe_conn;
|
||||
--heap->oh_nelem;
|
||||
if (heap->oh_nelem > 0)
|
||||
{
|
||||
heap->oh_elems[0] = heap->oh_elems[ heap->oh_nelem ];
|
||||
heapify_out_heap(heap, 0);
|
||||
}
|
||||
|
||||
return conn;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
|
||||
unsigned flags)
|
||||
|
@ -336,7 +200,6 @@ lsquic_engine_init_settings (struct lsquic_engine_settings *settings,
|
|||
settings->es_support_nstp = LSQUIC_DF_SUPPORT_NSTP;
|
||||
settings->es_honor_prst = LSQUIC_DF_HONOR_PRST;
|
||||
settings->es_progress_check = LSQUIC_DF_PROGRESS_CHECK;
|
||||
settings->es_pendrw_check = LSQUIC_DF_PENDRW_CHECK;
|
||||
settings->es_rw_once = LSQUIC_DF_RW_ONCE;
|
||||
settings->es_proc_time_thresh= LSQUIC_DF_PROC_TIME_THRESH;
|
||||
settings->es_pace_packets = LSQUIC_DF_PACE_PACKETS;
|
||||
|
@ -456,9 +319,7 @@ lsquic_engine_new (unsigned flags,
|
|||
engine->pub.enp_pmi_ctx = NULL;
|
||||
}
|
||||
engine->pub.enp_engine = engine;
|
||||
TAILQ_INIT(&engine->conns_in);
|
||||
TAILQ_INIT(&engine->conns_pend_rw);
|
||||
conn_hash_init(&engine->full_conns, ~0);
|
||||
conn_hash_init(&engine->conns_hash);
|
||||
engine->attq = attq_create();
|
||||
eng_hist_init(&engine->history);
|
||||
engine->batch_size = INITIAL_OUT_BATCH_SIZE;
|
||||
|
@ -489,23 +350,62 @@ shrink_batch_size (struct lsquic_engine *engine)
|
|||
static void
|
||||
destroy_conn (struct lsquic_engine *engine, lsquic_conn_t *conn)
|
||||
{
|
||||
conn->cn_flags |= LSCONN_NEVER_PEND_RW;
|
||||
--engine->n_conns;
|
||||
conn->cn_flags |= LSCONN_NEVER_TICKABLE;
|
||||
conn->cn_if->ci_destroy(conn);
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
maybe_grow_conn_heaps (struct lsquic_engine *engine)
|
||||
{
|
||||
struct min_heap_elem *els;
|
||||
unsigned count;
|
||||
|
||||
if (engine->n_conns < lsquic_mh_nalloc(&engine->conns_tickable))
|
||||
return 0; /* Nothing to do */
|
||||
|
||||
if (lsquic_mh_nalloc(&engine->conns_tickable))
|
||||
count = lsquic_mh_nalloc(&engine->conns_tickable) * 2 * 2;
|
||||
else
|
||||
count = 8;
|
||||
|
||||
els = malloc(sizeof(els[0]) * count);
|
||||
if (!els)
|
||||
{
|
||||
LSQ_ERROR("%s: malloc failed", __func__);
|
||||
return -1;
|
||||
}
|
||||
|
||||
LSQ_DEBUG("grew heaps to %u elements", count / 2);
|
||||
memcpy(&els[0], engine->conns_tickable.mh_elems,
|
||||
sizeof(els[0]) * lsquic_mh_count(&engine->conns_tickable));
|
||||
memcpy(&els[count / 2], engine->conns_out.mh_elems,
|
||||
sizeof(els[0]) * lsquic_mh_count(&engine->conns_out));
|
||||
free(engine->conns_tickable.mh_elems);
|
||||
engine->conns_tickable.mh_elems = els;
|
||||
engine->conns_out.mh_elems = &els[count / 2];
|
||||
engine->conns_tickable.mh_nalloc = count / 2;
|
||||
engine->conns_out.mh_nalloc = count / 2;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static lsquic_conn_t *
|
||||
new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
|
||||
unsigned short max_packet_size)
|
||||
{
|
||||
lsquic_conn_t *conn;
|
||||
unsigned flags;
|
||||
if (0 != maybe_grow_conn_heaps(engine))
|
||||
return NULL;
|
||||
flags = engine->flags & (ENG_SERVER|ENG_HTTP);
|
||||
conn = full_conn_client_new(&engine->pub, engine->stream_if,
|
||||
engine->stream_if_ctx, flags, hostname, max_packet_size);
|
||||
if (!conn)
|
||||
return NULL;
|
||||
if (0 != conn_hash_add(&engine->full_conns, conn))
|
||||
++engine->n_conns;
|
||||
if (0 != conn_hash_add(&engine->conns_hash, conn))
|
||||
{
|
||||
LSQ_WARN("cannot add connection %"PRIu64" to hash - destroy",
|
||||
conn->cn_cid);
|
||||
|
@ -514,7 +414,7 @@ new_full_conn_client (lsquic_engine_t *engine, const char *hostname,
|
|||
}
|
||||
assert(!(conn->cn_flags &
|
||||
(CONN_REF_FLAGS
|
||||
& ~LSCONN_RW_PENDING /* This flag may be set as effect of user
|
||||
& ~LSCONN_TICKABLE /* This flag may be set as effect of user
|
||||
callbacks */
|
||||
)));
|
||||
conn->cn_flags |= LSCONN_HASHED;
|
||||
|
@ -542,7 +442,7 @@ find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
conn = conn_hash_find(&engine->full_conns, packet_in->pi_conn_id);
|
||||
conn = conn_hash_find(&engine->conns_hash, packet_in->pi_conn_id);
|
||||
if (conn)
|
||||
{
|
||||
conn->cn_pf->pf_parse_packet_in_finish(packet_in, ppstate);
|
||||
|
@ -553,41 +453,19 @@ find_or_create_conn (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
|
|||
}
|
||||
|
||||
|
||||
static void
|
||||
add_conn_to_pend_rw (lsquic_engine_t *engine, lsquic_conn_t *conn,
|
||||
enum rw_reason reason)
|
||||
{
|
||||
int hist_idx;
|
||||
|
||||
TAILQ_INSERT_TAIL(&engine->conns_pend_rw, conn, cn_next_pend_rw);
|
||||
engine_incref_conn(conn, LSCONN_RW_PENDING);
|
||||
|
||||
hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
|
||||
conn->cn_rw_hist_buf[ hist_idx ] = reason;
|
||||
++conn->cn_rw_hist_idx;
|
||||
|
||||
if ((int) sizeof(conn->cn_rw_hist_buf) - 1 == hist_idx)
|
||||
EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c'), "
|
||||
"rw_hist: %.*s", (char) reason,
|
||||
(int) sizeof(conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
|
||||
else
|
||||
EV_LOG_CONN_EVENT(conn->cn_cid, "added to pending RW queue ('%c')",
|
||||
(char) reason);
|
||||
}
|
||||
|
||||
|
||||
#if !defined(NDEBUG) && __GNUC__
|
||||
__attribute__((weak))
|
||||
#endif
|
||||
void
|
||||
lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub,
|
||||
lsquic_conn_t *conn, enum rw_reason reason)
|
||||
lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub,
|
||||
lsquic_conn_t *conn)
|
||||
{
|
||||
if (0 == (enpub->enp_flags & ENPUB_PROC) &&
|
||||
0 == (conn->cn_flags & (LSCONN_RW_PENDING|LSCONN_NEVER_PEND_RW)))
|
||||
0 == (conn->cn_flags & (LSCONN_TICKABLE|LSCONN_NEVER_TICKABLE)))
|
||||
{
|
||||
lsquic_engine_t *engine = (lsquic_engine_t *) enpub;
|
||||
add_conn_to_pend_rw(engine, conn, reason);
|
||||
lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
|
||||
engine_incref_conn(conn, LSCONN_TICKABLE);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -597,72 +475,27 @@ lsquic_engine_add_conn_to_attq (struct lsquic_engine_public *enpub,
|
|||
lsquic_conn_t *conn, lsquic_time_t tick_time)
|
||||
{
|
||||
lsquic_engine_t *const engine = (lsquic_engine_t *) enpub;
|
||||
/* Instead of performing an update, we simply remove the connection from
|
||||
* the queue and add it back. This should not happen in at the time of
|
||||
* this writing.
|
||||
*/
|
||||
if (conn->cn_flags & LSCONN_ATTQ)
|
||||
if (conn->cn_flags & LSCONN_TICKABLE)
|
||||
{
|
||||
/* Optimization: no need to add the connection to the Advisory Tick
|
||||
* Time Queue: it is about to be ticked, after which it its next tick
|
||||
* time may be queried again.
|
||||
*/;
|
||||
}
|
||||
else if (conn->cn_flags & LSCONN_ATTQ)
|
||||
{
|
||||
if (lsquic_conn_adv_time(conn) != tick_time)
|
||||
{
|
||||
attq_remove(engine->attq, conn);
|
||||
conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
|
||||
if (0 != attq_add(engine->attq, conn, tick_time))
|
||||
engine_decref_conn(engine, conn, LSCONN_ATTQ);
|
||||
}
|
||||
if (conn && !(conn->cn_flags & LSCONN_ATTQ) &&
|
||||
0 == attq_maybe_add(engine->attq, conn, tick_time))
|
||||
}
|
||||
else if (0 == attq_add(engine->attq, conn, tick_time))
|
||||
engine_incref_conn(conn, LSCONN_ATTQ);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
update_pend_rw_progress (lsquic_engine_t *engine, lsquic_conn_t *conn,
|
||||
int progress_made)
|
||||
{
|
||||
rw_hist_idx_t hist_idx;
|
||||
const unsigned char *empty;
|
||||
const unsigned pendrw_check = engine->pub.enp_settings.es_pendrw_check;
|
||||
|
||||
if (!pendrw_check)
|
||||
return;
|
||||
|
||||
/* Convert previous entry to uppercase: */
|
||||
hist_idx = (conn->cn_rw_hist_idx - 1) & ((1 << RW_HIST_BITS) - 1);
|
||||
conn->cn_rw_hist_buf[ hist_idx ] -= 0x20;
|
||||
|
||||
LSQ_DEBUG("conn %"PRIu64": progress: %d", conn->cn_cid, !!progress_made);
|
||||
if (progress_made)
|
||||
{
|
||||
conn->cn_noprogress_count = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
EV_LOG_CONN_EVENT(conn->cn_cid, "Pending RW Queue processing made "
|
||||
"no progress");
|
||||
++conn->cn_noprogress_count;
|
||||
if (conn->cn_noprogress_count <= pendrw_check)
|
||||
return;
|
||||
|
||||
conn->cn_flags |= LSCONN_NEVER_PEND_RW;
|
||||
empty = memchr(conn->cn_rw_hist_buf, RW_REASON_EMPTY,
|
||||
sizeof(conn->cn_rw_hist_buf));
|
||||
if (empty)
|
||||
LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
|
||||
"(rw_hist: %.*s): will not put it onto Pend RW queue again",
|
||||
conn->cn_cid, conn->cn_noprogress_count,
|
||||
(int) (empty - conn->cn_rw_hist_buf), conn->cn_rw_hist_buf);
|
||||
else
|
||||
{
|
||||
hist_idx = conn->cn_rw_hist_idx & ((1 << RW_HIST_BITS) - 1);
|
||||
LSQ_WARN("conn %"PRIu64" noprogress count reached %u "
|
||||
"(rw_hist: %.*s%.*s): will not put it onto Pend RW queue again",
|
||||
conn->cn_cid, conn->cn_noprogress_count,
|
||||
/* First part of history: */
|
||||
(int) (sizeof(conn->cn_rw_hist_buf) - hist_idx),
|
||||
conn->cn_rw_hist_buf + hist_idx,
|
||||
/* Second part of history: */
|
||||
hist_idx, conn->cn_rw_hist_buf);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* Return 0 if packet is being processed by a connections, otherwise return 1 */
|
||||
static int
|
||||
process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
|
||||
|
@ -678,9 +511,10 @@ process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
|
|||
return 1;
|
||||
}
|
||||
|
||||
if (0 == (conn->cn_flags & LSCONN_HAS_INCOMING)) {
|
||||
TAILQ_INSERT_TAIL(&engine->conns_in, conn, cn_next_in);
|
||||
engine_incref_conn(conn, LSCONN_HAS_INCOMING);
|
||||
if (0 == (conn->cn_flags & LSCONN_TICKABLE))
|
||||
{
|
||||
lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
|
||||
engine_incref_conn(conn, LSCONN_TICKABLE);
|
||||
}
|
||||
lsquic_conn_record_sockaddr(conn, sa_local, sa_peer);
|
||||
lsquic_packet_in_upref(packet_in);
|
||||
|
@ -691,101 +525,11 @@ process_packet_in (lsquic_engine_t *engine, lsquic_packet_in_t *packet_in,
|
|||
}
|
||||
|
||||
|
||||
static int
|
||||
conn_attq_expired (const struct lsquic_engine *engine,
|
||||
const lsquic_conn_t *conn)
|
||||
{
|
||||
assert(conn->cn_attq_elem);
|
||||
return lsquic_conn_adv_time(conn) < engine->proc_time;
|
||||
}
|
||||
|
||||
|
||||
/* Iterator for connections with incoming packets */
|
||||
static lsquic_conn_t *
|
||||
conn_iter_next_incoming (struct lsquic_engine *engine)
|
||||
{
|
||||
enum lsquic_conn_flags addl_flags;
|
||||
lsquic_conn_t *conn;
|
||||
while ((conn = TAILQ_FIRST(&engine->conns_in)))
|
||||
{
|
||||
TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
|
||||
if (conn->cn_flags & LSCONN_RW_PENDING)
|
||||
{
|
||||
TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
|
||||
EV_LOG_CONN_EVENT(conn->cn_cid,
|
||||
"removed from pending RW queue (processing incoming)");
|
||||
}
|
||||
if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
|
||||
{
|
||||
addl_flags = LSCONN_ATTQ;
|
||||
attq_remove(engine->attq, conn);
|
||||
}
|
||||
else
|
||||
addl_flags = 0;
|
||||
conn = engine_decref_conn(engine, conn,
|
||||
LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
|
||||
if (conn)
|
||||
break;
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
|
||||
/* Iterator for connections with that have pending read/write events */
|
||||
static lsquic_conn_t *
|
||||
conn_iter_next_rw_pend (struct lsquic_engine *engine)
|
||||
{
|
||||
enum lsquic_conn_flags addl_flags;
|
||||
lsquic_conn_t *conn;
|
||||
while ((conn = TAILQ_FIRST(&engine->conns_pend_rw)))
|
||||
{
|
||||
TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
|
||||
EV_LOG_CONN_EVENT(conn->cn_cid,
|
||||
"removed from pending RW queue (processing pending RW conns)");
|
||||
if (conn->cn_flags & LSCONN_HAS_INCOMING)
|
||||
TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
|
||||
if ((conn->cn_flags & LSCONN_ATTQ) && conn_attq_expired(engine, conn))
|
||||
{
|
||||
addl_flags = LSCONN_ATTQ;
|
||||
attq_remove(engine->attq, conn);
|
||||
}
|
||||
else
|
||||
addl_flags = 0;
|
||||
conn = engine_decref_conn(engine, conn,
|
||||
LSCONN_RW_PENDING|LSCONN_HAS_INCOMING|addl_flags);
|
||||
if (conn)
|
||||
break;
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
lsquic_engine_process_conns_with_incoming (lsquic_engine_t *engine)
|
||||
{
|
||||
LSQ_DEBUG("process connections with incoming packets");
|
||||
ENGINE_IN(engine);
|
||||
process_connections(engine, conn_iter_next_incoming);
|
||||
assert(TAILQ_EMPTY(&engine->conns_in));
|
||||
ENGINE_OUT(engine);
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
lsquic_engine_has_pend_rw (lsquic_engine_t *engine)
|
||||
lsquic_engine_has_tickable (lsquic_engine_t *engine)
|
||||
{
|
||||
return !(engine->flags & ENG_PAST_DEADLINE)
|
||||
&& !TAILQ_EMPTY(&engine->conns_pend_rw);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
lsquic_engine_process_conns_with_pend_rw (lsquic_engine_t *engine)
|
||||
{
|
||||
LSQ_DEBUG("process connections with pending RW events");
|
||||
ENGINE_IN(engine);
|
||||
process_connections(engine, conn_iter_next_rw_pend);
|
||||
ENGINE_OUT(engine);
|
||||
&& lsquic_mh_count(&engine->conns_tickable) > 0;
|
||||
}
|
||||
|
||||
|
||||
|
@ -799,74 +543,33 @@ lsquic_engine_destroy (lsquic_engine_t *engine)
|
|||
engine->flags |= ENG_DTOR;
|
||||
#endif
|
||||
|
||||
while (engine->conns_out.oh_nelem > 0)
|
||||
while ((conn = lsquic_mh_pop(&engine->conns_out)))
|
||||
{
|
||||
--engine->conns_out.oh_nelem;
|
||||
conn = engine->conns_out.oh_elems[
|
||||
engine->conns_out.oh_nelem ].ohe_conn;
|
||||
assert(conn->cn_flags & LSCONN_HAS_OUTGOING);
|
||||
(void) engine_decref_conn(engine, conn, LSCONN_HAS_OUTGOING);
|
||||
}
|
||||
|
||||
for (conn = conn_hash_first(&engine->full_conns); conn;
|
||||
conn = conn_hash_next(&engine->full_conns))
|
||||
while ((conn = lsquic_mh_pop(&engine->conns_tickable)))
|
||||
{
|
||||
assert(conn->cn_flags & LSCONN_TICKABLE);
|
||||
(void) engine_decref_conn(engine, conn, LSCONN_TICKABLE);
|
||||
}
|
||||
|
||||
for (conn = conn_hash_first(&engine->conns_hash); conn;
|
||||
conn = conn_hash_next(&engine->conns_hash))
|
||||
force_close_conn(engine, conn);
|
||||
conn_hash_cleanup(&engine->full_conns);
|
||||
|
||||
conn_hash_cleanup(&engine->conns_hash);
|
||||
|
||||
assert(0 == engine->n_conns);
|
||||
attq_destroy(engine->attq);
|
||||
|
||||
assert(0 == engine->conns_out.oh_nelem);
|
||||
assert(TAILQ_EMPTY(&engine->conns_pend_rw));
|
||||
lsquic_mm_cleanup(&engine->pub.enp_mm);
|
||||
free(engine->conns_out.oh_elems);
|
||||
assert(0 == lsquic_mh_count(&engine->conns_out));
|
||||
assert(0 == lsquic_mh_count(&engine->conns_tickable));
|
||||
free(engine->conns_tickable.mh_elems);
|
||||
free(engine);
|
||||
}
|
||||
|
||||
|
||||
#if __GNUC__
|
||||
__attribute__((nonnull(3)))
|
||||
#endif
|
||||
static lsquic_conn_t *
|
||||
remove_from_inc_andor_pend_rw (lsquic_engine_t *engine,
|
||||
lsquic_conn_t *conn, const char *reason)
|
||||
{
|
||||
assert(conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING));
|
||||
if (conn->cn_flags & LSCONN_HAS_INCOMING)
|
||||
TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
|
||||
if (conn->cn_flags & LSCONN_RW_PENDING)
|
||||
{
|
||||
TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
|
||||
EV_LOG_CONN_EVENT(conn->cn_cid,
|
||||
"removed from pending RW queue (%s)", reason);
|
||||
}
|
||||
conn = engine_decref_conn(engine, conn,
|
||||
LSCONN_HAS_INCOMING|LSCONN_RW_PENDING);
|
||||
assert(conn);
|
||||
return conn;
|
||||
}
|
||||
|
||||
|
||||
static lsquic_conn_t *
|
||||
conn_iter_next_one (lsquic_engine_t *engine)
|
||||
{
|
||||
lsquic_conn_t *conn = engine->iter_state.one.conn;
|
||||
if (conn)
|
||||
{
|
||||
if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
|
||||
conn = remove_from_inc_andor_pend_rw(engine, conn, "connect");
|
||||
if (conn && (conn->cn_flags & LSCONN_ATTQ) &&
|
||||
conn_attq_expired(engine, conn))
|
||||
{
|
||||
attq_remove(engine->attq, conn);
|
||||
conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
|
||||
}
|
||||
engine->iter_state.one.conn = NULL;
|
||||
}
|
||||
return conn;
|
||||
}
|
||||
|
||||
|
||||
lsquic_conn_t *
|
||||
lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa,
|
||||
void *peer_ctx, lsquic_conn_ctx_t *conn_ctx,
|
||||
|
@ -896,13 +599,13 @@ lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa,
|
|||
conn = new_full_conn_client(engine, hostname, max_packet_size);
|
||||
if (!conn)
|
||||
return NULL;
|
||||
lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
|
||||
engine_incref_conn(conn, LSCONN_TICKABLE);
|
||||
ENGINE_IN(engine);
|
||||
lsquic_conn_record_peer_sa(conn, peer_sa);
|
||||
conn->cn_peer_ctx = peer_ctx;
|
||||
lsquic_conn_set_ctx(conn, conn_ctx);
|
||||
engine->iter_state.one.conn = conn;
|
||||
full_conn_client_call_on_new(conn);
|
||||
process_connections(engine, conn_iter_next_one);
|
||||
ENGINE_OUT(engine);
|
||||
return conn;
|
||||
}
|
||||
|
@ -911,19 +614,18 @@ lsquic_engine_connect (lsquic_engine_t *engine, const struct sockaddr *peer_sa,
|
|||
static void
|
||||
remove_conn_from_hash (lsquic_engine_t *engine, lsquic_conn_t *conn)
|
||||
{
|
||||
conn_hash_remove(&engine->full_conns, conn);
|
||||
conn_hash_remove(&engine->conns_hash, conn);
|
||||
(void) engine_decref_conn(engine, conn, LSCONN_HASHED);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
refflags2str (enum lsquic_conn_flags flags, char s[7])
|
||||
refflags2str (enum lsquic_conn_flags flags, char s[6])
|
||||
{
|
||||
*s = 'C'; s += !!(flags & LSCONN_CLOSING);
|
||||
*s = 'H'; s += !!(flags & LSCONN_HASHED);
|
||||
*s = 'O'; s += !!(flags & LSCONN_HAS_OUTGOING);
|
||||
*s = 'I'; s += !!(flags & LSCONN_HAS_INCOMING);
|
||||
*s = 'R'; s += !!(flags & LSCONN_RW_PENDING);
|
||||
*s = 'T'; s += !!(flags & LSCONN_TICKABLE);
|
||||
*s = 'A'; s += !!(flags & LSCONN_ATTQ);
|
||||
*s = '\0';
|
||||
}
|
||||
|
@ -932,7 +634,7 @@ refflags2str (enum lsquic_conn_flags flags, char s[7])
|
|||
static void
|
||||
engine_incref_conn (lsquic_conn_t *conn, enum lsquic_conn_flags flag)
|
||||
{
|
||||
char str[7];
|
||||
char str[6];
|
||||
assert(flag & CONN_REF_FLAGS);
|
||||
assert(!(conn->cn_flags & flag));
|
||||
conn->cn_flags |= flag;
|
||||
|
@ -945,7 +647,7 @@ static lsquic_conn_t *
|
|||
engine_decref_conn (lsquic_engine_t *engine, lsquic_conn_t *conn,
|
||||
enum lsquic_conn_flags flags)
|
||||
{
|
||||
char str[7];
|
||||
char str[6];
|
||||
assert(flags & CONN_REF_FLAGS);
|
||||
assert(conn->cn_flags & flags);
|
||||
#ifndef NDEBUG
|
||||
|
@ -974,41 +676,32 @@ force_close_conn (lsquic_engine_t *engine, lsquic_conn_t *conn)
|
|||
const enum lsquic_conn_flags flags = conn->cn_flags;
|
||||
assert(conn->cn_flags & CONN_REF_FLAGS);
|
||||
assert(!(flags & LSCONN_HAS_OUTGOING)); /* Should be removed already */
|
||||
assert(!(flags & LSCONN_TICKABLE)); /* Should be removed already */
|
||||
assert(!(flags & LSCONN_CLOSING)); /* It is in transient queue? */
|
||||
if (flags & LSCONN_HAS_INCOMING)
|
||||
{
|
||||
TAILQ_REMOVE(&engine->conns_in, conn, cn_next_in);
|
||||
(void) engine_decref_conn(engine, conn, LSCONN_HAS_INCOMING);
|
||||
}
|
||||
if (flags & LSCONN_RW_PENDING)
|
||||
{
|
||||
TAILQ_REMOVE(&engine->conns_pend_rw, conn, cn_next_pend_rw);
|
||||
EV_LOG_CONN_EVENT(conn->cn_cid,
|
||||
"removed from pending RW queue (engine destruction)");
|
||||
(void) engine_decref_conn(engine, conn, LSCONN_RW_PENDING);
|
||||
}
|
||||
if (flags & LSCONN_ATTQ)
|
||||
{
|
||||
attq_remove(engine->attq, conn);
|
||||
(void) engine_decref_conn(engine, conn, LSCONN_ATTQ);
|
||||
}
|
||||
if (flags & LSCONN_HASHED)
|
||||
remove_conn_from_hash(engine, conn);
|
||||
}
|
||||
|
||||
|
||||
/* Iterator for all connections.
|
||||
* Returned connections are removed from the Incoming, Pending RW Event,
|
||||
* and Advisory Tick Time queues if necessary.
|
||||
/* Iterator for tickable connections (those on the Tickable Queue). Before
|
||||
* a connection is returned, it is removed from the Advisory Tick Time queue
|
||||
* if necessary.
|
||||
*/
|
||||
static lsquic_conn_t *
|
||||
conn_iter_next_all (struct lsquic_engine *engine)
|
||||
conn_iter_next_tickable (struct lsquic_engine *engine)
|
||||
{
|
||||
lsquic_conn_t *conn;
|
||||
|
||||
conn = conn_hash_next(&engine->full_conns);
|
||||
conn = lsquic_mh_pop(&engine->conns_tickable);
|
||||
|
||||
if (conn && (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING)))
|
||||
conn = remove_from_inc_andor_pend_rw(engine, conn, "process all");
|
||||
if (conn && (conn->cn_flags & LSCONN_ATTQ)
|
||||
&& conn_attq_expired(engine, conn))
|
||||
if (conn)
|
||||
conn = engine_decref_conn(engine, conn, LSCONN_TICKABLE);
|
||||
if (conn && (conn->cn_flags & LSCONN_ATTQ))
|
||||
{
|
||||
attq_remove(engine->attq, conn);
|
||||
conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
|
||||
|
@ -1018,62 +711,26 @@ conn_iter_next_all (struct lsquic_engine *engine)
|
|||
}
|
||||
|
||||
|
||||
static lsquic_conn_t *
|
||||
conn_iter_next_attq (struct lsquic_engine *engine)
|
||||
void
|
||||
lsquic_engine_process_conns (lsquic_engine_t *engine)
|
||||
{
|
||||
lsquic_conn_t *conn;
|
||||
lsquic_time_t now;
|
||||
|
||||
conn = attq_pop(engine->attq, engine->iter_state.attq.cutoff);
|
||||
if (conn)
|
||||
{
|
||||
assert(conn->cn_flags & LSCONN_ATTQ);
|
||||
if (conn->cn_flags & (LSCONN_HAS_INCOMING|LSCONN_RW_PENDING))
|
||||
conn = remove_from_inc_andor_pend_rw(engine, conn, "process attq");
|
||||
conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
|
||||
}
|
||||
|
||||
return conn;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
lsquic_engine_proc_all (lsquic_engine_t *engine)
|
||||
{
|
||||
ENGINE_IN(engine);
|
||||
/* We poke each connection every time as initial implementation. If it
|
||||
* proves to be too inefficient, we will need to figure out
|
||||
* a) when to stop processing; and
|
||||
* b) how to remember state between calls.
|
||||
*/
|
||||
conn_hash_reset_iter(&engine->full_conns);
|
||||
process_connections(engine, conn_iter_next_all);
|
||||
ENGINE_OUT(engine);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
lsquic_engine_process_conns_to_tick (lsquic_engine_t *engine)
|
||||
{
|
||||
lsquic_time_t prev_min, now;
|
||||
|
||||
now = lsquic_time_now();
|
||||
if (LSQ_LOG_ENABLED(LSQ_LOG_DEBUG))
|
||||
while ((conn = attq_pop(engine->attq, now)))
|
||||
{
|
||||
const lsquic_time_t *expected_time;
|
||||
int64_t diff;
|
||||
expected_time = attq_next_time(engine->attq);
|
||||
if (expected_time)
|
||||
diff = *expected_time - now;
|
||||
else
|
||||
diff = -1;
|
||||
LSQ_DEBUG("process connections in attq; time diff: %"PRIi64, diff);
|
||||
conn = engine_decref_conn(engine, conn, LSCONN_ATTQ);
|
||||
if (conn && !(conn->cn_flags & LSCONN_TICKABLE))
|
||||
{
|
||||
lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
|
||||
engine_incref_conn(conn, LSCONN_TICKABLE);
|
||||
}
|
||||
}
|
||||
|
||||
ENGINE_IN(engine);
|
||||
prev_min = attq_set_min(engine->attq, now); /* Prevent infinite loop */
|
||||
engine->iter_state.attq.cutoff = now;
|
||||
process_connections(engine, conn_iter_next_attq);
|
||||
attq_set_min(engine->attq, prev_min); /* Restore previos value */
|
||||
process_connections(engine, conn_iter_next_tickable, now);
|
||||
ENGINE_OUT(engine);
|
||||
}
|
||||
|
||||
|
@ -1164,12 +821,12 @@ encrypt_packet (lsquic_engine_t *engine, const lsquic_conn_t *conn,
|
|||
}
|
||||
|
||||
|
||||
STAILQ_HEAD(closed_conns, lsquic_conn);
|
||||
STAILQ_HEAD(conns_stailq, lsquic_conn);
|
||||
|
||||
|
||||
struct conns_out_iter
|
||||
{
|
||||
struct out_heap *coi_heap;
|
||||
struct min_heap *coi_heap;
|
||||
TAILQ_HEAD(, lsquic_conn) coi_active_list,
|
||||
coi_inactive_list;
|
||||
lsquic_conn_t *coi_next;
|
||||
|
@ -1197,9 +854,9 @@ coi_next (struct conns_out_iter *iter)
|
|||
{
|
||||
lsquic_conn_t *conn;
|
||||
|
||||
if (iter->coi_heap->oh_nelem > 0)
|
||||
if (lsquic_mh_count(iter->coi_heap) > 0)
|
||||
{
|
||||
conn = oh_pop(iter->coi_heap);
|
||||
conn = lsquic_mh_pop(iter->coi_heap);
|
||||
TAILQ_INSERT_TAIL(&iter->coi_active_list, conn, cn_next_out);
|
||||
conn->cn_flags |= LSCONN_COI_ACTIVE;
|
||||
#ifndef NDEBUG
|
||||
|
@ -1268,7 +925,7 @@ coi_reheap (struct conns_out_iter *iter, lsquic_engine_t *engine)
|
|||
{
|
||||
TAILQ_REMOVE(&iter->coi_active_list, conn, cn_next_out);
|
||||
conn->cn_flags &= ~LSCONN_COI_ACTIVE;
|
||||
oh_insert(iter->coi_heap, conn);
|
||||
lsquic_mh_insert(iter->coi_heap, conn, conn->cn_last_sent);
|
||||
}
|
||||
while ((conn = TAILQ_FIRST(&iter->coi_inactive_list)))
|
||||
{
|
||||
|
@ -1358,7 +1015,7 @@ check_deadline (lsquic_engine_t *engine)
|
|||
|
||||
static void
|
||||
send_packets_out (struct lsquic_engine *engine,
|
||||
struct closed_conns *closed_conns)
|
||||
struct conns_stailq *closed_conns)
|
||||
{
|
||||
unsigned n, w, n_sent, n_batches_sent;
|
||||
lsquic_packet_out_t *packet_out;
|
||||
|
@ -1471,7 +1128,7 @@ int
|
|||
lsquic_engine_has_unsent_packets (lsquic_engine_t *engine)
|
||||
{
|
||||
return !(engine->flags & ENG_PAST_DEADLINE)
|
||||
&& ( engine->conns_out.oh_nelem > 0
|
||||
&& ( lsquic_mh_count(&engine->conns_out) > 0
|
||||
)
|
||||
;
|
||||
}
|
||||
|
@ -1490,7 +1147,7 @@ void
|
|||
lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
|
||||
{
|
||||
lsquic_conn_t *conn;
|
||||
struct closed_conns closed_conns;
|
||||
struct conns_stailq closed_conns;
|
||||
|
||||
STAILQ_INIT(&closed_conns);
|
||||
reset_deadline(engine, lsquic_time_now());
|
||||
|
@ -1506,29 +1163,32 @@ lsquic_engine_send_unsent_packets (lsquic_engine_t *engine)
|
|||
|
||||
|
||||
static void
|
||||
process_connections (lsquic_engine_t *engine, conn_iter_f next_conn)
|
||||
process_connections (lsquic_engine_t *engine, conn_iter_f next_conn,
|
||||
lsquic_time_t now)
|
||||
{
|
||||
lsquic_conn_t *conn;
|
||||
enum tick_st tick_st;
|
||||
lsquic_time_t now = lsquic_time_now();
|
||||
struct closed_conns closed_conns;
|
||||
unsigned i;
|
||||
lsquic_time_t next_tick_time;
|
||||
struct conns_stailq closed_conns, ticked_conns;
|
||||
|
||||
engine->proc_time = now;
|
||||
eng_hist_tick(&engine->history, now);
|
||||
|
||||
STAILQ_INIT(&closed_conns);
|
||||
STAILQ_INIT(&ticked_conns);
|
||||
reset_deadline(engine, now);
|
||||
|
||||
while ((conn = next_conn(engine)))
|
||||
i = 0;
|
||||
while ((conn = next_conn(engine))
|
||||
)
|
||||
{
|
||||
tick_st = conn->cn_if->ci_tick(conn, now);
|
||||
if (conn_iter_next_rw_pend == next_conn)
|
||||
update_pend_rw_progress(engine, conn, tick_st & TICK_PROGRESS);
|
||||
conn->cn_last_ticked = now + i /* Maintain relative order */ ++;
|
||||
if (tick_st & TICK_SEND)
|
||||
{
|
||||
if (!(conn->cn_flags & LSCONN_HAS_OUTGOING))
|
||||
{
|
||||
oh_insert(&engine->conns_out, conn);
|
||||
lsquic_mh_insert(&engine->conns_out, conn, conn->cn_last_sent);
|
||||
engine_incref_conn(conn, LSCONN_HAS_OUTGOING);
|
||||
}
|
||||
}
|
||||
|
@ -1539,6 +1199,8 @@ process_connections (lsquic_engine_t *engine, conn_iter_f next_conn)
|
|||
if (conn->cn_flags & LSCONN_HASHED)
|
||||
remove_conn_from_hash(engine, conn);
|
||||
}
|
||||
else
|
||||
STAILQ_INSERT_TAIL(&ticked_conns, conn, cn_next_ticked);
|
||||
}
|
||||
|
||||
if (lsquic_engine_has_unsent_packets(engine))
|
||||
|
@ -1549,6 +1211,31 @@ process_connections (lsquic_engine_t *engine, conn_iter_f next_conn)
|
|||
(void) engine_decref_conn(engine, conn, LSCONN_CLOSING);
|
||||
}
|
||||
|
||||
/* TODO Heapification can be optimized by switching to the Floyd method:
|
||||
* https://en.wikipedia.org/wiki/Binary_heap#Building_a_heap
|
||||
*/
|
||||
while ((conn = STAILQ_FIRST(&ticked_conns)))
|
||||
{
|
||||
STAILQ_REMOVE_HEAD(&ticked_conns, cn_next_ticked);
|
||||
if (!(conn->cn_flags & LSCONN_TICKABLE)
|
||||
&& conn->cn_if->ci_is_tickable(conn))
|
||||
{
|
||||
lsquic_mh_insert(&engine->conns_tickable, conn, conn->cn_last_ticked);
|
||||
engine_incref_conn(conn, LSCONN_TICKABLE);
|
||||
}
|
||||
else if (!(conn->cn_flags & LSCONN_ATTQ))
|
||||
{
|
||||
next_tick_time = conn->cn_if->ci_next_tick_time(conn);
|
||||
if (next_tick_time)
|
||||
{
|
||||
if (0 == attq_add(engine->attq, conn, next_tick_time))
|
||||
engine_incref_conn(conn, LSCONN_ATTQ);
|
||||
}
|
||||
else
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -1613,6 +1300,12 @@ lsquic_engine_earliest_adv_tick (lsquic_engine_t *engine, int *diff)
|
|||
const lsquic_time_t *next_time;
|
||||
lsquic_time_t now;
|
||||
|
||||
if (lsquic_mh_count(&engine->conns_tickable))
|
||||
{
|
||||
*diff = 0;
|
||||
return 1;
|
||||
}
|
||||
|
||||
next_time = attq_next_time(engine->attq);
|
||||
if (!next_time)
|
||||
return 0;
|
||||
|
|
|
@ -26,45 +26,12 @@ struct lsquic_engine_public {
|
|||
unsigned enp_ver_tags_len;
|
||||
};
|
||||
|
||||
/* These values are printable ASCII characters for ease of printing the
|
||||
* whole history in a single line of a log message. If connection was
|
||||
* processed as result of being put onto the queue, the letter is converted
|
||||
* to uppercase.
|
||||
*
|
||||
* The letters are assigned by first letter of the verb for most obvious
|
||||
* and important actions, like "read" and "write" and other letters of
|
||||
* the verb or some other letters for other actions.
|
||||
*
|
||||
* Each reason is either expected to produce user read from the stream
|
||||
* or putting stream data into packet for sending out. This is documented
|
||||
* in a separate comment column below.
|
||||
*/
|
||||
enum rw_reason
|
||||
{
|
||||
RW_REASON_EMPTY = '\0', /* No init required */
|
||||
|
||||
/* Expected action: */
|
||||
RW_REASON_USER_WRITE = 'w', /* write */
|
||||
RW_REASON_USER_WRITEV = 'v', /* write */
|
||||
RW_REASON_USER_READ = 'r', /* write (WINDOW_UPDATE frame) */
|
||||
RW_REASON_FLUSH = 'f', /* write */
|
||||
RW_REASON_STREAM_CLOSE = 'c', /* write */
|
||||
RW_REASON_RST_IN = 'n', /* read */
|
||||
RW_REASON_STREAM_IN = 'd', /* read */
|
||||
RW_REASON_RESET_EXT = 'e', /* write */
|
||||
RW_REASON_WANTREAD = 'a', /* read */
|
||||
RW_REASON_SHUTDOWN = 'u', /* write */
|
||||
RW_REASON_WRITEFILE = 't', /* write */
|
||||
RW_REASON_SENDFILE = 's', /* write */
|
||||
};
|
||||
|
||||
/* Put connection onto Pending RW Queue if it is not already on it. If
|
||||
/* Put connection onto the Tickable Queue if it is not already on it. If
|
||||
* connection is being destroyed, this is a no-op.
|
||||
* XXX Is the bit about "being destroyed" still true?
|
||||
*/
|
||||
void
|
||||
lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub,
|
||||
lsquic_conn_t *conn, enum rw_reason);
|
||||
lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *,
|
||||
lsquic_conn_t *);
|
||||
|
||||
/* Put connection onto Advisory Tick Time Queue if it is not already on it.
|
||||
*/
|
||||
|
|
|
@ -255,9 +255,6 @@ reset_ack_state (struct full_conn *conn);
|
|||
static int
|
||||
write_is_possible (struct full_conn *);
|
||||
|
||||
static int
|
||||
dispatch_stream_read_events (struct full_conn *, struct lsquic_stream *);
|
||||
|
||||
static const struct headers_stream_callbacks *headers_callbacks_ptr;
|
||||
|
||||
#if KEEP_CLOSED_STREAM_HISTORY
|
||||
|
@ -1168,7 +1165,7 @@ process_stream_frame (struct full_conn *conn, lsquic_packet_in_t *packet_in,
|
|||
* has not been completed yet. Nevertheless, this is good enough
|
||||
* for now.
|
||||
*/
|
||||
dispatch_stream_read_events(conn, stream);
|
||||
lsquic_stream_dispatch_read_events(stream);
|
||||
}
|
||||
|
||||
return parsed_len;
|
||||
|
@ -2350,43 +2347,23 @@ service_streams (struct full_conn *conn)
|
|||
}
|
||||
|
||||
|
||||
static int
|
||||
dispatch_stream_read_events (struct full_conn *conn, lsquic_stream_t *stream)
|
||||
{
|
||||
struct stream_read_prog_status saved_status;
|
||||
int progress_made;
|
||||
|
||||
lsquic_stream_get_read_prog_status(stream, &saved_status);
|
||||
lsquic_stream_dispatch_read_events(stream);
|
||||
progress_made = lsquic_stream_progress_was_made(stream, &saved_status);
|
||||
|
||||
return progress_made;
|
||||
}
|
||||
|
||||
|
||||
/* Return 1 if progress was made, 0 otherwise */
|
||||
static int
|
||||
static void
|
||||
process_streams_read_events (struct full_conn *conn)
|
||||
{
|
||||
lsquic_stream_t *stream;
|
||||
struct stream_prio_iter spi;
|
||||
int progress_count;
|
||||
|
||||
if (TAILQ_EMPTY(&conn->fc_pub.read_streams))
|
||||
return 0;
|
||||
return;
|
||||
|
||||
lsquic_spi_init(&spi, TAILQ_FIRST(&conn->fc_pub.read_streams),
|
||||
TAILQ_LAST(&conn->fc_pub.read_streams, lsquic_streams_tailq),
|
||||
(uintptr_t) &TAILQ_NEXT((lsquic_stream_t *) NULL, next_read_stream),
|
||||
STREAM_WANT_READ, conn->fc_conn.cn_cid, "read");
|
||||
|
||||
progress_count = 0;
|
||||
for (stream = lsquic_spi_first(&spi); stream;
|
||||
stream = lsquic_spi_next(&spi))
|
||||
progress_count +=
|
||||
dispatch_stream_read_events(conn, stream);
|
||||
|
||||
return progress_count > 0;
|
||||
lsquic_stream_dispatch_read_events(stream);
|
||||
}
|
||||
|
||||
|
||||
|
@ -2429,15 +2406,16 @@ process_streams_write_events (struct full_conn *conn, int high_prio)
|
|||
}
|
||||
|
||||
|
||||
/* Return 1 if progress was made, 0 otherwise. */
|
||||
static int
|
||||
static void
|
||||
process_hsk_stream_read_events (struct full_conn *conn)
|
||||
{
|
||||
lsquic_stream_t *stream;
|
||||
TAILQ_FOREACH(stream, &conn->fc_pub.read_streams, next_read_stream)
|
||||
if (LSQUIC_STREAM_HANDSHAKE == stream->id)
|
||||
return dispatch_stream_read_events(conn, stream);
|
||||
return 0;
|
||||
{
|
||||
lsquic_stream_dispatch_read_events(stream);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -2641,13 +2619,13 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
struct full_conn *conn = (struct full_conn *) lconn;
|
||||
int have_delayed_packets;
|
||||
unsigned n;
|
||||
int progress_made, s;
|
||||
enum tick_st progress_tick = 0;
|
||||
int s;
|
||||
enum tick_st tick = 0;
|
||||
|
||||
#define CLOSE_IF_NECESSARY() do { \
|
||||
if (conn->fc_flags & FC_IMMEDIATE_CLOSE_FLAGS) \
|
||||
{ \
|
||||
progress_tick |= immediate_close(conn); \
|
||||
tick |= immediate_close(conn); \
|
||||
goto end; \
|
||||
} \
|
||||
} while (0)
|
||||
|
@ -2659,13 +2637,13 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
{ \
|
||||
LSQ_DEBUG("used up packet allowance, quiet now (line %d)", \
|
||||
__LINE__); \
|
||||
progress_tick |= TICK_QUIET; \
|
||||
tick |= TICK_QUIET; \
|
||||
} \
|
||||
else \
|
||||
{ \
|
||||
LSQ_DEBUG("used up packet allowance, sending now (line %d)",\
|
||||
__LINE__); \
|
||||
progress_tick |= TICK_SEND; \
|
||||
tick |= TICK_SEND; \
|
||||
} \
|
||||
goto end; \
|
||||
} \
|
||||
|
@ -2682,8 +2660,6 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
LSQ_DEBUG("memory used: %zd bytes", calc_mem_used(conn));
|
||||
}
|
||||
|
||||
assert(!(conn->fc_conn.cn_flags & LSCONN_RW_PENDING));
|
||||
|
||||
if (conn->fc_flags & FC_HAVE_SAVED_ACK)
|
||||
{
|
||||
(void) /* If there is an error, we'll fail shortly */
|
||||
|
@ -2709,10 +2685,9 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
* does not want to wait if it has the server information.
|
||||
*/
|
||||
if (conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE)
|
||||
progress_made = process_streams_read_events(conn);
|
||||
process_streams_read_events(conn);
|
||||
else
|
||||
progress_made = process_hsk_stream_read_events(conn);
|
||||
progress_tick |= progress_made << TICK_BIT_PROGRESS;
|
||||
process_hsk_stream_read_events(conn);
|
||||
CLOSE_IF_NECESSARY();
|
||||
|
||||
if (lsquic_send_ctl_pacer_blocked(&conn->fc_send_ctl))
|
||||
|
@ -2774,7 +2749,7 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
* dropped and replaced by new ACK packet. This way, we are never
|
||||
* more than 1 packet over CWND.
|
||||
*/
|
||||
progress_tick |= TICK_SEND;
|
||||
tick |= TICK_SEND;
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
@ -2830,7 +2805,6 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
}
|
||||
|
||||
lsquic_send_ctl_set_buffer_stream_packets(&conn->fc_send_ctl, 0);
|
||||
const unsigned n_sched = lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl);
|
||||
if (!(conn->fc_conn.cn_flags & LSCONN_HANDSHAKE_DONE))
|
||||
{
|
||||
process_hsk_stream_write_events(conn);
|
||||
|
@ -2860,8 +2834,6 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
process_streams_write_events(conn, 0);
|
||||
|
||||
end_write:
|
||||
progress_made = (n_sched < lsquic_send_ctl_n_scheduled(&conn->fc_send_ctl));
|
||||
progress_tick |= progress_made << TICK_BIT_PROGRESS;
|
||||
|
||||
skip_write:
|
||||
service_streams(conn);
|
||||
|
@ -2883,10 +2855,10 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
!conn->fc_settings->es_silent_close)
|
||||
{
|
||||
generate_connection_close_packet(conn);
|
||||
progress_tick |= TICK_SEND|TICK_CLOSE;
|
||||
tick |= TICK_SEND|TICK_CLOSE;
|
||||
}
|
||||
else
|
||||
progress_tick |= TICK_CLOSE;
|
||||
tick |= TICK_CLOSE;
|
||||
goto end;
|
||||
}
|
||||
|
||||
|
@ -2901,7 +2873,7 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
}
|
||||
else
|
||||
{
|
||||
progress_tick |= TICK_QUIET;
|
||||
tick |= TICK_QUIET;
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
@ -2924,11 +2896,11 @@ full_conn_ci_tick (lsquic_conn_t *lconn, lsquic_time_t now)
|
|||
lsquic_hash_count(conn->fc_pub.all_streams) > 0)
|
||||
lsquic_alarmset_set(&conn->fc_alset, AL_PING, now + TIME_BETWEEN_PINGS);
|
||||
|
||||
progress_tick |= TICK_SEND;
|
||||
tick |= TICK_SEND;
|
||||
|
||||
end:
|
||||
lsquic_send_ctl_set_buffer_stream_packets(&conn->fc_send_ctl, 1);
|
||||
return progress_tick;
|
||||
return tick;
|
||||
}
|
||||
|
||||
|
||||
|
@ -3343,6 +3315,68 @@ lsquic_conn_status (lsquic_conn_t *lconn, char *errbuf, size_t bufsz)
|
|||
}
|
||||
|
||||
|
||||
static int
|
||||
full_conn_ci_is_tickable (lsquic_conn_t *lconn)
|
||||
{
|
||||
struct full_conn *conn = (struct full_conn *) lconn;
|
||||
const struct lsquic_stream *stream;
|
||||
int can_send;
|
||||
|
||||
/* This caches the value so that we only need to call the function once */
|
||||
#define CAN_SEND() \
|
||||
(can_send >= 0 ? can_send : \
|
||||
(can_send = lsquic_send_ctl_can_send(&conn->fc_send_ctl)))
|
||||
|
||||
if (lsquic_send_ctl_has_buffered(&conn->fc_send_ctl))
|
||||
return 1;
|
||||
|
||||
if (!TAILQ_EMPTY(&conn->fc_pub.service_streams))
|
||||
return 1;
|
||||
|
||||
can_send = -1;
|
||||
if (!TAILQ_EMPTY(&conn->fc_pub.sending_streams) && CAN_SEND())
|
||||
return 1;
|
||||
|
||||
TAILQ_FOREACH(stream, &conn->fc_pub.read_streams, next_read_stream)
|
||||
if (lsquic_stream_readable(stream))
|
||||
return 1;
|
||||
|
||||
if (!TAILQ_EMPTY(&conn->fc_pub.write_streams) && CAN_SEND())
|
||||
{
|
||||
TAILQ_FOREACH(stream, &conn->fc_pub.write_streams, next_write_stream)
|
||||
if (lsquic_stream_write_avail(stream))
|
||||
return 1;
|
||||
}
|
||||
|
||||
#undef CAN_SEND
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static lsquic_time_t
|
||||
full_conn_ci_next_tick_time (lsquic_conn_t *lconn)
|
||||
{
|
||||
struct full_conn *conn = (struct full_conn *) lconn;
|
||||
lsquic_time_t alarm_time, pacer_time;
|
||||
|
||||
alarm_time = lsquic_alarmset_mintime(&conn->fc_alset);
|
||||
pacer_time = lsquic_send_ctl_next_pacer_time(&conn->fc_send_ctl);
|
||||
|
||||
if (alarm_time && pacer_time)
|
||||
{
|
||||
if (alarm_time < pacer_time)
|
||||
return alarm_time;
|
||||
else
|
||||
return pacer_time;
|
||||
}
|
||||
else if (alarm_time)
|
||||
return alarm_time;
|
||||
else
|
||||
return pacer_time;
|
||||
}
|
||||
|
||||
|
||||
static const struct headers_stream_callbacks headers_callbacks =
|
||||
{
|
||||
.hsc_on_headers = headers_stream_on_incoming_headers,
|
||||
|
@ -3359,7 +3393,9 @@ static const struct conn_iface full_conn_iface = {
|
|||
.ci_destroy = full_conn_ci_destroy,
|
||||
.ci_handshake_failed = full_conn_ci_handshake_failed,
|
||||
.ci_handshake_ok = full_conn_ci_handshake_ok,
|
||||
.ci_is_tickable = full_conn_ci_is_tickable,
|
||||
.ci_next_packet_to_send = full_conn_ci_next_packet_to_send,
|
||||
.ci_next_tick_time = full_conn_ci_next_tick_time,
|
||||
.ci_packet_in = full_conn_ci_packet_in,
|
||||
.ci_packet_not_sent = full_conn_ci_packet_not_sent,
|
||||
.ci_packet_sent = full_conn_ci_packet_sent,
|
||||
|
|
|
@ -70,6 +70,7 @@ enum lsq_log_level lsq_log_levels[N_LSQUIC_LOGGER_MODULES] = {
|
|||
[LSQLM_SPI] = LSQ_LOG_WARN,
|
||||
[LSQLM_DI] = LSQ_LOG_WARN,
|
||||
[LSQLM_PACER] = LSQ_LOG_WARN,
|
||||
[LSQLM_MIN_HEAP] = LSQ_LOG_WARN,
|
||||
};
|
||||
|
||||
const char *const lsqlm_to_str[N_LSQUIC_LOGGER_MODULES] = {
|
||||
|
@ -97,6 +98,7 @@ const char *const lsqlm_to_str[N_LSQUIC_LOGGER_MODULES] = {
|
|||
[LSQLM_SPI] = "spi",
|
||||
[LSQLM_DI] = "di",
|
||||
[LSQLM_PACER] = "pacer",
|
||||
[LSQLM_MIN_HEAP] = "min-heap",
|
||||
};
|
||||
|
||||
const char *const lsq_loglevel2str[N_LSQUIC_LOG_LEVELS] = {
|
||||
|
|
|
@ -70,6 +70,7 @@ enum lsquic_logger_module {
|
|||
LSQLM_SPI,
|
||||
LSQLM_DI,
|
||||
LSQLM_PACER,
|
||||
LSQLM_MIN_HEAP,
|
||||
N_LSQUIC_LOGGER_MODULES
|
||||
};
|
||||
|
||||
|
|
92
src/liblsquic/lsquic_min_heap.c
Normal file
92
src/liblsquic/lsquic_min_heap.c
Normal file
|
@ -0,0 +1,92 @@
|
|||
/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */
|
||||
/*
|
||||
* lsquic_min_heap.c
|
||||
*/
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdint.h>
|
||||
|
||||
#include "lsquic_min_heap.h"
|
||||
#define LSQUIC_LOGGER_MODULE LSQLM_MIN_HEAP
|
||||
#include "lsquic_logger.h"
|
||||
|
||||
#define MHE_PARENT(i) ((i - 1) / 2)
|
||||
#define MHE_LCHILD(i) (2 * i + 1)
|
||||
#define MHE_RCHILD(i) (2 * i + 2)
|
||||
|
||||
|
||||
static void
|
||||
heapify_min_heap (struct min_heap *heap, unsigned i)
|
||||
{
|
||||
struct min_heap_elem el;
|
||||
unsigned smallest;
|
||||
|
||||
assert(i < heap->mh_nelem);
|
||||
|
||||
if (MHE_LCHILD(i) < heap->mh_nelem)
|
||||
{
|
||||
if (heap->mh_elems[ MHE_LCHILD(i) ].mhe_val <
|
||||
heap->mh_elems[ i ].mhe_val)
|
||||
smallest = MHE_LCHILD(i);
|
||||
else
|
||||
smallest = i;
|
||||
if (MHE_RCHILD(i) < heap->mh_nelem &&
|
||||
heap->mh_elems[ MHE_RCHILD(i) ].mhe_val <
|
||||
heap->mh_elems[ smallest ].mhe_val)
|
||||
smallest = MHE_RCHILD(i);
|
||||
}
|
||||
else
|
||||
smallest = i;
|
||||
|
||||
if (smallest != i)
|
||||
{
|
||||
el = heap->mh_elems[ smallest ];
|
||||
heap->mh_elems[ smallest ] = heap->mh_elems[ i ];
|
||||
heap->mh_elems[ i ] = el;
|
||||
heapify_min_heap(heap, smallest);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
lsquic_mh_insert (struct min_heap *heap, struct lsquic_conn *conn, uint64_t val)
|
||||
{
|
||||
struct min_heap_elem el;
|
||||
unsigned i;
|
||||
|
||||
assert(heap->mh_nelem < heap->mh_nalloc);
|
||||
|
||||
heap->mh_elems[ heap->mh_nelem ].mhe_conn = conn;
|
||||
heap->mh_elems[ heap->mh_nelem ].mhe_val = val;
|
||||
++heap->mh_nelem;
|
||||
|
||||
i = heap->mh_nelem - 1;
|
||||
while (i > 0 && heap->mh_elems[ MHE_PARENT(i) ].mhe_val >
|
||||
heap->mh_elems[ i ].mhe_val)
|
||||
{
|
||||
el = heap->mh_elems[ MHE_PARENT(i) ];
|
||||
heap->mh_elems[ MHE_PARENT(i) ] = heap->mh_elems[ i ];
|
||||
heap->mh_elems[ i ] = el;
|
||||
i = MHE_PARENT(i);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
struct lsquic_conn *
|
||||
lsquic_mh_pop (struct min_heap *heap)
|
||||
{
|
||||
struct lsquic_conn *conn;
|
||||
|
||||
if (heap->mh_nelem == 0)
|
||||
return NULL;
|
||||
|
||||
conn = heap->mh_elems[0].mhe_conn;
|
||||
--heap->mh_nelem;
|
||||
if (heap->mh_nelem > 0)
|
||||
{
|
||||
heap->mh_elems[0] = heap->mh_elems[ heap->mh_nelem ];
|
||||
heapify_min_heap(heap, 0);
|
||||
}
|
||||
|
||||
return conn;
|
||||
}
|
36
src/liblsquic/lsquic_min_heap.h
Normal file
36
src/liblsquic/lsquic_min_heap.h
Normal file
|
@ -0,0 +1,36 @@
|
|||
/* Copyright (c) 2017 - 2018 LiteSpeed Technologies Inc. See LICENSE. */
|
||||
/*
|
||||
* lsquic_min_heap.h -- Min-heap for connections
|
||||
*/
|
||||
|
||||
#ifndef LSQUIC_MIN_HEAP_H
|
||||
#define LSQUIC_MIN_HEAP_H 1
|
||||
|
||||
struct lsquic_conn;
|
||||
|
||||
struct min_heap_elem
|
||||
{
|
||||
struct lsquic_conn *mhe_conn;
|
||||
uint64_t mhe_val;
|
||||
};
|
||||
|
||||
|
||||
struct min_heap
|
||||
{
|
||||
struct min_heap_elem *mh_elems;
|
||||
unsigned mh_nalloc,
|
||||
mh_nelem;
|
||||
};
|
||||
|
||||
|
||||
void
|
||||
lsquic_mh_insert (struct min_heap *, struct lsquic_conn *conn, uint64_t val);
|
||||
|
||||
struct lsquic_conn *
|
||||
lsquic_mh_pop (struct min_heap *);
|
||||
|
||||
#define lsquic_mh_count(heap) (+(heap)->mh_nelem)
|
||||
|
||||
#define lsquic_mh_nalloc(heap) (+(heap)->mh_nalloc)
|
||||
|
||||
#endif
|
|
@ -57,6 +57,8 @@ pacer_packet_scheduled (struct pacer *pacer, unsigned n_in_flight,
|
|||
void
|
||||
pacer_loss_event (struct pacer *);
|
||||
|
||||
#define pacer_delayed(pacer) ((pacer)->pa_flags & PA_LAST_SCHED_DELAYED)
|
||||
|
||||
#define pacer_next_sched(pacer) (+(pacer)->pa_next_sched)
|
||||
|
||||
#endif
|
||||
|
|
|
@ -219,12 +219,22 @@ lsquic_send_ctl_drop_scheduled (lsquic_send_ctl_t *);
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define lsquic_send_ctl_next_pacer_time(ctl) ( \
|
||||
((ctl)->sc_flags & SC_PACE) \
|
||||
&& pacer_delayed(&(ctl)->sc_pacer) \
|
||||
? pacer_next_sched(&(ctl)->sc_pacer) \
|
||||
: 0 )
|
||||
|
||||
enum lsquic_packno_bits
|
||||
lsquic_send_ctl_packno_bits (lsquic_send_ctl_t *);
|
||||
|
||||
int
|
||||
lsquic_send_ctl_schedule_buffered (lsquic_send_ctl_t *, enum buf_packet_type);
|
||||
|
||||
#define lsquic_send_ctl_has_buffered(ctl) ( \
|
||||
TAILQ_FIRST(&(ctl)->sc_buffered_packets[BPT_HIGHEST_PRIO].bpq_packets) \
|
||||
|| TAILQ_FIRST(&(ctl)->sc_buffered_packets[BPT_OTHER_PRIO].bpq_packets ))
|
||||
|
||||
#define lsquic_send_ctl_invalidate_bpt_cache(ctl) do { \
|
||||
(ctl)->sc_cached_bpt.stream_id = 0; \
|
||||
} while (0)
|
||||
|
|
|
@ -71,9 +71,6 @@ stream_wantread (lsquic_stream_t *stream, int is_want);
|
|||
static int
|
||||
stream_wantwrite (lsquic_stream_t *stream, int is_want);
|
||||
|
||||
static int
|
||||
stream_readable (const lsquic_stream_t *stream);
|
||||
|
||||
static ssize_t
|
||||
stream_write_to_packets (lsquic_stream_t *, struct lsquic_reader *, size_t);
|
||||
|
||||
|
@ -174,13 +171,12 @@ stream_inside_callback (const lsquic_stream_t *stream)
|
|||
|
||||
/* Here, "readable" means that the user is able to read from the stream. */
|
||||
static void
|
||||
maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream,
|
||||
enum rw_reason reason)
|
||||
maybe_conn_to_tickable_if_readable (lsquic_stream_t *stream)
|
||||
{
|
||||
if (!stream_inside_callback(stream) && stream_readable(stream))
|
||||
if (!stream_inside_callback(stream) && lsquic_stream_readable(stream))
|
||||
{
|
||||
lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub,
|
||||
stream->conn_pub->lconn, reason);
|
||||
lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
|
||||
stream->conn_pub->lconn);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -189,15 +185,14 @@ maybe_conn_to_pendrw_if_readable (lsquic_stream_t *stream,
|
|||
* scheduled to be sent out.
|
||||
*/
|
||||
static void
|
||||
maybe_conn_to_pendrw_if_writeable (lsquic_stream_t *stream,
|
||||
enum rw_reason reason)
|
||||
maybe_conn_to_tickable_if_writeable (lsquic_stream_t *stream)
|
||||
{
|
||||
if (!stream_inside_callback(stream) &&
|
||||
lsquic_send_ctl_can_send(stream->conn_pub->send_ctl) &&
|
||||
! lsquic_send_ctl_have_delayed_packets(stream->conn_pub->send_ctl))
|
||||
{
|
||||
lsquic_engine_add_conn_to_pend_rw(stream->conn_pub->enpub,
|
||||
stream->conn_pub->lconn, reason);
|
||||
lsquic_engine_add_conn_to_tickable(stream->conn_pub->enpub,
|
||||
stream->conn_pub->lconn);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -403,8 +398,8 @@ lsquic_stream_call_on_close (lsquic_stream_t *stream)
|
|||
}
|
||||
|
||||
|
||||
static int
|
||||
stream_readable (const lsquic_stream_t *stream)
|
||||
int
|
||||
lsquic_stream_readable (const lsquic_stream_t *stream)
|
||||
{
|
||||
/* A stream is readable if one of the following is true: */
|
||||
return
|
||||
|
@ -429,8 +424,8 @@ stream_readable (const lsquic_stream_t *stream)
|
|||
}
|
||||
|
||||
|
||||
static size_t
|
||||
stream_write_avail (const struct lsquic_stream *stream)
|
||||
size_t
|
||||
lsquic_stream_write_avail (const struct lsquic_stream *stream)
|
||||
{
|
||||
uint64_t stream_avail, conn_avail;
|
||||
|
||||
|
@ -516,7 +511,7 @@ lsquic_stream_frame_in (lsquic_stream_t *stream, stream_frame_t *frame)
|
|||
}
|
||||
if (got_next_offset)
|
||||
/* Checking the offset saves di_get_frame() call */
|
||||
maybe_conn_to_pendrw_if_readable(stream, RW_REASON_STREAM_IN);
|
||||
maybe_conn_to_tickable_if_readable(stream);
|
||||
return 0;
|
||||
}
|
||||
else if (INS_FRAME_DUP == ins_frame)
|
||||
|
@ -603,7 +598,7 @@ lsquic_stream_rst_in (lsquic_stream_t *stream, uint64_t offset,
|
|||
}
|
||||
|
||||
/* Let user collect error: */
|
||||
maybe_conn_to_pendrw_if_readable(stream, RW_REASON_RST_IN);
|
||||
maybe_conn_to_tickable_if_readable(stream);
|
||||
|
||||
lsquic_sfcw_consume_rem(&stream->fc);
|
||||
drop_frames_in(stream);
|
||||
|
@ -793,7 +788,7 @@ lsquic_stream_readv (lsquic_stream_t *stream, const struct iovec *iov,
|
|||
if (!(stream->stream_flags & STREAM_SENDING_FLAGS))
|
||||
TAILQ_INSERT_TAIL(&stream->conn_pub->sending_streams, stream, next_send_stream);
|
||||
stream->stream_flags |= STREAM_SEND_WUF;
|
||||
maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_USER_READ);
|
||||
maybe_conn_to_tickable_if_writeable(stream);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -894,7 +889,7 @@ lsquic_stream_shutdown (lsquic_stream_t *stream, int how)
|
|||
maybe_finish_stream(stream);
|
||||
maybe_schedule_call_on_close(stream);
|
||||
if (how)
|
||||
maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_SHUTDOWN);
|
||||
maybe_conn_to_tickable_if_writeable(stream);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -1045,7 +1040,7 @@ lsquic_stream_wantread (lsquic_stream_t *stream, int is_want)
|
|||
if (!(stream->stream_flags & STREAM_U_READ_DONE))
|
||||
{
|
||||
if (is_want)
|
||||
maybe_conn_to_pendrw_if_readable(stream, RW_REASON_WANTREAD);
|
||||
maybe_conn_to_tickable_if_readable(stream);
|
||||
return stream_wantread(stream, is_want);
|
||||
}
|
||||
else
|
||||
|
@ -1083,7 +1078,8 @@ stream_dispatch_read_events_loop (lsquic_stream_t *stream)
|
|||
no_progress_limit = stream->conn_pub->enpub->enp_settings.es_progress_check;
|
||||
|
||||
no_progress_count = 0;
|
||||
while ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream))
|
||||
while ((stream->stream_flags & STREAM_WANT_READ)
|
||||
&& lsquic_stream_readable(stream))
|
||||
{
|
||||
flags = stream->stream_flags & USER_PROGRESS_FLAGS;
|
||||
size = stream->read_offset;
|
||||
|
@ -1121,7 +1117,7 @@ stream_dispatch_write_events_loop (lsquic_stream_t *stream)
|
|||
stream->stream_flags |= STREAM_LAST_WRITE_OK;
|
||||
while ((stream->stream_flags & (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK))
|
||||
== (STREAM_WANT_WRITE|STREAM_LAST_WRITE_OK)
|
||||
&& stream_write_avail(stream))
|
||||
&& lsquic_stream_write_avail(stream))
|
||||
{
|
||||
flags = stream->stream_flags & USER_PROGRESS_FLAGS;
|
||||
|
||||
|
@ -1149,7 +1145,7 @@ stream_dispatch_write_events_loop (lsquic_stream_t *stream)
|
|||
static void
|
||||
stream_dispatch_read_events_once (lsquic_stream_t *stream)
|
||||
{
|
||||
if ((stream->stream_flags & STREAM_WANT_READ) && stream_readable(stream))
|
||||
if ((stream->stream_flags & STREAM_WANT_READ) && lsquic_stream_readable(stream))
|
||||
{
|
||||
stream->stream_if->on_read(stream, stream->st_ctx);
|
||||
}
|
||||
|
@ -1227,7 +1223,7 @@ lsquic_stream_dispatch_write_events (lsquic_stream_t *stream)
|
|||
if (stream->stream_flags & STREAM_RW_ONCE)
|
||||
{
|
||||
if ((stream->stream_flags & STREAM_WANT_WRITE)
|
||||
&& stream_write_avail(stream))
|
||||
&& lsquic_stream_write_avail(stream))
|
||||
{
|
||||
stream->stream_if->on_write(stream, stream->st_ctx);
|
||||
}
|
||||
|
@ -1395,7 +1391,7 @@ frame_gen_size (void *ctx)
|
|||
|
||||
/* Make sure we are not writing past available size: */
|
||||
remaining = fg_ctx->fgc_reader->lsqr_size(fg_ctx->fgc_reader->lsqr_ctx);
|
||||
available = stream_write_avail(fg_ctx->fgc_stream);
|
||||
available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
|
||||
if (available < remaining)
|
||||
remaining = available;
|
||||
|
||||
|
@ -1452,7 +1448,7 @@ frame_gen_read (void *ctx, void *begin_buf, size_t len, int *fin)
|
|||
stream->sm_n_buffered = 0;
|
||||
}
|
||||
|
||||
available = stream_write_avail(fg_ctx->fgc_stream);
|
||||
available = lsquic_stream_write_avail(fg_ctx->fgc_stream);
|
||||
n_to_write = end - p;
|
||||
if (n_to_write > available)
|
||||
n_to_write = available;
|
||||
|
@ -1662,7 +1658,7 @@ save_to_buffer (lsquic_stream_t *stream, struct lsquic_reader *reader,
|
|||
return -1;
|
||||
}
|
||||
|
||||
avail = stream_write_avail(stream);
|
||||
avail = lsquic_stream_write_avail(stream);
|
||||
if (avail < len)
|
||||
len = avail;
|
||||
|
||||
|
@ -1891,7 +1887,7 @@ lsquic_stream_reset_ext (lsquic_stream_t *stream, uint32_t error_code,
|
|||
if (do_close)
|
||||
lsquic_stream_close(stream);
|
||||
else
|
||||
maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_RESET_EXT);
|
||||
maybe_conn_to_tickable_if_writeable(stream);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1924,7 +1920,7 @@ lsquic_stream_close (lsquic_stream_t *stream)
|
|||
stream_shutdown_read(stream);
|
||||
maybe_schedule_call_on_close(stream);
|
||||
maybe_finish_stream(stream);
|
||||
maybe_conn_to_pendrw_if_writeable(stream, RW_REASON_STREAM_CLOSE);
|
||||
maybe_conn_to_tickable_if_writeable(stream);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -280,18 +280,6 @@ struct stream_read_prog_status
|
|||
enum stream_flags srps_flags;
|
||||
};
|
||||
|
||||
#define lsquic_stream_get_read_prog_status(stream, stats) do { \
|
||||
(stats)->srps_read_offset = (stream)->read_offset; \
|
||||
(stats)->srps_flags = \
|
||||
(stream)->stream_flags & STREAM_RW_PROG_FLAGS; \
|
||||
} while (0)
|
||||
|
||||
#define lsquic_stream_progress_was_made(stream, stats) ( \
|
||||
(stats)->srps_read_offset != (stream)->read_offset \
|
||||
|| (stats)->srps_flags != \
|
||||
((stream)->stream_flags & STREAM_RW_PROG_FLAGS) \
|
||||
)
|
||||
|
||||
#define lsquic_stream_is_critical(stream) ( \
|
||||
(stream)->id == LSQUIC_STREAM_HANDSHAKE || \
|
||||
((stream)->id == LSQUIC_STREAM_HEADERS && \
|
||||
|
@ -305,4 +293,10 @@ lsquic_stream_cid (const struct lsquic_stream *);
|
|||
|
||||
#define lsquic_stream_has_data_to_flush(stream) ((stream)->sm_n_buffered > 0)
|
||||
|
||||
int
|
||||
lsquic_stream_readable (const lsquic_stream_t *);
|
||||
|
||||
size_t
|
||||
lsquic_stream_write_avail (const struct lsquic_stream *);
|
||||
|
||||
#endif
|
||||
|
|
87
test/prog.c
87
test/prog.c
|
@ -28,9 +28,6 @@
|
|||
|
||||
static int prog_stopped;
|
||||
|
||||
static void
|
||||
prog_set_onetimer (struct prog *prog, unsigned usec);
|
||||
|
||||
static const struct lsquic_packout_mem_if pmi = {
|
||||
.pmi_allocate = pba_allocate,
|
||||
.pmi_release = pba_release,
|
||||
|
@ -91,8 +88,6 @@ prog_print_common_options (const struct prog *prog, FILE *out)
|
|||
" ::1:12345\n"
|
||||
" If no -s option is given, 0.0.0.0:12345 address\n"
|
||||
" is used.\n"
|
||||
" -i USEC Library will `tick' every USEC microseconds. The default\n"
|
||||
" is %u\n"
|
||||
#if LSQUIC_DONTFRAG_SUPPORTED
|
||||
" -D Set `do not fragment' flag on outgoing UDP packets\n"
|
||||
#endif
|
||||
|
@ -122,7 +117,6 @@ prog_print_common_options (const struct prog *prog, FILE *out)
|
|||
" -S opt=val Socket options. Supported options:\n"
|
||||
" sndbuf=12345 # Sets SO_SNDBUF\n"
|
||||
" rcvbuf=12345 # Sets SO_RCVBUF\n"
|
||||
, PROG_DEFAULT_PERIOD_USEC
|
||||
);
|
||||
|
||||
|
||||
|
@ -150,9 +144,6 @@ prog_set_opt (struct prog *prog, int opt, const char *arg)
|
|||
{
|
||||
switch (opt)
|
||||
{
|
||||
case 'i':
|
||||
prog->prog_period_usec = atoi(arg);
|
||||
return 0;
|
||||
#if LSQUIC_DONTFRAG_SUPPORTED
|
||||
case 'D':
|
||||
{
|
||||
|
@ -248,6 +239,7 @@ prog_connect (struct prog *prog)
|
|||
prog->prog_max_packet_size))
|
||||
return -1;
|
||||
|
||||
prog_process_conns(prog);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -278,30 +270,30 @@ prog_init_server (struct prog *prog)
|
|||
}
|
||||
|
||||
|
||||
static void
|
||||
drop_onetimer (struct prog *prog)
|
||||
{
|
||||
if (prog->prog_onetimer)
|
||||
{
|
||||
event_del(prog->prog_onetimer);
|
||||
event_free(prog->prog_onetimer);
|
||||
prog->prog_onetimer = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
prog_maybe_set_onetimer (struct prog *prog)
|
||||
prog_process_conns (struct prog *prog)
|
||||
{
|
||||
int diff;
|
||||
struct timeval timeout;
|
||||
|
||||
lsquic_engine_process_conns(prog->prog_engine);
|
||||
|
||||
if (lsquic_engine_earliest_adv_tick(prog->prog_engine, &diff))
|
||||
{
|
||||
if (diff > 0)
|
||||
prog_set_onetimer(prog, (unsigned) diff);
|
||||
if (diff < 4000)
|
||||
{
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = 4000;
|
||||
}
|
||||
else
|
||||
drop_onetimer(prog);
|
||||
{
|
||||
timeout.tv_sec = (unsigned) diff / 1000000;
|
||||
timeout.tv_usec = (unsigned) diff % 1000000;
|
||||
}
|
||||
|
||||
if (!prog_is_stopped())
|
||||
event_add(prog->prog_timer, &timeout);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -309,33 +301,8 @@ static void
|
|||
prog_timer_handler (int fd, short what, void *arg)
|
||||
{
|
||||
struct prog *const prog = arg;
|
||||
lsquic_engine_proc_all(prog->prog_engine);
|
||||
prog_maybe_set_onetimer(prog);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
prog_onetimer_handler (int fd, short what, void *arg)
|
||||
{
|
||||
struct prog *const prog = arg;
|
||||
lsquic_engine_process_conns_to_tick(prog->prog_engine);
|
||||
prog_maybe_set_onetimer(prog);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
prog_set_onetimer (struct prog *prog, unsigned usec)
|
||||
{
|
||||
struct timeval timeout;
|
||||
|
||||
drop_onetimer(prog);
|
||||
if (usec < 4000)
|
||||
usec = 4000;
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = usec;
|
||||
prog->prog_onetimer = event_new(prog->prog_eb, -1, EV_TIMEOUT,
|
||||
prog_onetimer_handler, prog);
|
||||
event_add(prog->prog_onetimer, &timeout);
|
||||
if (!prog_is_stopped())
|
||||
prog_process_conns(prog);
|
||||
}
|
||||
|
||||
|
||||
|
@ -350,19 +317,12 @@ prog_usr1_handler (int fd, short what, void *arg)
|
|||
int
|
||||
prog_run (struct prog *prog)
|
||||
{
|
||||
struct timeval timeout;
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = prog->prog_period_usec;
|
||||
prog->prog_timer = event_new(prog->prog_eb, -1, EV_PERSIST,
|
||||
prog_timer_handler, prog);
|
||||
event_add(prog->prog_timer, &timeout);
|
||||
#ifndef WIN32
|
||||
prog->prog_usr1 = evsignal_new(prog->prog_eb, SIGUSR1,
|
||||
prog_usr1_handler, prog);
|
||||
evsignal_add(prog->prog_usr1, NULL);
|
||||
#endif
|
||||
|
||||
|
||||
event_base_loop(prog->prog_eb, 0);
|
||||
|
||||
return 0;
|
||||
|
@ -392,7 +352,6 @@ prog_stop (struct prog *prog)
|
|||
sport_destroy(sport);
|
||||
}
|
||||
|
||||
drop_onetimer(prog);
|
||||
if (prog->prog_timer)
|
||||
{
|
||||
event_del(prog->prog_timer);
|
||||
|
@ -414,11 +373,6 @@ prog_prep (struct prog *prog)
|
|||
int s;
|
||||
char err_buf[100];
|
||||
|
||||
if (0 == prog->prog_period_usec)
|
||||
prog->prog_period_usec = PROG_DEFAULT_PERIOD_USEC;
|
||||
if (prog->prog_settings.es_proc_time_thresh == LSQUIC_DF_PROC_TIME_THRESH)
|
||||
prog->prog_settings.es_proc_time_thresh = prog->prog_period_usec;
|
||||
|
||||
if (0 != lsquic_engine_check_settings(prog->prog_api.ea_settings,
|
||||
prog->prog_engine_flags, err_buf, sizeof(err_buf)))
|
||||
{
|
||||
|
@ -442,6 +396,9 @@ prog_prep (struct prog *prog)
|
|||
if (!prog->prog_engine)
|
||||
return -1;
|
||||
|
||||
prog->prog_timer = event_new(prog->prog_eb, -1, 0,
|
||||
prog_timer_handler, prog);
|
||||
|
||||
if (prog->prog_engine_flags & LSENG_SERVER)
|
||||
s = prog_init_server(prog);
|
||||
else
|
||||
|
|
11
test/prog.h
11
test/prog.h
|
@ -19,13 +19,10 @@ struct prog
|
|||
unsigned prog_engine_flags;
|
||||
struct service_port prog_dummy_sport; /* Use for options */
|
||||
unsigned prog_packout_max;
|
||||
#define PROG_DEFAULT_PERIOD_USEC (10 * 1000) /* 10 ms default */
|
||||
unsigned prog_period_usec;
|
||||
unsigned short prog_max_packet_size;
|
||||
int prog_version_cleared;
|
||||
struct event_base *prog_eb;
|
||||
struct event *prog_timer,
|
||||
*prog_onetimer,
|
||||
*prog_usr1;
|
||||
struct sport_head *prog_sports;
|
||||
struct lsquic_engine *prog_engine;
|
||||
|
@ -48,7 +45,7 @@ prog_init (struct prog *, unsigned lsquic_engine_flags, struct sport_head *,
|
|||
# define IP_DONTFRAG_FLAG ""
|
||||
#endif
|
||||
|
||||
#define PROG_OPTS "i:m:c:y:L:l:o:H:s:S:Y:z:" SENDMMSG_FLAG IP_DONTFRAG_FLAG
|
||||
#define PROG_OPTS "m:c:y:L:l:o:H:s:S:Y:z:" SENDMMSG_FLAG IP_DONTFRAG_FLAG
|
||||
|
||||
/* Returns:
|
||||
* 0 Applied
|
||||
|
@ -79,10 +76,10 @@ prog_connect (struct prog *);
|
|||
void
|
||||
prog_print_common_options (const struct prog *, FILE *);
|
||||
|
||||
void
|
||||
prog_maybe_set_onetimer (struct prog *);
|
||||
|
||||
int
|
||||
prog_is_stopped (void);
|
||||
|
||||
void
|
||||
prog_process_conns (struct prog *);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -359,16 +359,10 @@ read_handler (int fd, short flags, void *ctx)
|
|||
while (ROP_NOROOM == rop);
|
||||
|
||||
if (n_batches)
|
||||
{
|
||||
n += packs_in->n_alloc * (n_batches - 1);
|
||||
lsquic_engine_process_conns_with_incoming(engine);
|
||||
}
|
||||
|
||||
while (lsquic_engine_has_pend_rw(engine))
|
||||
lsquic_engine_process_conns_with_pend_rw(engine);
|
||||
|
||||
if (!prog_is_stopped())
|
||||
prog_maybe_set_onetimer(sport->sp_prog);
|
||||
prog_process_conns(sport->sp_prog);
|
||||
|
||||
LSQ_DEBUG("read %u packet%.*s in %u batch%s", n, n != 1, "s", n_batches, n_batches != 1 ? "es" : "");
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ main (int argc, char **argv)
|
|||
lsquic_set_log_level("info");
|
||||
|
||||
malo = lsquic_malo_create(sizeof(*lconn));
|
||||
s = conn_hash_init(&conn_hash, nelems);
|
||||
s = conn_hash_init(&conn_hash);
|
||||
assert(0 == s);
|
||||
|
||||
for (n = 0; n < nelems; ++n)
|
||||
|
@ -63,18 +63,6 @@ main (int argc, char **argv)
|
|||
|
||||
assert(nelems == conn_hash_count(&conn_hash));
|
||||
|
||||
{
|
||||
lconn = get_new_lsquic_conn(malo);
|
||||
find_lsconn = conn_hash_find(&conn_hash, lconn->cn_cid);
|
||||
assert(!find_lsconn);
|
||||
s = conn_hash_add(&conn_hash, lconn);
|
||||
assert(-1 == s);
|
||||
lsquic_malo_put(lconn);
|
||||
}
|
||||
|
||||
for (n = 0, lconn = conn_hash_first(&conn_hash); lconn; ++n, lconn = conn_hash_next(&conn_hash))
|
||||
assert(n == (uintptr_t) lconn->cn_if);
|
||||
|
||||
for (lconn = lsquic_malo_first(malo); lconn;
|
||||
lconn = lsquic_malo_next(malo))
|
||||
{
|
||||
|
|
|
@ -128,8 +128,8 @@ lsquic_send_ctl_determine_bpt (struct lsquic_send_ctl *ctl,
|
|||
|
||||
/* This function is only here to avoid crash in the test: */
|
||||
void
|
||||
lsquic_engine_add_conn_to_pend_rw (struct lsquic_engine_public *enpub,
|
||||
lsquic_conn_t *conn, enum rw_reason reason)
|
||||
lsquic_engine_add_conn_to_tickable (struct lsquic_engine_public *enpub,
|
||||
lsquic_conn_t *conn)
|
||||
{
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue