Hands-On Reactive Programming with Clojure

Transducers and core.async

Now we might ask ourselves: what do transducers have to do with core.async?

It turns out that once we're able to extract the core of these transformations and put them together using simple function composition, there is nothing stopping us from using transducers with data structures other than sequences!
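As a quick illustration of that independence, the sketch below applies one composed transformation in three different clojure.core contexts. The names as-vector, as-seq, and as-sum are only illustrative and do not come from the text.

```clojure
;; The same transformation used throughout this section.
(def xform
  (comp (map inc)
        (filter even?)))

;; Eager: pours the transformed values into a vector.
(def as-vector (into [] xform (range 10)))

;; Lazy: produces a sequence of the transformed values.
(def as-seq (sequence xform (range 10)))

;; Reducing: sums the transformed values without building a collection.
(def as-sum (transduce xform + (range 10)))
```

In each case xform itself knows nothing about where its input comes from or where its output goes; the surrounding function (into, sequence, or transduce) supplies that context.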

Let's revisit our first example using standard core.async functions:

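;; Assumes core.async is on the classpath. Referring into replaces
;; clojure.core/into in this namespace (Clojure prints a warning).
;; pipe is referred here for the next example.
(require '[clojure.core.async
           :refer [chan go <! >! close! map< filter< into pipe]])

(def result (chan 10))

(def transformed
  (->> result
       (map< inc)      ;; creates a new channel
       (filter< even?) ;; creates a new channel
       (into [])))     ;; core.async's into: a channel that yields one vector

(go
  (prn "result is " (<! transformed)))

(go
  (doseq [n (range 10)]
    (>! result n))
  (close! result))

;; "result is " [2 4 6 8 10]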

This code should look familiar by now: it's the core.async equivalent of the sequence-only version that was shown earlier. As before, we have unnecessary allocations, except that this time we're allocating channels: each combinator in the pipeline returns a new intermediate channel.

With the new support for transducers, core.async can take advantage of the same transformation that we defined earlier:

(def result (chan 10))

(def xform
  (comp (map inc)
        (filter even?)))  ;; no intermediate channels created

(def transformed (->> (pipe result (chan 10 xform))
                      (into [])))

(go
  (prn "result is " (<! transformed)))

(go
  (doseq [n (range 10)]
    (>! result n))
  (close! result))

;; "result is " [2 4 6 8 10]

The code remains largely unchanged, except that we now pass the same xform transformation defined earlier to chan when creating a new channel. It's important to note that we did not have to use core.async combinators. In fact, many of these combinators, such as map< and filter<, have been deprecated and will be removed in future versions of core.async.

The map and filter functions that are used to define xform are the same ones we used previously, that is, they are core Clojure functions.

This is the next big advantage of using transducers: by removing the underlying data structure from the equation via transducers, libraries such as core.async can reuse Clojure's core combinators to prevent unnecessary allocation and code duplication.
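To make the reuse concrete, here is a minimal sketch that runs one xform over a vector and over a channel and ends up with the same values. It assumes core.async is on the classpath and uses the async/ alias to avoid clashing with clojure.core names. The names from-vector and from-channel are illustrative only.

```clojure
(require '[clojure.core.async :as async])

(def xform
  (comp (map inc)
        (filter even?)))

;; Sequence context: clojure.core/into applies xform while building a vector.
(def from-vector (into [] xform (range 10)))

;; Channel context: the channel applies xform to every value put on it.
(def from-channel
  (let [c (async/chan 10 xform)]
    ;; At most 10 transformed values can result, so the buffer of 10
    ;; never fills and these blocking puts return immediately.
    (doseq [n (range 10)]
      (async/>!! c n))
    (async/close! c)
    ;; async/into returns a channel that yields a single vector.
    (async/<!! (async/into [] c))))
```

Both from-vector and from-channel evaluate to [2 4 6 8 10], even though map and filter were written once and know nothing about channels.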

It's not too far-fetched to imagine that other frameworks, such as RxClojure, could take advantage of transducers as well. All of them would be able to use the same core functions across substantially different data structures and contexts: sequences, channels, and observables.
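To give a feel for what hosting a transducer in another context might involve, the sketch below wires xform into a plain callback. This is not RxClojure's API or any library's API: callback-source, emit!, and seen are hypothetical names invented for this illustration, and the sketch ignores early termination and the completion step that a real integration would need.

```clojure
(def xform
  (comp (map inc)
        (filter even?)))

;; Hypothetical helper, not part of any library: returns a one-argument
;; function that pushes each incoming value through xf and hands whatever
;; survives to on-next. Early termination (reduced) and the completion
;; arity are ignored to keep the sketch short.
(defn callback-source [xf on-next]
  (let [step (xf (fn
                   ([acc] acc)
                   ([acc x] (on-next x) acc)))]
    (fn [x]
      (step nil x)
      nil)))

;; Collect whatever reaches the callback so the result can be inspected.
(def seen (atom []))
(def emit! (callback-source xform #(swap! seen conj %)))

(doseq [n (range 10)]
  (emit! n))

@seen ;; => [2 4 6 8 10]
```

The host only has to supply a reducing function that describes what "adding a value" means in its world; here that is calling on-next, while for a channel it is adding to the buffer.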

The idea of extracting the essence of a computation, independent of its underlying data structure, is an exciting one. It has been explored before in the Haskell community, although that work deals with lists specifically [6].

Two papers worth mentioning on the subject are Stream Fusion by Duncan Coutts, Roman Leshchinskiy, and Don Stewart [7], and Transforming programs to eliminate trees by Philip Wadler [8]. There are some overlaps, so the reader might find these interesting.