Things I Wish I Knew About Core Async in Clojure

I’ve been working with Clojure a fair amount in production projects and found myself writing a lot of tasks that required asynchronous calls. I knew that callbacks weren’t the answer, so I turned to core.async to manage that work with channels.

When I first read through the rationale and walkthrough for core.async, I thought I was all set to start writing core.async-based code.

I was wrong. core.async is a library with a lot of dangerous edge cases, and if you don’t understand them, you’re in for a lot of trouble. The official documentation doesn’t cover these cases, so I’m going to lay some of them out here and talk about how to avoid them.

Edge One: Threadpool underneath the go macro

What’s wrong with this code?

(require '[clojure.core.async :as a])
(import 'java.util.concurrent.ArrayBlockingQueue)
(def queue (ArrayBlockingQueue. 10))
(dotimes [i 10] (a/go-loop [] (.put queue i) (recur)))
(a/go-loop [] (println (.take queue)) (recur))

The final go-loop will never print because of a deadlock.

The thread pool that underpins the execution of the go macro is set to eight threads. This wasn’t always so and it may change again. Since go routines are multiplexed on this eight-thread pool, you need to be very careful with what you do inside go blocks. If you’re executing thread-blocking operations, such as reading or writing a file with InputStream/OutputStream, making blocking network calls, or taking any kind of mutex, you can potentially deadlock the pool.

Never put anything that blocks threads inside a go block. The only exception I would make is logging, because that shouldn’t meaningfully block your thread pool. Even then, I prefer to use an async logger. That means go blocks are for computation and non-blocking I/O only.

The code above deadlocks because the producer go-loops occupy all eight available threads, each one blocked in .put once the queue fills up, which means the consumer go-loop never gets a thread to run on. Unlike a BlockingQueue, core.async channels are designed to work with go blocks: they do not block threads but have parking semantics instead.
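For comparison, here is a sketch of the same producer/consumer shape built on a channel instead of a BlockingQueue. The producer count (20), the five values per producer, and the names producers and results are my own choices for illustration. The point is only that more go blocks than pool threads can all make progress when they park rather than block.

```clojure
(require '[clojure.core.async :as a])

;; A channel with a buffer of 10 replaces the ArrayBlockingQueue.
(def ch (a/chan 10))

;; 20 producers, deliberately more than the go thread pool has threads.
;; Each puts its index five times; >! parks the go block when the buffer is full.
(def producers
  (mapv (fn [i]
          (a/go-loop [n 0]
            (when (< n 5)
              (a/>! ch i)
              (recur (inc n)))))
        (range 20)))

;; Close ch once every producer has finished, so the consumer can terminate.
(a/go-loop [remaining producers]
  (if-let [p (first remaining)]
    (do (a/<! p) (recur (rest remaining)))
    (a/close! ch)))

;; Collect everything until ch is closed. <!! blocks the calling thread,
;; which is acceptable here because ch is guaranteed to close.
(def results (a/<!! (a/into [] ch)))

(println (count results)) ; prints 100
```

All 100 values arrive even though the producers outnumber the pool threads, because a parked producer gives its thread back to the pool.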

Edge Two: Too Many Puts

Consider the following code:

(require '[clojure.core.async :as a])
=> nil
(def ch (a/chan))
=> #'user/ch
(dotimes [i 1024] (a/put! ch i))
=> nil
(a/put! ch 1024)

What happens after the last put!? For people bitten by this, seeing the number 1024 is enough to guess. The dreaded AssertionError.

AssertionError Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)  clojure.core.async.impl.channels.ManyToManyChannel (channels.clj:152)

This error means you’re almost certainly not handling backpressure correctly. That is, your code is producing events faster than the consumers can process them. Instead of pausing the producers to let the consumers catch up, producers keep producing. This causes the channel to blow up. There are hack solutions to get around this problem (dropping buffers), but chances are if you’re reaching for these to solve the MAX-QUEUE-SIZE problem, the code itself should be restructured.
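To make the dropping-buffer hack concrete, here is a small sketch of what such a buffer does with the same flood of puts. The buffer size of 2 and the names lossy-ch and survivors are arbitrary choices for illustration.

```clojure
(require '[clojure.core.async :as a])

;; A dropping buffer accepts every put but discards values once it is full.
(def lossy-ch (a/chan (a/dropping-buffer 2)))

;; Far more than 1024 puts and no AssertionError: nothing is ever left pending.
(dotimes [i 5000] (a/put! lossy-ch i))
(a/close! lossy-ch)

;; Only the values that fit in the buffer survive.
(def survivors (a/<!! (a/into [] lossy-ch)))

(println survivors) ; prints [0 1]
```

The error goes away, but so do 4,998 of the 5,000 values, and the producer never finds out. That is why I’d treat this as a last resort rather than a fix.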

For example, let’s say we’re ingesting an event stream, and we have a URL that we need to query continuously and put the results on a channel. This service needs to be queried a million times in a row, with the results gathered into a set. Here’s some code we could write:

Initial Version:

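(require '[clojure.core.async :as a])
;; http/get stands for a callback-based asynchronous HTTP client call
(def url ...)
(def topic-chan (a/chan 10))
;; consumer: gather processed events into a set until topic-chan closes
(a/go-loop [gather #{}]
  (if-some [event (a/<! topic-chan)]
    (recur (conj gather (do-something-with-event event)))
    gather))
;; producer: fires a million requests without waiting for any response
(dotimes [_ 1000000]
  (http/get url #(a/put! topic-chan %)))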

Why is this code bad?

It’s almost guaranteed to overproduce events and blow up topic-chan. We’ll also run out of socket descriptors because of the unbounded http/get calls. With blocking IO, this situation would not be possible, since every http/get call would block the thread until it receives a response.

Notice that if the code above were to make only a hundred calls to the service, we wouldn’t hit the constraints of the system, and we’d be surprised when something that worked in tests or at the REPL blew up in production. During testing, keep all of your buffers small to give yourself a chance to catch these types of errors. Larger buffers combined with a smaller number of calls will mask these problems.

Better version:

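(require '[clojure.core.async :as a])
(def url ...)
(def conc-calls 10)
(def topic-chan (a/chan 10))
;; consumer: same as before
(a/go-loop [gather #{}]
  (if-some [event (a/<! topic-chan)]
    (recur (conj gather (do-something-with-event event)))
    gather))
;; producers: conc-calls workers, each with at most one request in flight
(dotimes [i conc-calls]
  (a/go-loop [calls 0]
    ;; 10 workers x 100000 calls each = one million calls in total
    (when-not (= calls 100000)
      (let [ch (a/chan 1)]
        (http/get url #(do (a/put! ch %) (a/close! ch)))
        ;; park until the response arrives
        (when-some [event (a/<! ch)]
          ;; park until topic-chan has room; >! returns false once it is closed
          (when (a/>! topic-chan event)
            (recur (inc calls))))))))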

What’s better about this version?

Here, there’s a bounded number (namely conc-calls) of concurrent calls happening. Furthermore, each go routine waits for a response from the ch channel via a parking take <! before performing a parking put >! on topic-chan. Inside a go block, >! and <! don’t actually block the thread; they park the go block until the operation completes. While one go block is parked on <! or >!, other go blocks can run on the same thread.

This means that if the go-loop taking events from topic-chan falls behind, the producers will respect backpressure and let the consumer catch up. The producers will run as long as topic-chan is not closed.
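If you want to try this pattern at the REPL without a real service, here is a runnable sketch. fake-http-get is a hypothetical stand-in for a callback-based HTTP client, and the worker count, calls per worker, URL, and buffer size are all values I picked for the example. The buffer is kept tiny on purpose so that backpressure actually kicks in.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for an async HTTP client: invokes callback with a
;; response map from another thread after a short delay.
(defn fake-http-get [url callback]
  (a/thread
    (Thread/sleep 1)
    (callback {:url url :status 200}))
  nil)

(def conc-calls 4)          ; concurrent workers (assumed value)
(def calls-per-worker 25)   ; calls each worker makes (assumed value)
(def topic-chan (a/chan 2)) ; small buffer on purpose, to exercise backpressure

(def workers
  (mapv (fn [_]
          (a/go-loop [calls 0]
            (when (< calls calls-per-worker)
              (let [ch (a/chan 1)]
                (fake-http-get "http://example.invalid/events"
                               (fn [resp] (a/put! ch resp) (a/close! ch)))
                ;; park for the response, then park until topic-chan has room
                (when-some [event (a/<! ch)]
                  (when (a/>! topic-chan event)
                    (recur (inc calls))))))))
        (range conc-calls)))

;; Close topic-chan once every worker has finished.
(a/go
  (a/<! (a/merge workers))
  (a/close! topic-chan))

;; Consumer: collect all events until topic-chan closes.
(def events (a/<!! (a/into [] topic-chan)))

(println (count events)) ; prints 100
```

Each worker has at most one request in flight and at most one pending put, so no channel ever gets anywhere near the 1024 limit.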

Edge Three: Prefer pipelines and other abstractions built on top

The async code above works even better as a pipeline with reduce. The code below uses a library I wrote that’s influenced by pipeline functions in core.async itself.

Most core.async code should be using these higher-level abstractions instead of go blocks. There is nothing wrong with go blocks, but they are a lot more error-prone, even in simple examples.

The code below has a lot fewer moving parts that need to be just right in order for a system to query events from a URL.

(require '[clojure.core.async :as a]
         '[affable-async.core :as aff])
(def url ...)
(def conc-calls 10)
(def topic-chan (a/chan 10))

(aff/u-pipeline-async {:n conc-calls
                       :to topic-chan
                       :from (a/to-chan (range 1000000))
                       :af (fn [_ ch]
                             (http/get url #(do (a/put! ch %) (a/close! ch))))})

(a/reduce conj #{} topic-chan)

Edge Four: Don’t use >!! and <!! at REPL time

When trying to figure something out in the REPL, an almost inevitable outcome of using core.async is deadlocking the REPL thread, because we’re using >!! and <!! to interact with a channel. For one-offs like that, it’s a lot safer to use the offer! and poll! functions. It’s still possible to break the REPL for some other reason, but it won’t be because we’re waiting for a channel to deliver an event that will never arrive.
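A quick sketch of what that looks like in practice. The channel and the var names are just for illustration. Neither offer! nor poll! ever waits, so the REPL prompt always comes back.

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan 1))

(def first-offer  (a/offer! ch :first))  ; buffer has room, so this succeeds
(def second-offer (a/offer! ch :second)) ; buffer is full: returns at once, nothing is put
(def first-poll   (a/poll! ch))          ; takes the buffered value
(def second-poll  (a/poll! ch))          ; channel is empty: returns nil instead of waiting

(println [first-offer second-offer first-poll second-poll])
```

With >!! in place of the second offer!, or <!! in place of the second poll!, the REPL thread would hang until some other thread touched the channel.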

Edge Five: Function boundaries

What happens with the code below?

(def cs [(a/chan 1) (a/chan 1) (a/chan 1)])
(doseq [c cs] (a/put! c :foo))
(a/go (mapv a/<! cs))

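The go macro rewrites the Clojure code inside it into a state machine that runs on the dispatcher pool we talked about earlier. Once the code inside go is expanded, there aren’t any >! or <! calls left; they have all been converted to put! and take!. What go doesn’t do is transform code across function boundaries. Here a/<! is handed to mapv as an ordinary function, so the macro never sees a call it could rewrite, and when <! is invoked as a plain function it simply throws: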

#object[clojure.core.async.impl.channels.ManyToManyChannel
        0x50ba97ea
        "clojure.core.async.impl.channels.ManyToManyChannel@50ba97ea"]
Exception in thread "async-dispatch-1" java.lang.AssertionError: Assert failed: <! used not in (go ...) block
nil
	at clojure.core.async$_LT__BANG_.invokeStatic(async.clj:120)
	at clojure.core.async$_LT__BANG_.invoke(async.clj:116)

Typically this issue manifests when you’d like to have a helper function that will do a >! or <!. Even though the outer call is wrapped in go, because the transformation doesn’t apply at function boundaries the code fails with an AssertionError.
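Here are two ways around it that I’d reach for, sketched under the assumption that the channels already hold a value as in the example above. The names all-values, take-and-tag, and tagged-ch are made up for the illustration.

```clojure
(require '[clojure.core.async :as a])

(def cs [(a/chan 1) (a/chan 1) (a/chan 1)])
(doseq [c cs] (a/put! c :foo))

;; Option 1: keep <! lexically inside the go block by using loop/recur
;; instead of passing <! to a higher-order function.
(def all-values
  (a/go-loop [remaining cs, acc []]
    (if-let [c (first remaining)]
      (recur (rest remaining) (conj acc (a/<! c)))
      acc)))

(def taken (a/<!! all-values))

;; Option 2: a helper that opens its own go block and returns a channel;
;; the caller then takes from that channel inside its own go block.
(defn take-and-tag [c]
  (a/go {:value (a/<! c)}))

(def tagged-ch (a/chan 1))
(a/put! tagged-ch :bar)
(def tagged (a/<!! (a/go (a/<! (take-and-tag tagged-ch)))))

(println taken tagged) ; prints [:foo :foo :foo] {:value :bar}
```

The rule of thumb is that every >! and <! has to be visible to a go macro directly, not hidden behind fn, map, for, or a helper that lacks its own go block.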

Edge Six: Most of the code is non-blocking but some must use blocking IO

In a practical application, having NIO for everything is often not an option. There are many inherently synchronous APIs that can’t be used directly with core.async; a typical example is JDBC code. There is something you can do, though: make the code asynchronous by running it on a thread pool.

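(import '(java.util.concurrent Executors ExecutorService))
(def pool (Executors/newFixedThreadPool 5))
(defn blocking-synchronous-sql-call [id] ...)
(defn async-wrapper
  "Runs (apply f args) on pool and returns a channel that receives the result,
  or an ex-info wrapping the exception if f throws."
  [pool f & args]
  (let [ch (a/chan 1)]
    ;; the type hints avoid reflection and pick the Runnable overload of submit
    (.submit ^ExecutorService pool
             ^Runnable (fn []
                         (try (a/put! ch (apply f args))
                              (catch Exception e
                                (a/put! ch (ex-info "some error" {:args args} e)))
                              (finally (a/close! ch)))))
    ch))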

This code omits most error handling (an exception is simply put on the channel, so the caller has to check for it), but now it can be called from a go block and interoperate with other async code.

(a/go
  (a/<! (async-wrapper pool blocking-synchronous-sql-call id)))
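core.async also ships a thread macro that covers the simple case: it runs its body on a separate, non-go thread and returns a channel with the result. A minimal sketch follows, where slow-lookup is a hypothetical blocking call standing in for a JDBC query. As far as I know, thread does not let you choose a fixed-size pool per call site, so it won’t cap the number of concurrent blocking calls the way the Executors-based wrapper above does.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical blocking call standing in for a JDBC query.
(defn slow-lookup [id]
  (Thread/sleep 50)
  {:id id :name "example"})

;; a/thread runs its body off the go pool and returns a channel that
;; receives the body's result, so the go block only parks while it waits.
(def row
  (a/<!! (a/go
           (a/<! (a/thread (slow-lookup 42))))))

(println row) ; prints {:id 42, :name example}
```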