So let's talk a bit about channel composition. One comment I've heard more than once over the past few weeks is that "core.async go blocks don't compose like F# async, C# async, etc.". Hopefully this walkthrough will show how this is not the case.
But, as always, I think it is best to start a discussion by defining the terms we are discussing. In this case, let's define "compose". At the most basic level, this word means nothing more than to "form by putting together". By that definition almost any programming could be considered composition, so perhaps it isn't too helpful. I think, then, that the discussion should be about how closely the different parts of an async system can interact with each other. Or more specifically: can I take functionality from a function performing async operations and put some of that logic in a different function, and can that function also perform async work?
In the majority of cases most modern libraries and languages require the creation of a "thing" in order to communicate data asynchronously across function boundaries. Let's take a look at a few of them:
Perhaps there is a pattern here? If we were to express the concept shown in all these examples as pseudocode, it might look something like this.
(defn add-fn [arg1 arg2]
  (async-wrapper
    (+ arg1 arg2)))

(defn main-fn []
  (async-wrapper
    (let [token (add-fn 1 2)]
      (await token))))
In C#, async-wrapper creates a Task. In core.async, the go macro plays that role and returns a channel:
(defn add-fn [arg1 arg2]
  (go (+ arg1 arg2)))

(defn main-fn []
  (go (<! (add-fn 1 2))))
So let's now take a look at a slightly more complex example. Let's say that I want to read three files from disk, and I want to do this in parallel. The wrong approach would be to simply call slurp from inside a go block. The reason for this is that core.async allocates a fixed number of threads for executing go blocks. If we were to perform a blocking I/O operation inside of a go, we would tie up one of those threads and it would be unusable by other go blocks until the I/O finished.
Thankfully, core.async also has a macro called "thread" that acts much like go, but the thread for the execution of the macro's body comes from a cached thread pool. So let's create an async slurp function.
(defn async-slurp [filename]
  (thread (slurp filename)))
Both go and thread return a channel which is given the result of the body of the macro, so now we can easily interact with this function from inside a go.
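To make that concrete, here is a minimal sketch you could paste into a REPL, assuming core.async is on the classpath. The names add-on-thread and add-in-go are hypothetical and only there for illustration. It shows that the channel returned by thread can be taken from with <! inside a go, and that the channel returned by go can in turn be taken from with the blocking <!! outside of one.

```clojure
(require '[clojure.core.async :refer [go thread <! <!!]])

;; Hypothetical helper: runs the addition on a thread from the
;; cached pool and returns a channel that receives the result.
(defn add-on-thread [a b]
  (thread (+ a b)))

;; Hypothetical helper: a go block that parks on the channel
;; returned by add-on-thread and passes the value along.
(defn add-in-go [a b]
  (go (<! (add-on-thread a b))))

;; <!! is the blocking take, which is fine at the REPL
;; but should not be used inside a go block.
(<!! (add-on-thread 1 2)) ;; => 3
(<!! (add-in-go 1 2))     ;; => 3
```

Either way the caller just sees a channel, so it doesn't need to know whether the work happened in a go block or on a dedicated thread.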
(defn read-files []
  (go (let [file1 (<! (async-slurp "file1.csv"))
            file2 (<! (async-slurp "file2.csv"))]
        (process-results file1 file2))))
Oops, we have a problem. These files won't process in parallel, since the second async-slurp call isn't fired off until the first has returned. What we want to do then is do both of our calls to async-slurp, and then do a take from the returned channels. This sounds like a good use-case for a helper function:
(defn wait-for [& chans]
  (go (loop [acc []
             chans chans]
        (if chans
          (recur (conj acc (<! (first chans)))
                 (next chans))
          acc))))
(defn read-files []
  (go (let [[file1 file2] (<! (wait-for (async-slurp "file1.csv")
                                        (async-slurp "file2.csv")))]
        (process-results file1 file2))))
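If you want to convince yourself that the calls now overlap, here is a rough sketch that swaps the file reads for sleeps. It assumes core.async is on the classpath; slow-task and run-demo are hypothetical names made up for this demo, and wait-for is repeated so the snippet stands alone. Both tasks are started before wait-for takes from either channel, so the elapsed time should come out close to one sleep rather than the sum of both.

```clojure
(require '[clojure.core.async :refer [go thread <! <!!]])

;; Same helper as above, repeated so this snippet runs on its own.
(defn wait-for [& chans]
  (go (loop [acc []
             chans chans]
        (if chans
          (recur (conj acc (<! (first chans)))
                 (next chans))
          acc))))

;; Hypothetical stand-in for async-slurp: sleeps for ms milliseconds
;; on a pooled thread, then puts value on the returned channel.
(defn slow-task [ms value]
  (thread
    (Thread/sleep (long ms))
    value))

;; Starts two tasks, waits for both, and reports the results
;; together with the wall-clock time it took.
(defn run-demo []
  (let [start   (System/currentTimeMillis)
        results (<!! (wait-for (slow-task 200 :a)
                               (slow-task 200 :b)))
        elapsed (- (System/currentTimeMillis) start)]
    {:results results :elapsed-ms elapsed}))

(run-demo)
```

The results come back in the order the channels were passed in, since wait-for takes from them one after another, regardless of which task finishes first.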
This is WIP….