I’ve been doing a deep dive into core.async concepts recently because they’re a nice way to re-organize callback code. A prominent feature of core.async is timeout channels: channels that never deliver a value and close after a specified amount of time. They are useful if you want to “sleep” for a given amount of time in a go block, since they won’t block the underlying thread, and they are particularly useful with alts!.
alts! waits on more than one channel concurrently and returns both the value that was delivered and the channel that delivered it.
In the example below, if the call to url takes longer than 300ms, the go block returns the :timeout keyword. If the response arrives within 300ms, it returns the response instead.
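(require '[clojure.core.async :as a]
         '[org.httpkit.client :as http])
(def url ...)
(def ch (a/chan 1))
;; http-kit invokes the callback with the response; hand it to ch
(http/get url #(a/put! ch %))
(a/go
  (let [[v c] (a/alts! [ch (a/timeout 300)])] ;; the timeout channel closes after 300ms
    (if (= c ch)
      v
      ;; timed out: close ch so a late response is dropped
      (do (a/close! ch) :timeout))))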
The great thing about timeout channels is that the implementation is small, so it is easy to dig into and understand. Everything hinges on three main pieces:
- timeouts-queue, a DelayQueue
- timeouts-map, a ConcurrentSkipListMap
- timeout-daemon, a regular Thread
timeouts-queue
The timeouts-queue is what manages the timeout functionality. A DelayQueue is unbounded, so the puts made by the timeout function never block. All the elements inside the queue must implement the Delayed interface.
Here’s a facsimile of the code used by core.async. For the sake of a REPL-friendly example I rewrote the TimeoutEntry code, giving it a toString method that shows the delay as well as the creation timestamp, which helps with legibility. I omitted the channel operations because they are not relevant for this example.
(import '(java.util.concurrent DelayQueue Delayed TimeUnit))
(def dq (DelayQueue.))
(deftype TimeoutEntry [^long delay ^long ts]
  Delayed
  ;; remaining time until ts + delay, in whatever unit the queue asks for
  (getDelay [_ time-unit] (.convert ^TimeUnit time-unit
                                    (- (+ delay ts) (System/currentTimeMillis))
                                    TimeUnit/MILLISECONDS))
  ;; order entries by absolute expiry time
  (compareTo [_ that] (let [this-time (+ delay ts)
                            that-time (+ (.delay ^TimeoutEntry that) (.ts ^TimeoutEntry that))]
                        (compare this-time that-time)))
  Object
  (toString [_] (str "Delay millis: " delay " timestamp: " ts)))
(.put dq (TimeoutEntry. 10000 (System/currentTimeMillis)))
;; blocks until the entry has expired
(.take dq)
;; 10 seconds later
#object[user.TimeoutEntry 0x33d48fe "Delay millis: 10000 timestamp: 1543169231753"]
Once the current time used in the getDelay method reaches delay plus ts, the item can be taken off the DelayQueue. Until then, .take blocks the current thread of execution.
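Both properties, non-blocking puts and a blocking .take that releases entries in deadline order, can be checked with nothing but the JDK. This is a minimal sketch under my own assumptions: delayed-entry is a hypothetical helper built with reify rather than core.async’s entry type, and the delays are short so the example finishes in about a second.

```clojure
(import '(java.util.concurrent DelayQueue Delayed TimeUnit))

;; Hypothetical helper: a Delayed tagged with `label` that
;; expires `delay-ms` milliseconds after it is created.
(defn delayed-entry [label ^long delay-ms]
  (let [deadline (+ (System/currentTimeMillis) delay-ms)]
    (reify
      Delayed
      (getDelay [_ unit]
        (.convert ^TimeUnit unit
                  (- deadline (System/currentTimeMillis))
                  TimeUnit/MILLISECONDS))
      (compareTo [this other]
        ;; shorter remaining delay sorts first
        (compare (.getDelay this TimeUnit/MILLISECONDS)
                 (.getDelay ^Delayed other TimeUnit/MILLISECONDS)))
      Object
      (toString [_] (str label)))))

(def ^DelayQueue q (DelayQueue.))

;; Puts return immediately; the later deadline is inserted first on purpose.
(.put q (delayed-entry :slow 1000))
(.put q (delayed-entry :fast 500))

(.poll q) ;; => nil, nothing has expired yet

;; .take blocks until the head has expired, so entries come out
;; in deadline order rather than insertion order.
(def taken
  (let [start (System/currentTimeMillis)
        a     (str (.take q))
        b     (str (.take q))]
    {:order      [a b]
     :elapsed-ms (- (System/currentTimeMillis) start)}))

;; :order is [":fast" ":slow"]; :elapsed-ms is roughly 1000
taken
```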
timeouts-map
The timeouts-map is a ConcurrentSkipListMap used as a cache: if many calls to the timeout function are made in a short timespan, an existing timeout channel is reused, so the unbounded DelayQueue is not flooded with entries.
This is where the timeout granularity of 10ms comes in. There is no comment in the code explaining why 10ms was chosen rather than, say, 15 or 20. My guess is that it was somewhat arbitrary, on the assumption that most systems don’t offer better than 10ms timer precision anyway.
Here’s the cache in action:
(identical? (a/timeout 10009) (a/timeout 10000))
=> true
The code above creates two channels at virtually the same instant, and the first channel’s deadline falls less than 10ms after the deadline requested by the second call, so the second call reuses the first channel.
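The reuse rule can be modeled with a plain ConcurrentSkipListMap keyed by absolute deadline. This is a simplified sketch, not core.async’s code: cached-timeout, resolution-ms, cache and t0 are hypothetical names, the “channel” is just a fresh Object, and the current time is passed in explicitly so the result is deterministic. The lookup follows my reading of core.async, which uses ceilingEntry to find the nearest cached deadline at or after the requested one and reuses it only if it is less than 10ms later.

```clojure
(import '(java.util.concurrent ConcurrentSkipListMap))

(def resolution-ms 10) ;; hypothetical name for the 10ms granularity

;; deadline (ms) -> stand-in for a timeout channel
(def ^ConcurrentSkipListMap cache (ConcurrentSkipListMap.))

(defn cached-timeout
  "Returns a stand-in channel closing at now + msecs, reusing a cached
  one whose deadline is at or after that and within the resolution."
  [^long now ^long msecs]
  (let [deadline (+ now msecs)
        ;; smallest cached deadline >= the requested one, or nil
        e (.ceilingEntry cache deadline)]
    (if (and e (< (long (.getKey e)) (+ deadline resolution-ms)))
      (.getValue e)          ;; cache hit: reuse the existing "channel"
      (let [ch (Object.)]    ;; cache miss: "create" a new one
        (.put cache deadline ch)
        ch))))

(def t0 1000000) ;; a fixed, made-up "current time"

;; Same shape as (identical? (a/timeout 10009) (a/timeout 10000)):
(identical? (cached-timeout t0 10009) (cached-timeout t0 10000)) ;; => true

;; Reversed order misses: the cached deadline is earlier than the one requested.
(identical? (cached-timeout t0 20000) (cached-timeout t0 20009)) ;; => false
```

Under this rule a channel is reused only when it closes at or after the requested time, so once the clock moves past a cached deadline, the next request creates a new channel.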
As another example, here’s some code that requests 10,000 timeout channels in a very short time span. Notice that only 6 actual timeout channels were created.
(time (count (set (repeatedly 10000 #(a/timeout 1000)))))
"Elapsed time: 19.8315 msecs"
=> 6
And here it is again, followed by a printout of the timeouts-map:
(time (count (set (repeatedly 10000 #(a/timeout 1000)))))
@#'clojure.core.async.impl.timers/timeouts-map
"Elapsed time: 5.797694 msecs"
=> 6
=>
{1543110781315 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
0x628ced5a
"clojure.core.async.impl.timers.TimeoutQueueEntry@628ced5a"],
1543110781316 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
0x428b222b
"clojure.core.async.impl.timers.TimeoutQueueEntry@428b222b"],
1543110781317 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
0x3d33d751
"clojure.core.async.impl.timers.TimeoutQueueEntry@3d33d751"],
1543110781318 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
0x13f57d15
"clojure.core.async.impl.timers.TimeoutQueueEntry@13f57d15"],
1543110781319 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
0x1696a40c
"clojure.core.async.impl.timers.TimeoutQueueEntry@1696a40c"],
1543110781320 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
0xd97568f
"clojure.core.async.impl.timers.TimeoutQueueEntry@d97568f"]}
Notice that there is no mutex around the critical section in the code on lines 68 and 69. I assume this is why (.put timeouts-map timeout timeout-entry) happens before (.put timeouts-queue timeout-entry): it maximizes the chances that other threads creating timeout channels at nearly the same moment get a cache hit.
There is another race: two requests for a new timeout channel can interleave so that the first request’s (.put timeouts-map timeout timeout-entry) has not finished yet while the second request has already evaluated the caching check on line 64 to false, so both requests create their own channel. This was likely not deemed a big enough problem to worry about, since the timeout daemon will clean up the extra entry and the cache does a good enough job of limiting the creation of timeout channels most of the time.
timeout-daemon
Timeout entries, which hold the channel, need to be expired and have their channel closed. This is done by a daemon thread that constantly tries to .take from the timeouts-queue; once a timeout entry is successfully removed, the thread deletes it from the timeouts-map cache and closes its channel.
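The daemon loop is small enough to sketch end to end with JDK classes. This is an illustrative model under my own assumptions, not core.async’s implementation: Entry, queue, entries, schedule! and the thread name are hypothetical, and a promise stands in for the channel, so “closing” means delivering :closed to it.

```clojure
(import '(java.util.concurrent DelayQueue Delayed TimeUnit ConcurrentSkipListMap))

;; Hypothetical stand-in for a timeout entry; `closed` plays the channel.
(deftype Entry [closed ^long deadline]
  Delayed
  (getDelay [_ unit]
    (.convert ^TimeUnit unit
              (- deadline (System/currentTimeMillis))
              TimeUnit/MILLISECONDS))
  (compareTo [_ other]
    (compare deadline (.deadline ^Entry other))))

(def ^DelayQueue queue (DelayQueue.))
(def ^ConcurrentSkipListMap entries (ConcurrentSkipListMap.))

(defn schedule!
  "Registers an entry expiring `msecs` from now and returns its promise."
  [^long msecs]
  (let [deadline (+ (System/currentTimeMillis) msecs)
        e        (Entry. (promise) deadline)]
    (.put entries deadline e) ;; cache first, then queue
    (.put queue e)
    (.closed e)))

;; The daemon: block on .take, drop the entry from the cache, then "close" it.
(def daemon
  (doto (Thread.
         (fn []
           (loop []
             (let [^Entry e (.take queue)]
               (.remove entries (.deadline e) e)
               (deliver (.closed e) :closed))
             (recur)))
         "timeout-daemon-sketch")
    (.setDaemon true)
    (.start)))

(def p (schedule! 100))
(deref p 1000 :timed-out) ;; => :closed
```

Marking the thread as a daemon means it never keeps the JVM alive, which matches the role of a background cleaner that loops forever.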
Architecture considerations for your code
- Timeout values of 10ms or lower are too low and shouldn’t be used.
- Two timeout channels that are created at about the same time and whose delays differ by less than 10ms may turn out to be the same channel.
- Since timeout adds the delay to the current time, code like (a/timeout Long/MAX_VALUE) will blow up with an ArithmeticException (integer overflow), or an IllegalArgumentException in the case of bignums. If you really want a channel that never closes, just create a regular one and never close it.
- Do not call close! on a timeout channel. Since timeout channels are shared and mutable, you will likely close a channel that another go block is waiting on, which could lead to strange behavior.
- Doing many takes on what appear to be distinct timeout channels could result in assertion errors.
This can blow up with a MAX-QUEUE-SIZE assertion error, because a single channel allows at most 1024 pending takes:
(def t (a/timeout 1000))
(dotimes [_ 1025] (a/take! t (fn [_])))
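But so can this, because ten timeouts requested in quick succession will likely share one channel, which then receives 1,100 pending takes: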
(let [timeouts (vec (repeatedly 10 #(a/timeout 10000)))]
(doseq [tc timeouts]
(dotimes [_ 110] (a/take! tc (fn [_])))))
I don’t think this matters much from a practical standpoint, but assuming that timeout returns a new channel on every call is incorrect.
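The overflow point from the list above can be reproduced without core.async. This sketch assumes, as described above, that timeout computes its deadline as current time plus the delay and takes the delay as a primitive long; deadline-for is a hypothetical function that mimics only that arithmetic.

```clojure
;; Hypothetical: mimics only the deadline arithmetic described above.
(defn deadline-for [^long msecs]
  (+ (System/currentTimeMillis) msecs))

;; Clojure's + on longs is overflow-checked by default, so this throws.
(def overflow-result
  (try
    (deadline-for Long/MAX_VALUE)
    (catch ArithmeticException e
      (.getMessage e))))

overflow-result ;; => "integer overflow"

;; A bignum that does not fit in a long is rejected before any addition.
(def bignum-result
  (try
    (deadline-for 100000000000000000000N)
    (catch IllegalArgumentException _
      :illegal-argument)))

bignum-result ;; => :illegal-argument
```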