Core Async Timeout Channels

I’ve been doing a deep dive into core.async concepts recently because they’re a nice way to re-organize callback code. A prominent feature of core.async is timeout channels: channels that don’t deliver any values and close after a specified amount of time. They are useful if you want to “sleep” for a given amount of time in a go block, since they won’t block the underlying thread, and they are particularly useful with alts!. alts! waits on more than one channel concurrently and returns both the value that was delivered and the channel that delivered it.
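
As a minimal sketch of the “sleep” use, assuming only that core.async is on the classpath (the names result and value are made up for this illustration):

```clojure
(require '[clojure.core.async :as a])

;; The take on the timeout channel parks the go block until the channel closes,
;; so no thread is held while waiting.
(def result
  (a/go
    (a/<! (a/timeout 100))
    :done))

;; Block the calling thread (fine at the REPL) until the go block finishes.
(def value (a/<!! result))
```

The go block itself returns a channel, which is why the final value is read with a blocking take outside of it.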

In the example below, if the call to url takes longer than 300ms, the code returns the :timeout keyword. If the response arrives in less than 300ms, the code returns the response.

(require '[clojure.core.async :as a]
         '[org.httpkit.client :as http])
(def url ...)
(def ch (a/chan 1))
;; the callback puts the response onto ch
(http/get url #(a/put! ch %))
(a/go
  (let [[v c] (a/alts! [ch (a/timeout 300)])] ; the timeout channel closes after 300ms
    (if (= c ch)
      v
      (do (a/close! ch) :timeout))))

The great thing about timeout channels is that the codebase isn’t very large and therefore easy to dig into and understand. Everything hinges on roughly 3 main ideas:

  1. timeouts-queue, a DelayQueue
  2. timeouts-map, a ConcurrentSkipListMap
  3. timeout-daemon, a regular Thread

timeouts-queue

The timeouts-queue is what manages the timeout functionality. DelayQueue is unbounded, so the puts from the timeout function do not block. All the elements inside the queue must implement the Delayed interface.

Here’s a facsimile of the code used by core.async. For the sake of a REPL-able example I rewrote the TimeoutEntry code, giving it a toString method that shows the delay as well as the creation timestamp, which helps with legibility. I omitted the channel operations because they are not relevant for this example.

(import '[java.util.concurrent DelayQueue Delayed TimeUnit])

(def dq (DelayQueue.))

(deftype TimeoutEntry [^long delay ^long ts]
  Delayed
  ;; remaining time until delay + ts, converted to the requested unit
  (getDelay [_ time-unit]
    (.convert ^TimeUnit time-unit
              (- (+ delay ts) (System/currentTimeMillis))
              TimeUnit/MILLISECONDS))
  ;; entries are ordered by their absolute expiry time
  (compareTo [_ that]
    (let [this-time (+ delay ts)
          that-time (+ (.delay ^TimeoutEntry that) (.ts ^TimeoutEntry that))]
      (cond (< this-time that-time) -1
            (= this-time that-time)  0
            :else                    1)))
  Object ; toString overrides the method inherited from Object
  (toString [_] (str "Delay millis: " delay " timestamp: " ts)))

(.put dq (TimeoutEntry. 10000 (System/currentTimeMillis)))
(.take dq)
;; 10 seconds later
#object[user.TimeoutEntry 0x33d48fe "Delay millis: 10000 timestamp: 1543169231753"]

Until the current time exceeds delay plus ts, getDelay returns a positive value and .take blocks the calling thread. Once that deadline has passed, the entry is taken off the DelayQueue and returned.
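
To watch the expiry rule without blocking the REPL, .poll can be used instead of .take: it returns nil while the head of the queue has not yet expired. The following is a minimal sketch that assumes nothing beyond the JDK. SimpleDelay, its deadline field, q, before and after are names made up for this illustration and are not part of core.async:

```clojure
(import '[java.util.concurrent DelayQueue Delayed TimeUnit])

;; Hypothetical entry type: deadline is an absolute time in epoch millis.
(deftype SimpleDelay [^long deadline]
  Delayed
  (getDelay [_ time-unit]
    (.convert ^TimeUnit time-unit
              (- deadline (System/currentTimeMillis))
              TimeUnit/MILLISECONDS))
  (compareTo [_ that]
    (compare deadline (.-deadline ^SimpleDelay that))))

(def q (DelayQueue.))
(.put q (SimpleDelay. (+ (System/currentTimeMillis) 200)))

(def before (.poll q)) ; the head has not expired yet, so poll returns nil
(Thread/sleep 300)
(def after (.poll q))  ; the deadline has passed, so the entry is returned
```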

timeouts-map

timeouts-map is used as a cache: if many calls to the timeout function are made in a short timespan, an existing timeout channel is reused and the unbounded DelayQueue is not overrun with entries. This is where the timeout granularity of 10ms comes in. There is no comment in the code about why specifically 10ms, and not say 15 or 20, was chosen. My guess is that it was chosen arbitrarily, on the assumption that most systems don’t provide better than 10ms precision anyway.

Here’s the cache in action:

(identical? (a/timeout 10009) (a/timeout 10000))
=> true

The code above requests two channels at virtually the same instant, and the expiry of the first falls within 10ms of the expiry requested by the second, so the second call reuses the first channel.
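
The reuse rule can be sketched with a plain ConcurrentSkipListMap. This is a simplified illustration of the idea as I understand it, not the library’s code: cache, resolution-ms and lookup-or-create are hypothetical names, deadlines are small made-up numbers instead of real timestamps, and a keyword stands in for the channel.

```clojure
(import '[java.util.concurrent ConcurrentSkipListMap])

(def ^ConcurrentSkipListMap cache (ConcurrentSkipListMap.))
(def resolution-ms 10) ; assumed to mirror the 10ms granularity mentioned above

(defn lookup-or-create
  "Returns the cached value whose deadline lies in
  [deadline, deadline + resolution-ms); otherwise stores new-val
  under deadline and returns it."
  [^long deadline new-val]
  (let [entry (.ceilingEntry cache deadline)] ; entry with the smallest key >= deadline
    (if (and entry (< (long (.getKey entry)) (+ deadline resolution-ms)))
      (.getValue entry)
      (do (.put cache deadline new-val)
          new-val))))

(def r1 (lookup-or-create 10009 :first))  ; nothing cached yet, stores :first
(def r2 (lookup-or-create 10000 :second)) ; 10009 is less than 10ms above 10000, reuses :first
(def r3 (lookup-or-create 10010 :third))  ; no key >= 10010, stores :third
```

Note that the lookup only searches upward from the requested deadline, which is why the example asks for the later deadline first.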

As an example here’s some code that creates 10,000 timeout channels in a very short time span. Notice that there were only 6 actual timeout channels created.

(time (count (set (repeatedly 10000 #(a/timeout 1000)))))
"Elapsed time: 19.8315 msecs"
=> 6

And here it is again, with a printout of the timeouts-map:

(time (count (set (repeatedly 10000 #(a/timeout 1000)))))
@#'clojure.core.async.impl.timers/timeouts-map
"Elapsed time: 5.797694 msecs"
=> 6
=>
{1543110781315 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
                       0x628ced5a
                       "clojure.core.async.impl.timers.TimeoutQueueEntry@628ced5a"],
 1543110781316 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
                       0x428b222b
                       "clojure.core.async.impl.timers.TimeoutQueueEntry@428b222b"],
 1543110781317 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
                       0x3d33d751
                       "clojure.core.async.impl.timers.TimeoutQueueEntry@3d33d751"],
 1543110781318 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
                       0x13f57d15
                       "clojure.core.async.impl.timers.TimeoutQueueEntry@13f57d15"],
 1543110781319 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
                       0x1696a40c
                       "clojure.core.async.impl.timers.TimeoutQueueEntry@1696a40c"],
 1543110781320 #object[clojure.core.async.impl.timers.TimeoutQueueEntry
                       0xd97568f
                       "clojure.core.async.impl.timers.TimeoutQueueEntry@d97568f"]}

Notice that there is no mutex around the critical section, the two .put calls on lines 68 and 69 of the timeout function. I assume this is why (.put timeouts-map timeout timeout-entry) happens before (.put timeouts-queue timeout-entry): to maximize the chances that other threads creating timeout channels in close proximity get a cache hit.

There is also a race: two requests for a new timeout channel can interleave so that the first request’s (.put timeouts-map timeout timeout-entry) has not finished when the second request evaluates the cache check on line 64, which then comes back false. This was likely not deemed a big enough problem to worry about, since the timeout daemon will clean up the extra entry and the cache does a good enough job of limiting the creation of timeout channels most of the time.

timeout-daemon

The timeout entries that hold the channels need to be expired and have their channels closed. This is done by a daemon thread that constantly tries to .take from the timeouts-queue and, once a timeout entry is successfully removed, deletes it from the timeouts-map cache.
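
The loop described above can be sketched with JDK classes only. This is an illustration under stated assumptions and not the core.async source: ExpiringEntry, its callback field (standing in for closing the channel), queue, cache and start-daemon! are all hypothetical names.

```clojure
(import '[java.util.concurrent ConcurrentSkipListMap DelayQueue Delayed TimeUnit])

;; Hypothetical entry: callback stands in for closing the timeout channel.
(deftype ExpiringEntry [^long deadline callback]
  Delayed
  (getDelay [_ time-unit]
    (.convert ^TimeUnit time-unit
              (- deadline (System/currentTimeMillis))
              TimeUnit/MILLISECONDS))
  (compareTo [_ that]
    (compare deadline (.-deadline ^ExpiringEntry that))))

(def ^DelayQueue queue (DelayQueue.))
(def ^ConcurrentSkipListMap cache (ConcurrentSkipListMap.))

(defn start-daemon!
  "Starts a daemon thread that expires entries forever."
  []
  (doto (Thread. ^Runnable
                 (fn []
                   (loop []
                     (let [^ExpiringEntry e (.take queue)] ; blocks until the head expires
                       (.remove cache (.-deadline e) e)    ; evict the entry from the cache
                       ((.-callback e)))                   ; "close the channel"
                     (recur)))
                 "sketch-timeout-daemon")
    (.setDaemon true)
    (.start)))

;; Usage: register one entry that expires in 100ms and wait for its callback.
(start-daemon!)
(def closed? (promise))
(let [deadline (+ (System/currentTimeMillis) 100)
      e (ExpiringEntry. deadline #(deliver closed? true))]
  (.put cache deadline e)
  (.put queue e))
(def result (deref closed? 2000 :not-expired))
```

Because the thread is a daemon, it does not keep the JVM alive once all other threads have finished.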

Architecture considerations for your code

  1. Timeout values of 10ms or lower are too low and shouldn’t be used.
  2. Two channels that are created at about the same timestamp and whose delays differ by less than 10ms are considered identical.
  3. Since timeout adds the delay to the current time, code like (a/timeout Long/MAX_VALUE) will blow up with an ArithmeticException for integer overflow (or an IllegalArgumentException in the case of bignums). If you really want a channel that never closes, just create one and never close it.
  4. Do not call close! on a timeout channel. Since timeout channels are shared and mutable, you will likely close a channel that another go block is using, which could lead to strange behavior.
  5. Doing many takes on what appear to be distinct timeout channels could result in assertion errors. Taking 1025 times from a single channel can blow up with a MAX-QUEUE-SIZE error:
(def t (a/timeout 1000))
(dotimes [_ 1025] (a/take! t (fn [_])))

So can this:

(let [timeouts (vec (repeatedly 10 #(a/timeout 10000)))]
  (doseq [tc timeouts]
    (dotimes [_ 110] (a/take! tc (fn [_])))))

I don’t think this matters much from a practical standpoint, but assuming that timeout returns a semantically new value is incorrect.
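
The close! warning in item 4 can be demonstrated in a few lines. This is a sketch that assumes core.async is on the classpath. ts, t1, t2, shared?, probe, before-close and after-close are names made up for this illustration, and whether the two channels end up shared depends on timing, so the code records that instead of assuming it.

```clojure
(require '[clojure.core.async :as a])

;; Two "separate" one-minute timeouts requested back to back.
(def ts [(a/timeout 60009) (a/timeout 60000)])
(def t1 (first ts))
(def t2 (second ts))
(def shared? (identical? t1 t2)) ; usually true, but timing dependent

;; Non-blocking probe: returns [:still-open :default] if nothing is ready on ch,
;; and [nil ch] once ch has been closed.
(defn probe [ch] (a/alts!! [ch] :default :still-open))

(def before-close (probe t1))
(a/close! t2)                ; someone "cleans up" their own timeout...
(def after-close (probe t1)) ; ...and t1 is closed as well whenever shared? is true
```

When shared? is true, any go block parked on t1 wakes up roughly a minute early, which is the kind of strange behavior the list warns about.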