Composing Channels with core.async

So let's talk a bit about channel composition. One comment I've heard more than once over the past few weeks is that "core.async go blocks don't compose like F# async, C# async, etc.". Hopefully this walkthrough will show how this is not the case.

But, as always, I think it is best to start a discussion by defining the terms that we are discussing. In this case, let's define "compose". At the most basic level, this word means nothing more than to "form by putting together". By that definition almost any programming could be considered composition, so perhaps that definition isn't too helpful. I think then that the discussion should be about how closely the different parts of an async system can interact with each other. Or more specifically: can I take functionality from a function performing async operations and put some of that logic in a different function, and can that function also perform async work?

In the majority of cases most modern libraries and languages require the creation of a "thing" in order to communicate data asynchronously across function boundaries. Let's take a look at a few of them:

  • Go Lang - This is the holy grail. Since the language was designed from the start to support async operations, using functions to build up complex lists of operations is fairly easy: every function supports puts and takes from a channel.
  • F# - Async workflows in F# are based on monadic composition. Thus, if a function wants to participate in an async workflow, it must return a value that can be used with the monadic bind operation. Calling a function that participates in such a workflow will cause a task to be enqueued into a thread pool. Async await is only supported inside async workflows.
  • C# - The async syntax used in more recent versions of C# is based on the Task, which is in essence a promise. Calling an async function will spin off a task that is enqueued into a thread pool, and once that task completes, the promise will be delivered and execution in the calling function can continue. Async await is only supported inside async functions.
  • Haskell - The story here is much the same as F#: Control.Concurrent.Async creates monadic values, and thread pools are used to execute sub processes. Async await is only supported inside async monad bindings.
  • Clojure - Instead of promises, or monadic values, Clojure go blocks return channels. These channels could return a single value, or many values. Async operations on these channels are only supported inside go blocks.

Perhaps there is a pattern here? If we were to express the concept shown in all these examples as pseudocode, it might look something like this:

(defn add-fn
  [arg1 arg2]
  (async-wrapper
    (+ arg1 arg2)))

(defn main-fn
  []
  (async-wrapper
    (let [token (add-fn 1 2)]
      (await token))))

In C#, async-wrapper creates a Task. In F# and Haskell, async-wrapper creates a monadic value. And in core.async, async-wrapper is "go". In most of these languages await is a language keyword or a function called "await"; in core.async it is <!. So our pseudocode above can be translated to Clojure thusly:

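(require '[clojure.core.async :refer [go <!]])

(defn add-fn [arg1 arg2]
  ;; go returns a channel that receives the value of its body
  (go (+ arg1 arg2)))

(defn main-fn []
  ;; <! parks this go block until add-fn's channel delivers a value
  (go (<! (add-fn 1 2))))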
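
To actually see the value at a REPL, something outside of any go block has to take from the channel that main-fn returns. The following is a minimal sketch, assuming org.clojure/core.async is on the classpath; it repeats the two functions so it can be pasted on its own, and uses <!!, the blocking take that core.async provides for ordinary threads. The name result is only there for illustration.

```clojure
(require '[clojure.core.async :refer [go <! <!!]])

(defn add-fn [arg1 arg2]
  ;; go returns a channel that receives the value of its body
  (go (+ arg1 arg2)))

(defn main-fn []
  ;; <! parks this go block until add-fn's channel delivers a value
  (go (<! (add-fn 1 2))))

;; <!! blocks the calling thread, so it belongs outside of go blocks
(def result (<!! (main-fn)))
;; result is 3
```

Note that main-fn itself still returns a channel, so it can be called from yet another go block in exactly the same way that it calls add-fn.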

So let's now take a look at a slightly more complex example. Let's say that I want to read three files from disk, and I want to do this in parallel. The wrong approach would be to simply call slurp from inside a go block. The reason for this is that core.async allocates a fixed number of threads for executing go blocks. If we were to perform an IO operation inside of a go, we would block that thread and it would be unusable by other go blocks.

Thankfully, core.async also has a macro called "thread" that acts much like go, but the thread for the execution of the macro's body comes from a cached thread pool. So let's create an async slurp function.

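(require '[clojure.core.async :refer [thread]])

(defn async-slurp [filename]
  ;; thread runs the blocking slurp outside the go block thread pool
  (thread (slurp filename)))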

Both go and thread return a channel which is given the result of the body of the macro, so now we can easily interact with this function from inside a go.
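
Since the two macros hand back the same kind of thing, a go block can take from either without caring which one produced the channel. Here is a small sketch of that idea, assuming core.async is available. The functions slow-add and fast-add are made up for this illustration, and Thread/sleep stands in for a blocking IO call.

```clojure
(require '[clojure.core.async :refer [go thread <! <!!]])

(defn slow-add [a b]
  ;; hypothetical blocking work: Thread/sleep stands in for real IO,
  ;; so it runs inside thread rather than go
  (thread
    (Thread/sleep 50)
    (+ a b)))

(defn fast-add [a b]
  ;; nothing blocks here, so a go block is fine
  (go (+ a b)))

(def total
  ;; both calls return channels, and <! works on either one
  (<!! (go (+ (<! (slow-add 1 2))
              (<! (fast-add 3 4))))))
;; total is 10
```

The caller only sees channels, so swapping a go for a thread (or the reverse) inside a function does not change any of the code that consumes it.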

(defn read-files []
  (go (let [file1 (<! (async-slurp "file1.csv"))
            file2 (<! (async-slurp "file2.csv"))]
        (process-results file1 file2))))

Oops, we have a problem. These files won't be read in parallel, since the second async-slurp call isn't fired off until the first has returned. What we want instead is to make both of our calls to async-slurp first, and only then take from the returned channels. This sounds like a good use-case for a helper function:


(defn wait-for
  [& chans]
  (go (loop [acc []
             chans chans]
        (if chans
          (recur (conj acc (<! (first chans)))
                 (next chans))
          acc))))

(defn read-files []
  ;; both async-slurp calls start before wait-for takes from either channel
  (go (let [[file1 file2] (<! (wait-for (async-slurp "file1.csv")
                                        (async-slurp "file2.csv")))]
        (process-results file1 file2))))
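
To check that the results come back in argument order even when the later channel finishes first, here is a rough sketch that can be run without any files on disk. It assumes core.async is available and repeats wait-for so it can be pasted on its own. The function fake-slurp is a made-up stand-in for async-slurp that sleeps instead of reading a file.

```clojure
(require '[clojure.core.async :refer [go thread <! <!!]])

(defn wait-for [& chans]
  ;; takes one value from each channel, in argument order
  (go (loop [acc []
             chans chans]
        (if chans
          (recur (conj acc (<! (first chans)))
                 (next chans))
          acc))))

(defn fake-slurp [filename delay-ms]
  ;; hypothetical stand-in for async-slurp: sleeps instead of reading a file
  (thread
    (Thread/sleep delay-ms)
    (str "contents of " filename)))

(def results
  ;; both threads are started before wait-for takes from either channel
  (<!! (wait-for (fake-slurp "file1.csv" 100)
                 (fake-slurp "file2.csv" 10))))
;; results is ["contents of file1.csv" "contents of file2.csv"]
```

The second task finishes first here, but its value simply waits on its channel until the loop gets to it, so the order of the output vector matches the order of the arguments.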

This is WIP….
