Sunday, January 13, 2013

Go Concurrency Constructs in Clojure, part 4: idioms and tradeoffs

The Go approach to concurrent software can be characterized as: Don't communicate by sharing memory, share memory by communicating.... You use the channel to pass the data back and forth between the Go routines and make your concurrent program operate that way.

--Rob Pike, Google IO 2012 conference talk
Programmers know the benefits of everything and the tradeoffs of nothing.

--Rich Hickey, Strangeloop 2011 talk "Simple Made Easy"


/* ---[ Functional Clojure Idioms ]--- */

In the preface to Eloquent Ruby, Russ Olsen relates a story that after teaching a Ruby class one of his students complained that his Ruby programs tended to end up looking like his Java programs. I remember that same experience when I first learned Ruby in the early 2000s. In fact, I set up conventions for myself in Ruby to try to "force" it to be more like Java. I hadn't fully grokked that changing languages does not mean just learning the syntax and libraries. It means adopting the idioms, the approaches and even the constraints that the designer put into the language and that have arisen in the language community. It often means changing the way you think. That is certainly true of Clojure, perhaps more than any language I've ever learned. (Side note: I haven't learned Haskell yet!)

Go is intended primarily to be systems-programming language, with a strong focus on writing concurrent server programs. While it does include some more "modern" functional features, such as closures and first class functions, it is not a functional programming language.

These blog posts and the go-lightly library are my attempt to think about how to adopt a Go-like CSP concurrency programming style into Clojure. But we should also think about when to adopt this style of programming. I don't have an answer yet and I'm writing this library to explore this area.

The Go-channel model is a message passing model, which you could view as a poor-man's Actor model, something Rich Hickey considered for the Clojure language and decided to leave out. He outlines those reasons in the clojure.org/state page (see the "Message Passing and Actors" section in particular).

On the blog lead-in above, I quote Pike saying that Go's model is to share memory by communicating, rather than the other way around. Hickey argues that the message-passing model is a complex model. Remember that "complex", in the Clojure community, is an objective measure of how entwined things are. Sharing memory by communicating is more complex as you have to coordinate entities, particularly if you have blocking waits. If one depends directly on the other, you have an entangled system. Coordinating multiple entities can be difficult and with blocking operations can lead to deadlocks. If you use timeouts to overcome potential deadlocks, then you have to add special logic to your code to deal with it. Sharing memory with immutable values or STM-protected values is often a simpler, less complected, model.

Go (synchronous) channels are for synchronizing threads or routines. When you need to synchronize in other languages you have constructs like CountDownLatches, CyclicBarriers, waiting on a future, a join in a fork-join model or, the lowest level, mutexes and semaphores. The synchronous channel provides an easier model that also allows message passing. But remember that easy is not necessarily simple [footnote 1] and consider the tradeoffs.


/* ---[ Channels as Queues ]--- */

Go buffered channels, on the other hand, are not synchronous communication tools. They are queues for asynchronous workflows. By decoupling producer(s) and consumer(s), they are less complected. Hickey, in his Simple Made Easy talk, has a table listing paired concepts where one is more complex and the other simple. On his chart, Actors are juxtaposed with queues: Actors are complex, queues are simple. And in the 2012 Clojure conj keynote, Hickey stated that queues have been underemphasized in the Clojure community so far.

Thus, as far as channels go, the asynchronous buffered ones are more idiomatic in Clojure than synchronous channels. In fact, an async concurrent queue is used in the Clojure Programming book's webcrawler example. For contrast, I implemented this webcrawler example using go-lightly.

On the other hand, from what I've seen, synchronous channels are very idiomatic in Go, and perhaps even preferred over buffered channels. That is the impression I've gotten from watching Pike's talks and reading a few threads on the golang mailing list. For example, see this thread where one participant says:

Go channels can be asynchronous, but most of the time that's not
what you want. When communicating between goroutines running on the
same machine a synchronous send/recv improves program flow. Synchronous
channels have a lot of advantages by making program flow predictable
and easier to think about.

I'll leave it there as an open question deserving careful thought.


/* ---[ Making channels more idiomatic ]--- */

As I've been doing various example programs with go-lightly, I've noticed that the code structure can be more imperative than functional, in part because channels are not composable data structures. You can't pass a channel to map, reduce or filter, since it does not implement the ISeq interface.

To remedy this, I've added four functions to go-lightly to allow you to treat it like a seq when retrieving from it.

The first two functions convert the current values on a channel to a seq or a vector without removing them from the channel. The latter two functions remove or drain the channel either immediately (non-lazy) or as you read from it in a lazy fashion.


(channel->seq chan)
"Takes a snapshot of all values on a channel without removing the values from the channel. Returns a (non-lazy) seq of the values.


(channel->vec chan)
"Takes a snapshot of all values on a channel without removing the values from the channel. Returns a vector of the values."


(drain chan)
"Removes all the values on a channel and returns them as a non-lazy seq."


(lazy-drain chan)
"Lazily removes values from a channel. Returns a Cons lazy-seq until it reaches the end of the channel (as determined by getting a nil value when asking for the next value on the channel)."


All the sequences will end once a nil value is pulled off the queue, which represents the end of the queue. Since the lazy-drain function is lazy if something else added to the queue before the end of the queue is reached, it will read that additional value, where the non-lazy drain method will not.

A REPL session will illustrate how these work.

First let's look at channel->seq using a buffered channel:


  ;; define a channel with capacity of 7
  user=> (def ch (go/channel 7))
  #'user/ch
  user=> (dotimes [i 6] (.put ch i))
  nil
  user=> ch
  #<LinkedBlockingQueue [0, 1, 2, 3, 4, 5]>

  ;; grab the values into a non-lazy seq
  user=> (def cseq (go/channel->seq ch))
  #'user/cseq
  user=> cseq
  (0 1 2 3 4 5)
  user=> (type cseq)
  clojure.lang.ArraySeq

  ;; the values have not been removed from the channel
  user=> ch
  #<LinkedBlockingQueue [0, 1, 2, 3, 4, 5]>

  ;; if a value is removed from the channel the seq is not affected
  user=> (.take ch)
  0
  user=> ch
  #<LinkedBlockingQueue [1, 2, 3, 4, 5]>
  user=> cseq
  (0 1 2 3 4 5)

channel->vec behaves the same way except it returns a vector, not a seq.

Next let's look at the drain functions using a buffered channel:


  user=> (def ch (go/channel 7))
  #'user/ch
  user=> (dotimes [i 6] (.put ch i))
  nil
  user=> ch
  #<LinkedBlockingQueue [0, 1, 2, 3, 4, 5]>

  ;; calling drain returns a seq of all the values on the
  ;; channel and removes them
  user=> (def dseq (go/drain ch))
  #'user/dseq
  user=> (type dseq)
  clojure.lang.IteratorSeq
  user=> dseq
  (0 1 2 3 4 5)
  user=> ch
  #<LinkedBlockingQueue []>

  ;; add more elements to the queue; the seq is not affected
  user=> (dotimes [i 6] (.put ch (+ 100 i)))
  nil
  user=> ch
  #<LinkedBlockingQueue [100, 101, 102, 103, 104, 105]>
  user=> dseq
  (0 1 2 3 4 5)

  ;; now let's lazily drain the queue into a lazy-seq Cons
  user=> (def zseq (go/lazy-drain ch))
  #'user/zseq
  user=> (type zseq)
  clojure.lang.Cons

  ;; realize the first three elements - takes only those
  ;; off the channel
  user=> (take 3 zseq)
  (100 101 102)
  user=> ch
  #<LinkedBlockingQueue [103, 104, 105]>

  ;; take more than are on the channel - get only what's available
  user=> (take 100 zseq)
  (100 101 102 103 104 105)
  ;; the channel is now empty
  user=> ch
  #<LinkedBlockingQueue []>

  ;; what if we try to take/read them again? They are still
  ;; in the lazy-seq since it caches the results
  user=> (take 100 zseq)
  (100 101 102 103 104 105)

  ;; we can use higher order functions - composability!
  user=> (map str (filter odd? zseq))
  ("101" "103" "105")



These functions also work with synchronous channel, but are not as useful. In particular lazy-seq faces a race condition with producers that try to transfer multiple consecutive values as shown below:


  ;; create a synchronous channel
  user=> (def c (go/channel))
  #'user/c

  ;; queue up 6 values to be put onto the queue but
  ;; only one can go on at a time waiting for a consumer
  user=> (go/go (dotimes [i 6] (.transfer c i)))
  #<core$future_call$reify__6110@5d47522a: :pending>
  user=> c

  ;; channel->vec and channel->seq will grab one value since
  ;; a producer is waiting for a consumer
  #<LinkedTransferQueue [0]>
  user=> (go/channel->vec c)
  [0]
  user=> c
  #<LinkedTransferQueue [0]>

  ;; drain also takes only the first value and also removes it
  ;; from the channel, allowing the next val to be put on the channel
  user=> (go/drain c)
  (0)
  user=> c
  #<LinkedTransferQueue [1]>

  ;; lazy-drain looks like it works!
  user=> (take 2 (go/lazy-drain c))
  (1 2)
  user=> c
  #<LinkedTransferQueue [3]>

  ;; but it has a race condition with the producer, so may
  ;; not get everything we "queued" up to transfer
  user=> (go/lazy-drain c)
  (3 4)
  user=> c
  #<LinkedTransferQueue [5]>


/* ---[ Next ]--- */

I've now created a go-lightly wiki with fairly extensive documentation and I've implemented a number of example applications using go-lightly.

A couple of things you may want to look into if you find this topic interesting:

  • I've added formal abstractions for the Channel types in go-lightly. Channel, BufferedChannel and TimeoutChannel all implement the GoChannel protocol.
  • As mentioned above, I have done a go-lightly centric implementation of a simple web crawler app based on the example at the end of Ch. 4 from the O'Reilly Clojure Programming book. This will provide a good contrast between the two concurrency approaches.
  • I have added the ability to preferentially read from one or more channels in a select or selectf.
  • I implemented Pike's Chinese Whispers example in go-lightly to see how many "Go routines" could be spawned in Clojure compared to Go. This is certainly an area where the JVM is less powerful than Go.


/* ---[ Resources and Notes ]--- */


[1] If you've spent much time in the Clojure community, you know I'm referring to the distinction that Hickey drew between the concepts of easy, a subjective concept, and simple, an objective one in his Simple Made Easy presentation. If you haven't watched it, well, listen to Bodil. (Jump back)

Blog entries in this series:

The Clojure go-lightly library on GitHub: https://github.com/midpeter444/go-lightly

Saturday, January 12, 2013

Go Concurrency Constructs in Clojure, part 3: why go-lightly?

There's a legendary example called the concurrent prime sieve, which is kind of an amazing thing. It was the first truly beautiful concurrent program I think I ever saw.

--Rob Pike, Google IO 2012 conference talk


/* ---[ Why go-lightly? ]--- */

In part 1 and part 2 of this blog series, I introduced the basics of Go channels, Go routines and the Go select statement. I then walked through initial implementations of these ideas in the go-lightly library and how to use the lamina channel and facilities to do Go-style CSP concurrent programming.

If the lamina library, which is 2+ years old now (thus reasonably mature and stable) and under active development, can be used for this, why am I proposing a new library? Well, I might have built one anyway just to get familiar with CSP style programming and improve my Clojure skills, but ultimately, I do think there is a good justification for us to consider a new library focused just around this.

The lamina library is fundamentally focused an asynchronous event-driven programming. Since dealing with callbacks can get messy and is hard to structure in a functional way, the core construct and central metaphor of Zach Tellman's approach to async programming is a channel that is used for putting and pulling events. Since a key focus of async event-driven programming is to avoid blocking, there are very few blocking operations in the lamina library. One case where it is provided is that you can choose to wait on pulling a value out of a channel. This is the part we've seen in use to emulate Go channels.

However, the primary use case for lamina channels is an event queue, which means you want it to be unbounded and non-blocking, especially for events being put onto the queue. Thus, lamina uses a Java ConcurrentLinkedQueue underneath.

Go channels, however, come in two flavors: bounded, blocking queues of size 0 (every put has to have a corresponding take) and bounded, asynchronous queues of a size you specify in the make function. The lamina channel really maps to neither, though in some scenarios it can be used for async queues or blocking queues where you need to block on read (but not write).

As I discussed in the first blog entry, Java's util.concurrent library already provides these Go channel types and even more variations on them. The bounded, blocking queue maps to a SynchronousQueue or a TransferQueue (if you only use the transfer and take methods). The bounded, asynchronous queue maps to LinkedBlockingQueue.

Thus, go-lightly proposes to wrap these Java concurrent queues, specifically facilitating a Go-style CSP concurrency semantics.


/* ---[ Why bounded blocking queues? ]--- */

So what is a use case where I really need a bounded blocking queue?

First from here on out I will use the term channel or Go channel to refer to a blocking queue of size 0 and buffered channel to refer to a non-blocking queue of arbitrary size: this is the Go terminology.

Rephrasing the question - when would I need a channel and not a buffered channel? With a buffered channel you "fire-and-forget" and let some other thread pluck it off the buffered channel when it's ready.

A channel, on the other hand, is a synchronization mechanism between threads/routines similar to a CountDownLatch, CyclicBarrier or join of a fork-join model, except you not only synchronize threads, but pass messages between them, so it is synchronizing communication tool.


/* ---[ Beautiful concurrency ]--- */

The golang site provides an example a concurrent prime sieve algorithm that, as implemented, requires blocking channels. If you were to use a lamina channel or buffered channel you'd potentially have some threads running way ahead of the others unnecessarily consuming memory and wasting CPU cycles.

This is the "first truly beautiful concurrent program" Pike referred to in his Google IO 2012 talk.

Let's look at the Go implementation from the Golang website first:

The Generate and Filter functions absolutely need to synchronize - when they push data onto a channel, they need to wait until the consumer (a chained filter function or main) is ready to pull it off.

Here is a Clojure version using go-lightly:

Happily, the implementations are pretty much the same line-for-line.


/* ---[ Next ]--- */

In the next blog entry, I will contrast the Go CSP concurrency model to the Clojure concurrency model and add some functions to allow channels to interoperate with the Clojure seq abstraction.


/* ---[ Resources ]--- */

Both of the prime sieve examples are available in the GitHub go-lightly repo:

Blog entries in this series:

Saturday, January 5, 2013

Go Concurrency Constructs in Clojure, part 2: select

"The select statement is a key part of why concurrency is built into Go as features of the language, rather than just a library. It's hard to do control structures that depend on libraries."

-Rob Pike, 2012 Google IO Conference

In the first blog entry of this series, I introduced some simple examples of the CSP (Communicating Sequential Processes) model of concurrency that have been built into the Go language. I'm blogging my investigation of how we might leverage this style of concurrent programming in Clojure.

The key benefit of the CSP approach is that you can use normal sequential semantics and control flows that are easy to reason about while building concurrent flows and processes. Channels are used to communicate and synchronize processes to bring control and deterministic behavior to an otherwise non-deterministic concurrent environment. We can do this without locks or other low-level constructs that are hard to reason about. The CSP constructs are built on top of those low-level primitives (or at least compare-and-swap mechanisms), but they are hidden from view from the application developer.


/* ---[ A construct to wait for the next available channel ]--- */

Go comes with a ready-made control structure called select. It provides a shorthand way to specify how to deal with multiple channels, as well as allow for timeouts and non-blocking behavior (via a "default" clause). It looks like a switch/case statement in C-based languages, but is different in that all paths involving a channel are evaluated, rather than just picking the first one that is ready.

Let's look at an example (adapted from Pike's 2012 Google IO talk):


 select {
 case v1 := <-c1:
     fmt.Printf("received %v from c1\n", v1)
 case v2 := <-c2:
     fmt.Printf("received %v from c2\n", v2)
 }

This select wraps two channels. It evaluates both channels and there are four possible scenarios:

  1. c1 is ready to give a message, but c2 is not. The message from c1 is read into the variable v1 and the code clause for that first case is executed.
  2. c2 is ready to give a message, but c1 is not. v2 then is assigned to the value read from c2 and its code clause is executed.
  3. Both c1 and c2 are ready to give a message. One of them is randomly chosen to execute and the other does not execute. Note this means that you cannot depend on the order your clauses will be executed in.
  4. Neither c1 nor c2 are ready to give a message. The select will block until the first one is ready, at which point it will be read from the channel and execute the corresponding code clause.

Select statements can also have a default to make it non-blocking:


 select {
 case v1 := <-c1:
     fmt.Printf("received %v from c1\n", v1)
 case v2 := <-c2:
     fmt.Printf("received %v from c2\n", v2)
 default:
     fmt.Println("no channel was ready to communicate")
 }

If neither channel is ready, the select executes the default clause and returns immediately.

Finally, select statements can also have a timeout:


 for {
     select {
     case v1 := <-c1:
         fmt.Printf("received %v from c1\n", v1)
     case v2 := <-c2:
         fmt.Printf("received %v from c2\n", v2)
     case <-time.After(1 * time.Second):
         fmt.Println("You're too slow!")
     }
 }

In this example, the select is wrapped in an infinite loop, which will stop the first time any one round takes longer than 1 second to read from either channel. But we can also set a timeout on the loop as a whole:


 timeout := time.After(1 * time.Second)
 for {
     select {
     case v1 := <-c1:
         fmt.Printf("received %v from c1\n", v1)
     case v2 := <-c2:
         fmt.Printf("received %v from c2\n", v2)
     case <-timeout:
         fmt.Println("Time's up!")
     }
 }

Now the loop will always cease after 1 second and in that one second it will read as many times as possible from either channel.

Here is an example of using selects with timeouts in a Go program:


/* ---[ Implementing select in Clojure ]--- */

Let's evaluate some of the ways we could emulate or implement the behavior of select in Clojure. While Go does have closures, treats functions as first class entities and deemphasizes object-orientation and inheritance, Go is not a functional language. So how should something like select be done in Clojure? What is the essence of what it accomplishes?

Let's first turn to the Racket language, a Lisp that is a descendant of Scheme. It has Events in the language. I am not deeply knowledgable about Racket, but from the research I've done the analog of select in Racket is sync. The sync function takes one or more "synchronizable events" and blocks until the first one is ready and returns that result:


 (let ((msg (sync evt1 evt2 evt3)))
    ;; do something with the first message result here
   )

As with Go's select, Racket's sync will choose to read from one of the events at random if more than one is ready.

Notice that the Racket version does not take a code block to execute for each event. In functional programming, it is preferable and more natural to return a value from an operation. Go's select is truly a control structure (in the C language sense of the word) - it does not return a value.

So let's implement Racket's sync in Clojure.

In order to implement select/sync in Clojure using the Java Queue libraries we used in the previous blog entry, we will need to able to check whether more than one of the queues has a value ready without blocking. That is why I selected the TransferQueue over the SynchronousQueue.

Next we have to decide what to name it. sync is already taken in clojure.core and has a specific and important enough meaning in Clojure that is best avoided. select is also used -- it is a function clojure.set namespace -- but since it is not in clojure.core, I'll go with it in my go-lightly namespace.

My initial implementation to get started is a simple one - it will check all the channels to see if any are ready and if not, do a short sleep. To do the check, it uses the .peek method of TransferQueue, since it neither blocks nor throws an exception if the queue is empty.

You pass select one or more channels and it immediately filters for those that already have a ready value. If there are any it picks one of those ready ones at random, dequeues the value and returns it. Only the one value is dequeued, so the other channels remain untouched.

If none are ready, it will "probe" the channels between short sleeps to get the first value it can find. This is an unsophisticated implementation, but it works for simple uses. (I'll provide a usage example after we add timeouts and "defaults" next.)


/* ---[ Adding "default" and timeouts to Clojure select ]--- */

The default clause in Go's select statement is a short circuit to not block if no channels are ready. Since Clojure's select is not a control structure, the most natural choice is to add another function, which I've called select-nowait.

As before it takes one or more channels (as a varargs list) and an optional sentinel keyword value. If no channels are ready, select-wait will return the sentinel keyword (if provided) or nil.


 user=> (select-nowait ch1 ch2 ch3 :bupkis)
 :bupkis

For timeouts, the Go example above shows that they come in two flavors: a timeout per round (timer starts each time you call select) or a timeout for a "conversation" that could involve multiple rounds of selecting the next value.

Let's take these one a time, as they will have different solutions in my implementation. For a timeout-per-select call, I've created a select-timeout function that takes a timeout (in milliseconds) as the first argument.


 ;; returns a value from one of the channels if it can
 ;; be read within 1 sec.  Otherwise it times out and
 ;; returns :go-lightly/timeout
 user=> (select-timeout 1000 ch1 ch2 ch3)
 :go-lightly/timeout

For an overall timeout, I provide two options.

First, following the pattern in Alexey Kachayev's example of doing this with the lamina library - we build a channel that will have a timeout sentinel value once the timer goes off. Use the go-lightly timeout-channel factory fn and then pass that timeout channel to the select function.

In order for the timeout-channel to be effective, you have to be continuously calling select until you hit the timeout. Also the current implementation of select doesn't preferentially look at the timeout channel first and select that over other channels if it is ready, but I'll be fixing that in later in the series.

You can also pass a timeout-channel into select-timeout if you want both types of timers running.

Second, I've added a general purpose with-timeout macro to the go-lightly.core library that wraps any arbitrary set of statements in a timeout.

Go here if you want to see the full implementation of these timeout methods.

All of these options are shown in this Clojure go-lightly example implementation of the Go "boring" select example:

Note: the channels here are no longer raw LinkedTransferQueues - they are go-lightly GoChannel type entities. See the go-lightly wiki for a detailed explanation.


/* ---[ Emulating Go's select in lamina ]--- */

lamina's analog to select is its join operation, which basically routes the output of multiple lamina channels into a single channel:


 user=> (use 'lamina.core)
 nil
 user=> (def ch1 (channel))
 #'user/ch1
 user=> (def ch2 (channel))
 #'user/ch2
 user=> (def ch3 (channel))
 #'user/ch3
 user=> (join ch1 ch3)
 true
 user=> (join ch2 ch3)
 true
 user=> [ch1 ch2 ch3]
 [<== [ … ] <== [ … ] <== […]]
 user=> (enqueue ch1 :one)
 :lamina/enqueued
 user=> (enqueue ch2 :two :three)
 :lamina/enqueued
 user=> [ch1 ch2 ch3]
 [<== [ … ] <== [ … ] <== [:one :two :three …]]
 

You can then read from the downstream channel:


 user=> @(read-channel ch3)
 :one
 user=> @(read-channel ch3)
 :two

To create a whole-conversation timeout, you can call the periodically fn that invokes your fn every 'period' milliseconds and returns the value. This was the inspiration for go-lightly's timeout-channel.

To create a per-round timeout, you can use either the read-channel* macro or the channel->lazy-seq function, both of which take a per-read timeout.

This program that demonstrates these options (and a few others) using lamina (with some helper functions from go-lightly):


/* ---[ Implementing Go's select in Clojure ]--- */

So we can provide Racket's sync functionality in Clojure either by implementing it ourselves or using lamina, but it is not as powerful as Go's select. What if you need to know not only the next value on the channels, but which channel it was read from? In that case, providing a function to execute per channel is a nice model. But to be more or less functional, the select statement still needs to return a value.

Let's hit an important point here: as I quoted at the start of this post, Piked has said that "it's hard to do control structures that depend on libraries". This is true in some languages, but not all - especially not in Lisps. You can do control structures with macros or sometimes just with functions and this is one of the key advantages of Lisp languages.

In the go-lightly library, I've implemented this as selectf and it turns out I didn't need a macro.

Here's an example of using go-lightly's selectf from the sleeping-barbers example app in the go-lightly-examples project:


  (defn barber-shop [clients-ch]
    (let [barber-ch (channel)]
      (loop [shop-state {:free-barbers (init-barber-vector)
                         :waiting-clients []}]
        (-> (selectf
             clients-ch #(client-walked-in % barber-ch shop-state)
             barber-ch  #(barber-available % barber-ch shop-state))
            (recur)))))

selectf takes pairs of arguments where the first member of the pair is a channel (or the :default keyword) and the second member of the pair is a function that takes one argument - the value read from that channel. (A function paired with :default takes no arguments.)

The return value of selectf is whatever the fn you provide returns. In the example above, I pass this value to the recur form so that I can reset the shop-state local var without having to use an atom to manage state changes.

And here is the implementation of selectf:


 (defn selectf
   "Control structure variable arity fn. Must be an even number of arguments where
   the first is either a GoChannel to read from or the keyword :default. The second
   arg is a function to call if the channel is read from.  Handler fns paired with
   channels should accept one argument - the value read from the channel.  The
   handler function paired with :default takes no args.  If no :default clause is
   provided, it blocks until a value is read from a channel (which could include
   a TimeoutChannel). Returns the value returned by the handler fn."
   [& args]
   (binding [*choose-fn* choose-tuple]
     (let [chfnmap (apply array-map args)
           [keywords chans] (partition-bifurcate
                             keyword?
                             (reduce #(conj % %2) [] (keys chfnmap)))
           choice (doselect chans nil (first keywords))]

       ;; invoke the associated fn
       (if choice
         ((chfnmap (nth choice 0)) (nth choice 1))
         ((chfnmap (first keywords)))))))

I won't give a full explanation of this implementation and all its helper functions, but notice this piece:


  (let [chfnmap (apply array-map args)
        ...
        ])

That's all that is required to turn the argument pairs into a control structure. It creates a map of channels to fns and once you have a map in Clojure, programming is straightforward.


/* ---[ Next ]--- */

In the next entry we'll implement some more interesting CSP examples in Go and Clojure and think about the pros and cons of using lamina vs. go-lightly.


/* ---[ Resources ]--- */

All of the code in this blog series, including the Go and lamina example code, is in the go-lightly project on GitHub.

Lamina library: https://github.com/ztellman/lamina

The Go examples are from Rob Pike's talk Google I/O 2012 - Go Concurrency Patterns

Alexey Kachayev wrote down the Go code that Pike used in the 2012 Google IO presentation, which otherwise doesn't seem to have been made available. Alexey published them as gists. They won't compile out of the box, so I've been modifying them, but wanted to link to his gists: https://gist.github.com/3124594.

Alexey also then brainstormed on ways to implement these examples in Clojure using the lamina library. Those gists are at: https://gist.github.com/3146759

Links to this blog series:

Tuesday, January 1, 2013

Go Concurrency Constructs in Clojure, part 1

If you look at the programming languages of today, you'd probably get this idea that the world is object-oriented, but it's not. It's actually parallel.... There's all these things that are happening simultaneously in the world and yet the computing tools we have are really not very good at expressing that kind of world view. And that seems like a failing.

--Rob Pike, 2012 Heroku Waza conference


/* ---[ Go ]--- */

The Go language recently turned 3 years old, so it is about 2 years Clojure's junior. I have only started investigating Go and one of the things that has captured my attention are its primitives for concurrency. Rob Pike, the leading spokesman for Go and one of its co-creators, has done a number of interesting talks on Go concurrency patterns and how it is built into the language. If you haven't watched them, here are two that I recommend:

I'll be referring to the first one through this post.

Pike talks a lot about how Go routines and channels are first class entities in the language, with simple syntax and keywords baked in. Go routines are akin to threads that you kick off to run in "the background". Pike's analogy is to think of them like launching a process on the command line with the ampersand. Staying with the Unix analogy, if you launch a process in the background and then need to communicate with it, what do you do? In Unix/Linux you have a number of options, such as a socket, a pipe or some other form of IPC or to use shared memory.

The Go language creators have chosen the "channel" as a core "inter-routine" communication mechanism. Quoting Pike: "The Go model is not to communicate by sharing memory, but to share memory by communicating".

What is called "inter-routine" here would be called inter-process in Erlang or traditional C/Unix programming or "inter-thread" communication in languages like Java. But for Go it is more appropriate to say "inter-routine". Pike emphasizes that go routines are lighter weight than threads. Go routines can be shared or multiplexed onto multiple running threads over their lifetime, avoiding thread starvation issues. They have their own stack frame, but I believe it is managed on the heap (need to research this more).

The roots of the Go routine and Go channel start in Tony Hoare's Communicating Sequential Processes paper (now book). CSP addresses concurrency interaction patterns - how separate processes (in the Unix or Erlang sense), threads or routines communicate and coordinate with each other via message passing. We want constructs that reduce the complexity of inter-process/inter-thread communication using primitives that are easy to use and reason about. This means not having to be a deep expert in a system's memory model in order to do concurrent programming. Instead, it hides semaphores, mutexes, barriers and other low level concurrency constructs in higher-level abstractions.


/* ---[ Go-style CSP in Clojure? ]--- */

My primary interest here is in what support for Go-like CSP patterns exist, or can be made to exist, in Clojure. Clojure, after all, promises to bring sanity to concurrent programming by means of efficient immutable data structures and software transaction memory for mutable state.

Go channels come in two forms: synchronous blocking channels that cannot hold multiple entries and non-synchronous buffered channels that can have multiple entries. I'll explain the nuances of synchronous here in a moment.

The Go spec says: A channel provides a mechanism for two concurrently executing functions to synchronize execution and communicate by passing a value of a specified element type.

Does Clojure have this? There are two things in Clojure or Java we can potentially use to emulate Go channels:

  • Java concurrency queues. In particular, SynchronousQueue, BlockingQueue and TransferQueue.
  • Zach Tellman's Clojure lamina library, whose primary focus is asynchronous event-based programming.

In this blog series, I will introduce the go-lightly Clojure library I built to have Go concurrency constructs. The GitHub repo for it also has many examples, some of which use Java concurrent queues directly, some of which use the lamina library and many of which use the go-lightly library.

Note: I also ran across the Java CSP (JCSP) project, which I haven't investigated yet, but might be something to build a Clojure library around.


/* ---[ "boring" basics ]--- */

In his initial examples to show how Go channels work, Pike uses a background process that is, as he calls it, "boring" - it just prints its name or the word "boring" at random intervals. After listening for a while, the "main" process gets tired and wants to leave or end the conversation.

Here are the first two examples in Go from his 2012 Google IO talk, modified slightly to work from one main function:

Here is the output from each option (single vs. multiple):

$ ./boring-generators single
You say: "boring! 0"
You say: "boring! 1"
You say: "boring! 2"
You say: "boring! 3"
You say: "boring! 4"
You're boring: I'm leaving.

$ ./boring-generators multiples
Joe 0
Ann 0
Joe 1
Ann 1
Joe 2
Ann 2
Joe 3
Ann 3
Joe 4
Ann 4
Joe 5
Ann 5
Joe 6
Ann 6
Joe 7
Ann 7
Joe 8
Ann 8
Joe 9
Ann 9
You're boring: I'm leaving.

See appendix 1 if you would like to compile and run this on your system. I'll frequently list the outputs during this blog series, but it helps to see the latency between print statements to get a better feel for how these examples work.

See appendix 2 for links my GitHub project with this code and to other gists with related code examples.


Pike calls this the generator pattern because the invoked "boring" function creates a (synchronous) channel, then creates a closure that references that channel, launches that closure as a go routine and immediately returns the channel to the calling function.

The go routine sends messages to the channel with the <- operator.
For example, c <- "hello kitty" sends the feline greeting into the channel.

The other function receives messages from the channel with the same operator, by putting the channel on the right side of the operator: <- c.

These are blocking operations with a synchronous channel. If the sender sends a message to the channel when there is no receiver waiting, it will block until a receiver comes and grabs its message off the channel. And, conversely, a receiver will block waiting for a sender if there isn't already one pushing to the channel.

In the multipleGenerators version, two go routines are created, each with its own channel. The receiving "main" function now receives from each channel consecutively in a defined order, which is why you always get Joe's output first, then Ann's.

One final thing to note: we don't explicitly do any work to shutdown the go routines. Go routines will automatically stop operation when the main function exits. Thus, go routines are like daemon threads in the JVM (well, except for the "thread" part ...)


/* ---[ "boring" in Clojure ]--- */

Java has two analogs of a Go synchronous channel: the SynchronousQueue and the TransferQueue.

The SynchronousQueue has the same specs I mentioned above: it allows one sender at a time that will block waiting for a receiver and one receiver at a time that will block waiting for a sender. The "queue", despite its name, has no internal storage. It's size method always returns 0 and its peek method always returns null. If you use just the put method to send and the take function to receive, it behaves like the Go channel in the first example.

The TransferQueue is a more liberal - you can use it like a SynchronousQueue or more like a BlockingQueue where you can put multiple messages on the queue and only block if you try to put onto a full queue (if it is bounded) or take from an empty queue. To use it like a SynchronousQueue, use transfer and take. The API is rich enough that you can find other uses also, such as non-blocking offer and poll methods to send and receive without blocking.

In the go-lightly code base examples, I use both, but here I'm going to stick with the TransferQueue for reasons that will become clear when we get to the Go select statement in the next blog entry.

OK, so the channel will be a Java TransferQueue. How do we implement a go routine in Clojure?

Clojure's future function is similar to a go routine in that it launches a new "routine" (Java thread) with its own stack and flow of execution. The interface is similar too: in Go you give an invoked function to the go statement. You do the same to the Clojure future function.

// launch a go routine in Go
go func() { fmt.Println ("hello kitty") }()

;; launch a thread that returns a future in Clojure
(future (println "sayonara"))

But Clojure futures differ from a go routine in at least two important ways:

  1. A Clojure future launches a new thread (or rather obtains a thread from the Clojure thread agent pool), whereas, as mentioned before, a Go routine is lighter weight than a Java thread. I don't know of any Clojure or Java facilities for creating something equally lightweight, so I will use threads.

  2. Clojure futures are not daemon threads and I don't think there is any way to tell them to be daemon threads. Thus, you cannot give a future an infinite loop and expect your program to close down. If you launch such a future, you would have to either:

    1. Call (future-cancel myfut) on the future, which means you have to retain a reference to the future. You can't just fork-and-forget.
    2. Set a flag, either through shared memory or a message via a channel, that the future will check periodically to see if it should stop. However, if this future is stuck in a blocking call trying to read from or push onto the blocking queue, this approach won't work.

In addition, you also need to call (shutdown-agents) at the end of your Clojure program to shut down the agent/future Thread pool.

Thus, Clojure's future requires more bookkeeping than Go's go.

But Clojure has macros to help, so I've written two macros and some helper functions to lessen the bookkeeping.

The first macro is simply called go and has accompanying stop and shutdown functions. The second macro, go&, is meant for fork-and-forget use so I give it the Unix ampersand in the name. It has the least amount of ceremony, but cannot be controlled through the Java Executor framework like the future and agents threads can be.

And here's my implementation of the boring-generator using it. To match the Go terminology, Finally, I also define the function channel to simply return a LinkedTransferQueue.

While the go& macro is easier to use and more like the actual Go go-routine launcher, it has one downside: it is not REPL-friendly if your go routine doesn't shutdown naturally. In this case it will block on the (.transfer ch) call on line 39 and hang around in memory. And each time you invoke the function in the REPL it creates another daemon, draining JVM resources and memory over time.

Since Go language development is not REPL-based they can get away with it. If you create a main function:

(defn -main [& args]
  (thornydev.go-lightly.boring.generator-tq/multiple-generator&))

and run it from the command line:

lein run

it will behave just like the Go version. You'll get the same output as the Go example above and it will shutdown gracefully instead of hanging.

But to be REPL friendly, I'll typically use go and stop rather than go&.

Another way to handle this in Go would be to close the channel and have the go-routine check whether the channel is closed before continuing.

Unfortunately, this is not possible with the Java concurrency classes mentioned above - they are not Closeable resources.

However, the go-lightly library and the Clojure lamina libraries both provide the notion of a closeable channel, which leads to our next implementation.


/* ---[ A "boring" lamina implementation ]--- */

The lamina library created by Zach Tellman provides constructs for handling evented asynchronous programming. It defines a channel whose purpose is to be a event-driven data structure that represents a stream of messages or events. A lamina channel has basic send (enqueue) and receive (receive or read-channel) functionality, but also is composable with other channels using classic functional programming constructs, such as map, reduce and filter. It also provides very useful fork/join operators.

For the "boring" example, we only need the ability to enqueue, read, and close the channel and check whether the channel is closed.

Since the "boring" go routine stops when the channel is closed, this use of the go& macro is REPL-friendly and works as expected when run standalone from the command line.


/* ---[ More macro goodness ]--- */

Finally, let's add one more macro to clean up that last example. If you've done much Clojure or Lisp programming, you know that a common macro pattern is a with-xxx binding macro that cleans up resources for you.

Clojure in fact has a with-open binding macro that will call "close" on all the things specified in the bindings. So that should work here, right? Well, the devil being in the details, it doesn't. with-open actually calls the Java Closeable interface method .close, not "close". And lamina channels do not implement Java's Closeable interface - they do not have a .close. So I grabbed the source for clojure.core/with-open and made my own (and put it in the go-lightly.util namespace):

(defmacro with-channel-open
  "bindings => [name init ...]

  Evaluates body in a try expression with names bound to the values
  of the inits, and a finally clause that calls (close name) on each
  name in reverse order."
  [bindings & body]
  (assert (vector? bindings) "a vector for its binding")
  (assert (even? (count bindings)) "an even number of forms in binding vector")
  (cond
    (= (count bindings) 0) `(do ~@body)
    (symbol? (bindings 0)) `(let ~(subvec bindings 0 2)
                              (try
                                (with-channel-open ~(subvec bindings 2) ~@body)
                                (finally
                                  (~'close ~(bindings 0)))))
    :else (throw (IllegalArgumentException.
                   "with-open only allows Symbols in bindings"))))

So with that in place we can revise the "boring" lamina implementation:


/* ---[ Looking ahead ]--- */

In the next installment we'll dig into the Go select statement, and further evaluate how to use the Java concurrency queues and the lamina libraries as Go channels.



/* ---[ Appendix 1: Compile and run Go examples ]--- */

If you don't have Go installed, see the golang install guide. On Ubuntu, it is as simple as:

sudo apt-get install golang-go

Next, decide where you want your go projects to live (mine are in $HOME/lang/go/projects). cd to that directory and do:

$ export GOPATH=$HOME/lang/go/projects  # change to yours
$ mkdir src
$ mkdir src/boring-generators  # your Go project name here
$ cd boring-generators
$ emacs boring-generators.go   # or whatever editor you like
$ go build   # this invokes the compiler

You will then have a boring-generators executable that you can run:

$ ./boring-generators

(Note: I didn't find that setting GOPATH was really necessary, but it is in the instructions, so YMMV).



/* ---[ Appendix 2: Resources ]--- */

Alexey Kachayev wrote down the Go code that Pike used in the 2012 Google IO presentation, which doesn't seem to have been made available. Alexey published them as gists. They won't compile out of the box, so I've been modifying them, but wanted to link to his gists: https://gist.github.com/3124594.

Alexey also then brainstormed on ways to implement these examples in Clojure using the lamina library. Those gists are at: https://gist.github.com/3146759

My code for working through these ideas are in my go-lightly project on GitHub.

Links to this blog series: