I’ve been working with Clojure a fair amount in production projects and found myself writing a lot of tasks that required asynchronous calls. I knew that callbacks weren’t the answer, so I turned to core.async to manage that work with channels.
When I first read through the rationale and walkthrough of core.async, I thought I was all set up to start writing core.async-based code.
I was wrong. core.async is a library with a lot of dangerous edge cases, and if you don’t understand these edges, you’re in for a lot of trouble. The official documentation doesn’t cover these cases, so I’m going to lay some of them out here and talk about how to avoid them.
Edge One: The thread pool underneath the go macro
What’s wrong with this code?
(require '[clojure.core.async :as a])
(import 'java.util.concurrent.ArrayBlockingQueue)
(def queue (ArrayBlockingQueue. 10))
(dotimes [i 10] (a/go-loop [] (.put queue i) (recur)))
(a/go-loop [] (println (.take queue)) (recur))
The final go-loop will never print because of a deadlock. The thread pool that underpins the execution of the go macro is set to eight threads. This wasn’t always so, and it may change again. Since go routines are multiplexed on this eight-thread pool, you need to be very careful with what you do inside these go blocks. If you’re executing thread-blocking operations, like reading/writing a file with InputStream/OutputStream, making blocking network calls, or taking any kind of mutex, you can potentially deadlock the pool.
Never put anything that blocks threads inside a go block. The only exception I would make is logging, because that shouldn’t meaningfully block your thread pool. Even then I prefer to use an async logger. That means that go blocks are for computation and non-blocking IO only.
The code above deadlocks because the producer go-loops use all eight available threads, which means the consumer go-loop can never run. Unlike a BlockingQueue, core.async channels are designed to work with go blocks: they don’t block threads but instead have parking semantics.
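If you genuinely need a blocking operation alongside channels, one option is core.async’s thread macro, which runs its body on a separate (unbounded, cached) thread pool and returns a channel that receives the result. A minimal sketch, with a made-up file path for illustration:

```clojure
(require '[clojure.core.async :as a])

;; a/thread runs its body on its own thread pool (separate from the go
;; dispatcher pool) and returns a channel that receives the body's result.
(defn slurp-async [path]
  (a/thread (slurp path)))          ; blocking IO is fine here

;; Inside a go block we can park on that channel without tying up a
;; dispatcher thread:
(a/go
  (println (a/<! (slurp-async "/etc/hosts"))))
```

Blocking inside a/thread never starves the go pool, because it doesn’t run on it.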
Edge Two: Too Many Puts
Consider the following code:
(require '[clojure.core.async :as a])
=> nil
(def ch (a/chan))
=> #'user/ch
(dotimes [i 1024] (a/put! ch i))
=> nil
(a/put! ch 1024)
What happens after the last put!? For people bitten by this, seeing the number 1024 is enough to guess: the dreaded AssertionError.
AssertionError Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)  clojure.core.async.impl.channels.ManyToManyChannel (channels.clj:152)
This error means you’re almost certainly not handling backpressure correctly. That is, your code is producing events faster than the consumers can process them. Instead of pausing the producers to let the consumers catch up, producers keep producing. This causes the channel to blow up. There are hack solutions to get around this problem (dropping buffers), but chances are if you’re reaching for these to solve the MAX-QUEUE-SIZE problem, the code itself should be restructured.
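For reference, the buffer-based workarounds mentioned above look like this. They silently discard events rather than exert backpressure, which is usually the wrong trade-off:

```clojure
(require '[clojure.core.async :as a])

;; When the buffer is full, a dropping buffer discards NEW values...
(def drop-ch (a/chan (a/dropping-buffer 10)))

;; ...while a sliding buffer discards the OLDEST values to make room.
(def slide-ch (a/chan (a/sliding-buffer 10)))

;; put! on either channel completes immediately, so the 1024-pending-puts
;; limit is never hit -- at the cost of losing data.
(dotimes [i 100000] (a/put! drop-ch i))
```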
For example, let’s say we’re ingesting an event stream, and we have a URL that we need to query continuously, putting the results on a channel. This service needs to be queried a million times in a row, with the results gathered into a set. Here’s some code we could write:
Initial Version:
(require '[clojure.core.async :as a])
(def url ...)
(def topic-chan (a/chan 10))
(a/go-loop [gather #{}]
  (if-some [event (a/<! topic-chan)]
    (recur (conj gather (do-something-with-event event)))
    gather))

(dotimes [_ 1000000]
  (http/get url #(a/put! topic-chan %)))
Why is this code bad? It’s almost guaranteed to overproduce events and blow up topic-chan. Additionally, we’ll run out of socket descriptors because of the unbounded http/get calls. With blocking IO, this situation would not be possible, since every http/get call would block its thread until it received a response.
Notice that if the code above were to make only a hundred total calls to the service, we wouldn’t hit the constraints of the system, and we’d be surprised when something that worked in tests/REPL blew up in production. During testing, you should keep all of your buffers small to give yourself a chance to catch these types of errors, instead of having larger buffers and a smaller number of calls, which will mask these problems.
Better version:
(require '[clojure.core.async :as a])
(def url ...)
(def conc-calls 10)
(def topic-chan (a/chan 10))
(a/go-loop [gather #{}]
  (if-some [event (a/<! topic-chan)]
    (recur (conj gather (do-something-with-event event)))
    gather))

(dotimes [i conc-calls]
  (a/go-loop [calls 0]
    (when (< calls 100000)
      (let [ch (a/chan 1)]
        (http/get url #(do (a/put! ch %) (a/close! ch)))
        (when-some [event (a/<! ch)]
          (when (a/>! topic-chan event)
            (recur (inc calls))))))))
What’s better about this version? Here, there’s a bounded number (namely conc-calls) of concurrent calls happening. Furthermore, each go routine waits to get a response from the ch channel via a parking take <! before performing a parking put >! on topic-chan. Inside a go block, the operations >! and <! don’t actually block the thread; they park the execution of the go block until the operation is finished. While a go block waits on the result of <!/>!, other go blocks can be running.
This means that if the go-loop taking events from topic-chan falls behind, the producers will respect backpressure and let the consumer catch up. The producers will run as long as topic-chan is not closed.
Edge Three: Prefer pipelines and other abstractions built on top
The async code above works even better as a pipeline with reduce. The code below uses a library I wrote that’s influenced by pipeline functions in core.async itself.
Most core.async code should use these higher abstractions instead of go blocks. There is nothing wrong with go blocks, but they are a lot more error prone, even in simple examples. The code below has far fewer moving parts that need to be just right in order for a system to query events from a URL.
(require '[clojure.core.async :as a]
'[affable-async.core :as aff])
(def url ...)
(def conc-calls 10)
(def topic-chan (a/chan 10))
(aff/u-pipeline-async {:n    conc-calls
                       :to   topic-chan
                       :from (a/to-chan (range 1000000))
                       :af   (fn [_ ch]
                               (http/get url #(do (a/put! ch %) (a/close! ch))))})

(a/reduce conj #{} topic-chan)
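For comparison, core.async itself ships pipeline-async, which can express the same shape directly. A sketch, using the same hypothetical http/get and url as above (note that pipeline-async preserves input ordering and its effective parallelism has some well-known subtleties, which is part of the motivation for alternatives):

```clojure
(require '[clojure.core.async :as a])

;; pipeline-async takes a parallelism bound, a destination channel, an
;; async function of [input result-chan], and a source channel. The
;; async function must eventually close its result channel.
(a/pipeline-async conc-calls
                  topic-chan
                  (fn [_ ch]
                    (http/get url #(do (a/put! ch %) (a/close! ch))))
                  (a/to-chan (range 1000000)))

(a/reduce conj #{} topic-chan)
```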
Edge Four: Don’t use >!! and <!! at REPL time
When trying to figure something out at the REPL, an inevitable outcome of using core.async is deadlocking our REPL thread because we’re using >!! and <!! to interact with a channel. For one-offs like that, it’s a lot safer to use the offer! and poll! functions. It’s still possible to break the REPL for some other reason, but it won’t be because we’re waiting for a channel to deliver an event that will never arrive.
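A quick REPL session showing the non-blocking pair: offer! only succeeds if the put can complete immediately, and poll! returns nil instead of waiting.

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan 1))

(a/offer! ch :foo)  ;; => true, the buffer had room
(a/poll! ch)        ;; => :foo
(a/poll! ch)        ;; => nil, nothing available; returns instead of blocking
```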
Edge Five: Function boundaries
What happens with the code below?
(def cs [(a/chan 1) (a/chan 1) (a/chan 1)])
(doseq [c cs] (a/put! c :foo))
(a/go (mapv a/<! cs))
The go macro transforms the Clojure code inside it to run on the dispatcher pool we talked about earlier. When the code inside go is expanded, there aren’t any >! or <! left; they have all been converted to put! and take!. What go doesn’t do is transform code across function creation boundaries, so >! and <! inside inner functions remain undefined.
#object[clojure.core.async.impl.channels.ManyToManyChannel
        0x50ba97ea
        "clojure.core.async.impl.channels.ManyToManyChannel@50ba97ea"]
Exception in thread "async-dispatch-1" java.lang.AssertionError: Assert failed: <! used not in (go ...) block
	at clojure.core.async$_LT__BANG_.invokeStatic(async.clj:120)
	at clojure.core.async$_LT__BANG_.invoke(async.clj:116)
Typically this issue manifests when you’d like to have a helper function that does a >! or <!. Even though the outer call is wrapped in go, because the transformation doesn’t apply across function boundaries, the code fails with an AssertionError.
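Two ways to fix the example, sketched below: keep <! lexically inside the go block by replacing mapv with an explicit loop, or make the helper itself a go block that returns a channel the caller can park on.

```clojure
(require '[clojure.core.async :as a])

(def cs [(a/chan 1) (a/chan 1) (a/chan 1)])
(doseq [c cs] (a/put! c :foo))

;; Option 1: an explicit loop keeps <! directly inside the go block.
(a/go
  (loop [cs cs, acc []]
    (if (seq cs)
      (recur (rest cs) (conj acc (a/<! (first cs))))
      (println acc))))

;; Option 2: the "helper" is itself a go block that returns a channel;
;; the caller takes from that channel instead of calling <! through a fn.
(defn take-all [chans]
  (a/go-loop [chans chans, acc []]
    (if (seq chans)
      (recur (rest chans) (conj acc (a/<! (first chans))))
      acc)))
```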
Edge Six: Most of the code is non-blocking, but some must use blocking IO
In a practical application, having non-blocking IO for everything is often not an option. There are many inherently synchronous APIs that can’t be used directly with core.async; a typical example is JDBC code. There is something you can do: make the code asynchronous via a dedicated thread pool.
(import 'java.util.concurrent.Executors)

(def pool (Executors/newFixedThreadPool 5))

(defn blocking-synchronous-sql-call [id] ...)

(defn async-wrapper [pool f & args]
  (let [ch (a/chan 1)]
    (.submit pool ^Runnable (fn []
                              (try (a/put! ch (apply f args))
                                   (catch Exception e (a/put! ch (ex-info "some error" {:args args} e)))
                                   (finally (a/close! ch)))))
    ch))
This code takes shortcuts with error handling (it simply puts the exception onto the channel), but now the blocking call can be made from a go block and interoperate with other async code.
(a/go
(a/<! (async-wrapper pool blocking-synchronous-sql-call id)))
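Worth noting: a/thread packages essentially the same pattern (run the body on another pool, deliver the result on a channel). The difference is that its pool is an unbounded cached pool, whereas the fixed pool above bounds how many blocking calls run at once. A sketch of the equivalent shape:

```clojure
;; If the body of a/thread throws, the exception is lost on that thread
;; and the channel just closes, so we catch it and put it on the channel
;; ourselves, mirroring async-wrapper above.
(a/go
  (let [result (a/<! (a/thread
                       (try (blocking-synchronous-sql-call id)
                            (catch Exception e e))))]
    (if (instance? Exception result)
      (println "call failed:" (.getMessage ^Exception result))
      result)))
```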