Friday, October 11, 2013

Launching a Cascading job from Apache Oozie

Perhaps because the Cascading framework has its own workflow management system embedded in it, I found a dearth of information online about how to launch a Cascading job from within the Apache Oozie workflow scheduler.

In fact, when I asked on the oozie-users mailing list how to do it, the only response I got back was to write an Oozie extension to run Cascading jobs. That may be the right solution long term (don't know enough yet), but I did find a way to get it working with what Oozie provides today.


/*---[ Failed attempts ]---*/

I tried unsuccessfully to use the map-reduce action and the shell action. The map-reduce action won't work because it wants you to specify the Mapper and Reducer classes explicitly. That doesn't make sense for a Cascading job: you launch your main Cascading class and it auto-generates a bunch of mappers and reducers. And while you can use the oozie.launcher.action.main.class property to specify your main Cascading class, there seems to be no way to pass arguments to it.

I'm not sure why I couldn't get the shell action to work. I made the exec property /usr/bin/hadoop in order to run it as hadoop jar myjar.jar com.mycompany.MyClass arg1 arg2 argN, but several attempts to make that work failed. There probably is a way to make it work, however.


/*---[ Solution: use the java action ]---*/

In order to launch Cascading jobs, we build an uber-jar (which Maven annoyingly calls a shaded jar) that bundles our specific Cascading code and supporting objects together with the Cascading library. But that's not enough, since all of that depends on the myriad Hadoop jars. We then use the hadoop jar invocation I indicated above, because it puts all the Hadoop jars in the classpath.

I didn't think using the Oozie java action would work unless I built a massive uber-jar with all the Hadoop dependencies, which would then have to get farmed around the Hadoop cluster each time you run it: a great waste.

But I was happily surprised to notice that Oozie sets up the classpath for java (and map-reduce) tasks with all the Hadoop jars present.

So, here's the workflow.xml file that works:

<workflow-app xmlns='uri:oozie:workflow:0.2' name='cascading-wf'>
  <start to='stage1' />
  <action name='stage1'>
    <java>
      <job-tracker>${jobTracker}</job-tracker>
      <name-node>${nameNode}</name-node>

      <configuration>
        <property>
          <name>mapred.job.queue.name</name>
          <value>${queueName}</value>
        </property>
      </configuration>

      <main-class>com.mycompany.MyCascade</main-class>
      <java-opts></java-opts>
      <arg>/user/myuser/dir1/dir2</arg>
      <arg>my-arg-2</arg>
      <arg>my-arg-3</arg>
      <file>lib/${EXEC}#${EXEC}</file> 
      <capture-output />
    </java>
    <ok to="end" />
    <error to="fail" />
  </action>


  <kill name="fail">
    <message>FAIL: Oh, the huge manatee!</message>
  </kill>

  <end name="end"/>
</workflow-app>

The parameterized variables, such as ${EXEC}, are defined in a job.properties file in the same directory as the workflow.xml file. The shaded jar is in a lib subdirectory, as the <file> element indicates.

  
 nameNode=hdfs://10.230.138.159:8020
 jobTracker=http://10.230.138.159:50300
  
 queueName=default
  
 oozie.wf.application.path=${nameNode}/user/${user.name}/examples/apps/cascading
 EXEC=mybig-shaded-0.0.1-SNAPSHOT.jar
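Oozie fills in those ${...} references before it runs the action. Its real resolver is an EL (expression language) engine that also supports functions, so treat the Go sketch below only as a model of plain ${name} lookup: it reads job.properties-style lines and expands references, including nested ones like oozie.wf.application.path. The helper names parseProperties and resolve are made up, and user.name, which does not come from the file itself, is hard-coded to a hypothetical myuser.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// parseProperties reads simple key=value lines, skipping blank lines and
// '#' comments. Real Java .properties files allow more syntax (':'
// separators, line continuations, escapes) that this sketch ignores.
func parseProperties(text string) map[string]string {
	props := make(map[string]string)
	sc := bufio.NewScanner(strings.NewReader(text))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		if k, v, ok := strings.Cut(line, "="); ok {
			props[strings.TrimSpace(k)] = strings.TrimSpace(v)
		}
	}
	return props
}

// resolve expands ${name} references repeatedly, so a value may refer to
// other properties. Unknown names are left untouched.
func resolve(s string, props map[string]string) string {
	for i := 0; i < 10; i++ { // bound the passes in case of cycles
		next := os.Expand(s, func(name string) string {
			if v, ok := props[name]; ok {
				return v
			}
			return "${" + name + "}"
		})
		if next == s {
			break
		}
		s = next
	}
	return s
}

func main() {
	props := parseProperties(`
nameNode=hdfs://10.230.138.159:8020
queueName=default
oozie.wf.application.path=${nameNode}/user/${user.name}/examples/apps/cascading
EXEC=mybig-shaded-0.0.1-SNAPSHOT.jar
`)
	props["user.name"] = "myuser" // hypothetical: supplied by the submitting user in real life

	fmt.Println(resolve(props["oozie.wf.application.path"], props))
	fmt.Println(resolve("lib/${EXEC}#${EXEC}", props))
}
```

As I understand the <file> syntax, the part after # names the symlink created in the task's working directory, which is why the resolved value repeats the jar name.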

Let me know if you find another way to launch a Cascading job from Oozie or find any problems with this solution.

Tuesday, October 8, 2013

Beautiful Code Ported to Go

This week I've been learning Go - the programming language, not the game. I had studied its concurrency primitives for my Clojure library that would bring the CSP model to Clojure (this was before Rich Hickey and crew created core.async), but until a few days ago I hadn't formally studied the whole of Go with the intention of being proficient in it.

Go has pointers, but it does not have pointer arithmetic. Instead, it has slices: variable-length views onto an underlying array that support Python-like "slice" notation. I wanted a chance to try that out and found it recently when reading Chapter 1 of Beautiful Code, which is about a limited regular expression matcher that Rob Pike (co-creator of Go) wrote for The Practice of Programming, the book he co-wrote with Brian Kernighan (who is also the author of Ch. 1 of Beautiful Code).

Pike's code is a limited (pedagogical) regex library that allows the following notation:

|------------+------------------------------------------------------------|
| Character  | Meaning                                                    |
|------------+------------------------------------------------------------|
| c          | Matches any literal character c                            |
| . (period) | Matches any single character                               |
| ^          | Matches the beginning of the input string                  |
| $          | Matches the end of the input string                        |
| *          | Matches zero or more occurrences of the previous character |
|------------+------------------------------------------------------------|


/* ---[ The C version ]--- */

Here's Pike's code in C:


 #include <stdio.h>

 int matchstar(int c, char *regexp, char *text);

 /* matchhere: search for regexp at beginning of text */
 int matchhere(char *regexp, char *text) {
   if (regexp[0] == '\0')
     return 1;
   if (regexp[1] == '*')
     return matchstar(regexp[0], regexp+2, text);
   if (regexp[0] == '$' && regexp[1] == '\0')
     return *text == '\0';
   if (*text!='\0' && (regexp[0]=='.' || regexp[0]==*text))
     return matchhere(regexp+1, text+1);
   return 0;
 }


 /* matchstar: search for c*regexp at beginning of text */
 int matchstar(int c, char *regexp, char *text) {
   do {
     /* a * matches zero or more instances */
     if (matchhere(regexp, text))
       return 1;
   } while (*text != '\0' && (*text++ == c || c == '.'));
   return 0;
 }


 /* match: search for regexp anywhere in text */
 int match(char *regexp, char *text) {
   if (regexp[0] == '^')
     return matchhere(regexp+1, text);
   do {
     /* must look even if string is empty */
     if (matchhere(regexp, text))
       return 1;
   } while (*text++ != '\0');
   return 0;
 }

I'll let you read Ch. 1 of Beautiful Code for an analysis, but two things are noteworthy for my purposes:

  1. Pike uses pointer arithmetic throughout the code
  2. He uses the unusual do-while loop twice in only 30 or so lines of code

So I thought I'd port it to Pike's new language Go.


/* ---[ My Go version ]--- */


 package pikeregex

 // search for c*regex at beginning of text
 func matchstar(c rune, regex []rune, text []rune) bool {
     for {
         if matchhere(regex, text) {
             return true
         }
         if !(len(text) > 0 && (text[0] == c || c == '.')) {
             return false
         }
         text = text[1:]
     }
 }

 // search for regex at beginning of text
 func matchhere(regex []rune, text []rune) bool {
     if len(regex) == 0 {
         return true
     }
     if len(regex) > 1 && regex[1] == '*' {
         return matchstar(regex[0], regex[2:], text)
     }
     if regex[0] == '$' && len(regex) == 1 {
         return len(text) == 0
     }
     if len(text) > 0 && (regex[0] == '.' || regex[0] == text[0]) {
         return matchhere(regex[1:], text[1:])
     }
     return false
 }

 // search for regex anywhere in the text
 func Match(regex string, text string) bool {
     runerx := compile(regex)
     runetxt := []rune(text)

     if len(runerx) > 0 && runerx[0] == '^' {
         return matchhere(runerx[1:], runetxt)
     }

     for {
         if matchhere(runerx, runetxt) {
             return true
         }
         if len(runetxt) == 0 {
             return false
         }
         runetxt = runetxt[1:]
     }
 }

 // one enhancement: allow + (1 or more) notation
 // (a leading + has nothing to repeat, so it is kept as a literal)
 func compile(regex string) (regslc []rune) {
     regslc = make([]rune, 0, len(regex)+10)

     for _, r := range regex {
         if r == '+' && len(regslc) > 0 {
             regslc = append(regslc, regslc[len(regslc)-1], '*')
         } else {
             regslc = append(regslc, r)
         }
     }
     return regslc
 }

This is as straight a port as I could make it, and I think it translates well to Go. I've capitalized the Match function, since in Go a capitalized identifier is exported from its package; it is the one public entry point for other code.

Instead of pointer arithmetic I used slice notation, as in this recursive call to matchhere:


 // C version
 return matchhere(regexp+1, text+1);

 // Go version
 return matchhere(regex[1:], text[1:])

Also, in the C code you check whether you are at the end of the text string by looking for the NUL char: *text == '\0'. In Go, you can use the builtin len function: len(text) == 0. That condition becomes true once you have recursively sliced text[1:] all the way down to an empty string, or rather, in my code, an empty slice of runes.
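One reason the slice version stays as cheap as the pointer version: re-slicing does not copy. text[1:] is a new slice header (pointer, length, capacity) that points into the same backing array, much as text+1 in C points into the same char buffer. A small sketch to make that visible (dropFirst is an illustrative helper, not part of the port):

```go
package main

import "fmt"

// dropFirst mirrors C's text+1: it returns a view of s without its first
// rune. Nothing is copied; the result shares s's backing array.
func dropFirst(s []rune) []rune {
	return s[1:]
}

func main() {
	text := []rune("abc")
	rest := dropFirst(text)

	rest[0] = 'X'             // write through the sub-slice...
	fmt.Println(string(text)) // ...and the original sees it: aXc

	// Both slices refer to the same element in memory.
	fmt.Println(&rest[0] == &text[1]) // true

	// Slicing all the way to the end yields an empty slice, the
	// counterpart of reaching the '\0' terminator in C.
	fmt.Println(len(text[len(text):]) == 0) // true
}
```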


/* ---[ Runeology ]--- */

Runes are the Go 'char' type. A rune is an integer value (an int32) identifying a Unicode code point. When you iterate over a string with for ... range, you get runes, whose UTF-8 encodings vary in size (1 to 4 bytes).

You have to be careful with strings in Go: text[2] returns the third byte, not the third rune in the string. If you want the third rune, you might try to use the utf8.DecodeRuneInString(text[2:]) function. But that only works when the first two runes are single-byte characters (e.g., ASCII), because you are slicing at the third byte and asking the utf8 library to parse the first rune from that point. If the first rune in the string is two bytes long, you'll get the second rune in the string, not the third. If it's three bytes long, you're really in trouble: byte 2 is in the middle of that rune, so you get utf8.RuneError.

The safest way is to do what I did in the code: convert the string to a slice of runes ([]rune) immediately and then work with that. Now when you index runeslice[2] you know you are getting the third rune.
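A short sketch of these pitfalls, using "héllo", where é (U+00E9) takes two bytes in UTF-8. The helpers runeAtByte and nthRune are illustrative names, not library functions:

```go
package main

import (
	"fmt"
	"unicode/utf8"
)

// runeAtByte decodes the rune that starts at byte offset off. ok is false
// when off lands in the middle of a multi-byte rune.
func runeAtByte(s string, off int) (rune, bool) {
	r, size := utf8.DecodeRuneInString(s[off:])
	return r, !(r == utf8.RuneError && size == 1)
}

// nthRune returns the n-th rune (0-based) by converting to []rune first.
func nthRune(s string, n int) rune {
	return []rune(s)[n]
}

func main() {
	s := "héllo" // 'é' is two bytes in UTF-8

	fmt.Println(len(s), len([]rune(s))) // 6 5: bytes vs runes
	fmt.Printf("%#x\n", s[2])           // 0xa9: second byte of 'é', not 'l'

	_, ok := runeAtByte(s, 2)
	fmt.Println(ok) // false: byte 2 is mid-rune

	fmt.Println(string(nthRune(s, 2))) // l
}
```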


/* ---[ No do-while ]--- */

Go doesn't have a do-while loop. It doesn't even have a while statement: just for. But, as Rob Pike reminded me in a critique of the first version of this blog entry, a do-while can be adequately mimicked with an infinite for loop:

 func matchstar(c rune, regex []rune, text []rune) bool {
     for {
         if matchhere(regex, text) {
             return true
         }
         if !(len(text) > 0 && (text[0] == c || c == '.')) {
             return false
         }
         text = text[1:]
     }
 }
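Another do-while idiom you will sometimes see in Go seeds the loop condition with true, so the body is guaranteed to run once and the test is effectively checked at the bottom. As a sketch (suffixes is a made-up helper), here is the loop from Pike's match(), which visits every suffix of the text including the empty one ("must look even if string is empty"), written that way:

```go
package main

import "fmt"

// suffixes returns every suffix of text, longest first, ending with the
// empty suffix. It models the loop in Pike's match():
//
//	do { ... } while (*text++ != '\0');
//
// using a seeded condition: more starts out true, so the body runs at
// least once, even when text is empty.
func suffixes(text []rune) []string {
	var seen []string
	for i, more := 0, true; more; i++ {
		seen = append(seen, string(text[i:])) // loop body
		more = i < len(text)                  // the "while" test, after the body
	}
	return seen
}

func main() {
	fmt.Printf("%q\n", suffixes([]rune("abc"))) // ["abc" "bc" "c" ""]
	fmt.Println(len(suffixes(nil)))             // 1: the empty text is still examined
}
```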

The stated intent of Go was to be as minimal as possible. Pike, in a recent podcast interview, said that the core team that created Go (which includes Ken Thompson) all had to agree that a feature was essential for it to be included. Many candidate features were dropped, including the do-while loop. Of note, goto was not, which I find quite interesting. goto is only mentioned once (almost in passing) in the Effective Go guide, so I'm interested in what the essential use case for it was.


/* ---[ One addition ]--- */

Finally, in the Beautiful Code chapter, Kernighan suggests a number of enhancements the reader can make. I've only done one - allowing the + (1 or more) operator by mildly precompiling the regex, turning x+ into xx*, allowing me to use Pike's original (ported) code untouched.

The above code is available on GitHub: https://github.com/midpeter444/pikeregex

Wednesday, September 25, 2013

How to compile Groovy scripts and run them on systems with no Groovy installed


/*---[ Problem ]---*/

This week I was faced with the need to write a Groovy script that would run on a Hadoop node at work, but we don't yet have Groovy installed on the Hadoop nodes (and I don't have privileges to do that). Since Groovy is our defined scripting language, I had two options in the meantime:

  1. Download the groovy zip package, just unzip it in my user directory on the Hadoop node and run my thing.
  2. Compile the groovy script to bytecode and build an uber-jar with groovy in it and then run it like a Java program (with java -cp myjar.jar blah blah blah).


/*---[ Solution ]---*/

Since the second sounded like more of a challenge and would teach me a few things I hadn't done yet, I picked that. It worked out - here's my cheat sheet for future reference.


/*---[ Quoth the Maven ]---*/

Create a new maven project:

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false -DgroupId=net.thornydev -DartifactId=script


/*---[ Two plugins ]---*/

To compile groovy to bytecode use the groovy-eclipse-compiler plugin. Yes, I know that sounds weird, but you don't need to fire up Eclipse. You don't even need to have Eclipse installed.

To build the uber-jar containing your compiled script and all of Groovy, use the maven-shade-plugin. Like most things about Maven, I find the name "shade-plugin" irritating, but it gets the job done.

Finally, include groovy-all as a dependency.

Here's my pom:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>net.thornydev</groupId>
  <artifactId>script</artifactId>
  <packaging>jar</packaging>
  <version>1.0</version>
  <name>script</name>
  <url>http://maven.apache.org</url>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <compilerId>groovy-eclipse-compiler</compilerId>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.codehaus.groovy</groupId>
            <artifactId>groovy-eclipse-compiler</artifactId>
            <version>2.7.0-01</version>
          </dependency>
        </dependencies>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.1</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>      
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.codehaus.groovy</groupId>
      <artifactId>groovy-all</artifactId>
      <version>2.0.7</version>      
    </dependency>
  </dependencies>
</project>


/*---[ Treat groovy like a first class citizen ]---*/

Put your groovy file(s) in the src/main/java directory, not src/main/groovy like that other Groovy compiler tool wants.

The directory structure:

$ tree
.
├── pom.xml
└── src
    ├── main
    │   └── java
    │       └── net
    │           └── thornydev
    │               └── GroovyApp.groovy

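The Groovy code, in GroovyApp.groovy. The script's top-level statements compile into a class named after the file, net.thornydev.GroovyApp, which is the class you run below; Script is just a helper class it calls: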

package net.thornydev;

class Script {

  def main(args) {
    println "Hello ${args[0]}. I'm groovy."
  }
}

new Script().main(args)


/*---[ Package, push, run ]---*/

Next: mvn clean package

Then scp the script-1.0.jar in the target dir to your desired system and run it:

$ java -cp script-1.0.jar net.thornydev.GroovyApp thornydev
Hello thornydev. I'm groovy.

QED.

Saturday, August 17, 2013

VMWare Player Crashes in Ubuntu After Kernel Upgrade


/*---[ Annoyance ]---*/

With Xubuntu 13.04, every time I get a kernel upgrade, which seems to happen at least once a month, my VMWare Player no longer works. I'm sure this is not specific to Xubuntu - probably any Ubuntu-13.04 based distro will have this problem.

The first time it happened, I spent a while trying to fix it and get it to recompile, then ended up deciding to uninstall and reinstall. Even that was a mess: the vmware-uninstaller doesn't work and tells you to use the installer instead. Then I downloaded a really old version of VMWare Player that, for some reason, VMWare still has up and that came up as a top choice in Google. When I installed that, it complained about all sorts of kernel issues, including that it wouldn't install on a system with KVM enabled, so I disabled KVM and it still wouldn't work. Then I figured out I had a really old version (like version 2 instead of 5!).


/*---[ Solution ]---*/

The solution is quite simple, so I'm documenting it here to save others the headache.

If you have a kernel upgrade and VMWare player won't start, this is what I do:

  1. Uninstall: sudo vmware-installer -u vmware-player
  2. Download the latest VMPlayer for Linux from here: https://www.vmware.com/products/player/
  3. Reinstall: sudo ./VMware-Player-5.0.2-1031769.x86_64.bundle

Only takes a few minutes and all is back to normal.

Friday, July 5, 2013

Querying JSON records via Hive


/* ---[ Opacity: A brief rant ]--- */

Despite the popularity of Hadoop and its ecosystem, I've found that much of it is frustratingly underdocumented or at best opaquely documented. Case in point: the authors of O'Reilly's Programming Hive book say they wrote it because so much of Hive is poorly documented and exists only in the heads of its developer community.

But even the Programming Hive book lacks good information on how to effectively use Hive with JSON records, so I'm cataloging my findings here.


/* ---[ JSON and Hive: What I've found ]--- */

I've only been playing with Hive about two weeks now, but here's what I found with respect to using complex JSON documents with Hive.

Hive has two built-in functions, get_json_object and json_tuple, for dealing with JSON. There are also a couple of JSON SerDes (Serializer/Deserializers) for Hive. I like this one the best: https://github.com/rcongiu/Hive-JSON-Serde

I will document using these three options here.

Let's start with a simple JSON document and then move to a complex document with nested subdocuments and arrays of subdocuments.

Here's the first document:

{
    "Foo": "ABC",
    "Bar": "20090101100000",
    "Quux": {
        "QuuxId": 1234,
        "QuuxName": "Sam"
    }
}

We are going to store this as a Text document, so it is best to have the whole JSON entry on a single line in the text file you point the Hive table to.
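If your documents are pretty-printed, something has to collapse each one onto a single line before Hive reads the file. Any JSON tool will do; as one option, Go's standard library has json.Compact, which strips insignificant whitespace without reordering keys. compactLine is just an illustrative wrapper:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// compactLine turns one pretty-printed JSON document into the single-line
// form a Hive text table expects (one record per line).
func compactLine(pretty string) (string, error) {
	var buf bytes.Buffer
	if err := json.Compact(&buf, []byte(pretty)); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	doc := `{
    "Foo": "ABC",
    "Bar": "20090101100000",
    "Quux": {
        "QuuxId": 1234,
        "QuuxName": "Sam"
    }
}`
	line, err := compactLine(doc)
	if err != nil {
		panic(err)
	}
	fmt.Println(line) // {"Foo":"ABC","Bar":"20090101100000","Quux":{"QuuxId":1234,"QuuxName":"Sam"}}
}
```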

Here it is on one line for easy copy and pasting:

{"Foo":"ABC","Bar":"20090101100000","Quux":{"QuuxId":1234,"QuuxName":"Sam"}}

Let's create a Hive table to reference this. I've put the above document in a file called simple.json:

CREATE TABLE json_table ( json string );

LOAD DATA LOCAL INPATH  '/tmp/simple.json' INTO TABLE json_table;

Since there are no delimiters, we leave off the ROW FORMAT section of the table DDL.


Built in function #1: get_json_object

The get_json_object function takes two arguments: the column holding the JSON (tablename.fieldname) and a JSON path to extract, where '$' represents the root of the document.

select get_json_object(json_table.json, '$') from json_table; 

Returns the full JSON document.

So do this to query all the fields:

select get_json_object(json_table.json, '$.Foo') as foo, 
       get_json_object(json_table.json, '$.Bar') as bar,
       get_json_object(json_table.json, '$.Quux.QuuxId') as qid,
       get_json_object(json_table.json, '$.Quux.QuuxName') as qname
from json_table;

You should get the output:

foo    bar              qid     qname
ABC    20090101100000   1234    Sam

(Note: to get the header fields, enter set hive.cli.print.header=true at the hive prompt or in your $HOME/.hiverc file.)

This works and has a nice JavaScript-like "dotted" notation, but notice that you have to parse the same document once for every field you want to pull out of it, so it is rather inefficient.

The Hive wiki recommends using json_tuple for this reason.
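To see where the cost comes from, here is a rough Go model of a get_json_object call. It is a sketch, not Hive's implementation: it supports only $ and $.a.b child paths (Hive also handles array subscripts and wildcards), and getJSONObject is a made-up name. The point is the first few lines: every call decodes the entire document, so selecting four columns this way means four full parses of the same line.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// getJSONObject is a simplified model of Hive's get_json_object.
// ok is false where Hive would return NULL.
func getJSONObject(doc, path string) (string, bool) {
	// Decode the whole document: this happens on every call.
	dec := json.NewDecoder(strings.NewReader(doc))
	dec.UseNumber() // keep 1234 as "1234" instead of a float64
	var v any
	if err := dec.Decode(&v); err != nil {
		return "", false
	}

	if path != "$" {
		if !strings.HasPrefix(path, "$.") {
			return "", false // path syntax this sketch doesn't support
		}
		// Walk "$.Quux.QuuxId" one key at a time.
		for _, key := range strings.Split(path[2:], ".") {
			obj, isObj := v.(map[string]any)
			if !isObj {
				return "", false
			}
			child, found := obj[key]
			if !found {
				return "", false
			}
			v = child
		}
	}

	switch x := v.(type) {
	case nil:
		return "", false // JSON null comes back as NULL
	case string:
		return x, true // strings come back without quotes
	default:
		// Objects, arrays, numbers and booleans come back as JSON text.
		// json.Marshal sorts object keys, so order may differ from Hive's.
		out, err := json.Marshal(x)
		if err != nil {
			return "", false
		}
		return string(out), true
	}
}

func main() {
	doc := `{"Foo":"ABC","Bar":"20090101100000","Quux":{"QuuxId":1234,"QuuxName":"Sam"}}`
	for _, p := range []string{"$.Foo", "$.Bar", "$.Quux.QuuxId", "$.Quux.QuuxName"} {
		v, _ := getJSONObject(doc, p) // one full parse per column
		fmt.Println(p, "=>", v)
	}
}
```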


Built in function #2: json_tuple

So let's see what json_tuple looks like. It has the benefit of letting you pass in multiple fields, but it only works one level deep. You also need to use Hive's slightly odd LATERAL VIEW notation:

select v.foo, v.bar, v.quux, v.qid 
from json_table jt
     LATERAL VIEW json_tuple(jt.json, 'Foo', 'Bar', 'Quux', 'Quux.QuuxId') v
     as foo, bar, quux, qid;

This returns:

foo  bar             quux                              qid
ABC  20090101100000  {"QuuxId":1234,"QuuxName":"Sam"}  NULL

It doesn't know how to look inside the Quux subdocument. And this is where json_tuple gets clunky fast - you have to create another lateral view for each subdocument you want to descend into:

select v1.foo, v1.bar, v2.qid, v2.qname 
from json_table jt
     LATERAL VIEW json_tuple(jt.json, 'Foo', 'Bar', 'Quux') v1
     as foo, bar, quux
     LATERAL VIEW json_tuple(v1.quux, 'QuuxId', 'QuuxName') v2
     as qid, qname;

This gives us the output we want:

foo  bar             qid   qname
ABC  20090101100000  1234  Sam
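The one-level limit is easier to see in a small model. This Go sketch (jsonTuple is an illustrative name, not Hive's code) parses the document once and looks up each name as a literal top-level key. That is why 'Quux.QuuxId' came back NULL: no top-level key has that exact name. Nested objects come back as JSON text, which a second call, like the second LATERAL VIEW, can parse again:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// jsonTuple models Hive's json_tuple: parse doc once, then look up each
// name as a literal top-level key. A nil entry stands for Hive's NULL.
func jsonTuple(doc string, names ...string) []*string {
	out := make([]*string, len(names))
	dec := json.NewDecoder(strings.NewReader(doc))
	dec.UseNumber()
	var obj map[string]any
	if err := dec.Decode(&obj); err != nil {
		return out // unparseable input: every column is NULL
	}
	for i, name := range names {
		v, found := obj[name] // no path syntax: "Quux.QuuxId" is just a key
		if !found || v == nil {
			continue
		}
		var s string
		if str, isStr := v.(string); isStr {
			s = str
		} else {
			b, err := json.Marshal(v) // nested values as JSON text
			if err != nil {
				continue
			}
			s = string(b)
		}
		out[i] = &s
	}
	return out
}

// show renders one row tab-separated, printing NULL for missing columns.
func show(cols []*string) string {
	parts := make([]string, len(cols))
	for i, c := range cols {
		if c == nil {
			parts[i] = "NULL"
		} else {
			parts[i] = *c
		}
	}
	return strings.Join(parts, "\t")
}

func main() {
	doc := `{"Foo":"ABC","Bar":"20090101100000","Quux":{"QuuxId":1234,"QuuxName":"Sam"}}`
	v1 := jsonTuple(doc, "Foo", "Bar", "Quux", "Quux.QuuxId")
	fmt.Println(show(v1)) // the last column is NULL

	v2 := jsonTuple(*v1[2], "QuuxId", "QuuxName") // the second "lateral view"
	fmt.Println(show(v2))
}
```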

With a complicated highly nested JSON doc, json_tuple is also quite inefficient and clunky as hell. So let's turn to a custom SerDe to solve this problem.


The best option: rcongiu's Hive-JSON SerDe

A SerDe is a better choice than a json function (UDF) for at least two reasons:

  1. it only has to parse each JSON record once
  2. you can define the JSON schema in the Hive table schema, making it much easier to issue queries against.
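As a loose analogy in Go (not how the openx SerDe is implemented), the SerDe approach is like decoding each line once into a typed record whose shape matches the Hive table schema for simple.json. After that single parse, every "column" is just a field access. The Record type and parseRecord function are illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Record mirrors: Foo string, Bar string,
// Quux struct<QuuxId:int, QuuxName:string>.
type Record struct {
	Foo  string `json:"Foo"`
	Bar  string `json:"Bar"`
	Quux struct {
		QuuxId   int    `json:"QuuxId"`
		QuuxName string `json:"QuuxName"`
	} `json:"Quux"`
}

// parseRecord decodes one line of the table's text file, exactly once.
func parseRecord(line string) (Record, error) {
	var r Record
	err := json.Unmarshal([]byte(line), &r)
	return r, err
}

func main() {
	r, err := parseRecord(`{"Foo":"ABC","Bar":"20090101100000","Quux":{"QuuxId":1234,"QuuxName":"Sam"}}`)
	if err != nil {
		panic(err)
	}
	// All four "columns" come from the single parse above.
	fmt.Println(r.Foo, r.Bar, r.Quux.QuuxId, r.Quux.QuuxName) // ABC 20090101100000 1234 Sam
}
```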

I reviewed a couple of SerDes and by far the best one I've found is rcongiu's Hive-JSON SerDe.

To get that SerDe, clone the project from GitHub and run mvn package. It creates a json-serde-1.1.6.jar in the target directory. If you have a place you like to put your jars for runtime referencing, move it there.

Then tell Hive about it with:

ADD JAR /path/to/json-serde-1.1.6.jar;

You can do this either at the hive prompt or put it in your $HOME/.hiverc file.

Now let's define the Hive schema that this SerDe expects and load the simple.json doc:

CREATE TABLE json_serde (
  Foo string,
  Bar string,
  Quux struct<QuuxId:int, QuuxName:string>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe';

LOAD DATA LOCAL INPATH '/tmp/simple.json' INTO TABLE json_serde;

With the openx JsonSerDe, you can define subdocuments as maps or structs. I prefer structs, as it allows you to use the convenient dotted-path notation (e.g., Quux.QuuxId) and you can match the case of the fields. With maps, all the keys you pass in have to be lowercase, even if you defined them as upper or mixed case in your JSON.

The query to match the above examples is beautifully simple:

SELECT Foo, Bar, Quux.QuuxId, Quux.QuuxName
FROM json_serde;

Result:

foo  bar             quuxid  quuxname
ABC  20090101100000  1234    Sam



And now let's do a more complex JSON document:

{
  "DocId": "ABC",
  "User": {
    "Id": 1234,
    "Username": "sam1234",
    "Name": "Sam",
    "ShippingAddress": {
      "Address1": "123 Main St.",
      "Address2": "",
      "City": "Durham",
      "State": "NC"
    },
    "Orders": [
      {
        "ItemId": 6789,
        "OrderDate": "11/11/2012"
      },
      {
        "ItemId": 4352,
        "OrderDate": "12/12/2012"
      }
    ]
  }
}

Collapsed version:

{"DocId":"ABC","User":{"Id":1234,"Username":"sam1234","Name":"Sam","ShippingAddress":{"Address1":"123 Main St.","Address2":"","City":"Durham","State":"NC"},"Orders":[{"ItemId":6789,"OrderDate":"11/11/2012"},{"ItemId":4352,"OrderDate":"12/12/2012"}]}}

Hive Schema:

CREATE TABLE complex_json (
  DocId string,
  User struct<Id:int,
              Username:string,
              Name: string,
              ShippingAddress:struct<Address1:string,
                                     Address2:string,
                                     City:string,
                                     State:string>,
              Orders:array<struct<ItemId:int,
                                  OrderDate:string>>>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe';

Load the data:

    LOAD DATA LOCAL INPATH '/tmp/complex.json' 
    OVERWRITE INTO TABLE complex_json;

First let's query something from each document section. Since we know there are two orders in the orders array we can reference them both directly:

SELECT DocId, User.Id, User.ShippingAddress.City as city,
       User.Orders[0].ItemId as order0id,
       User.Orders[1].ItemId as order1id
FROM complex_json;

Result:

docid  id    city    order0id  order1id
ABC    1234  Durham  6789      4352

But what if we don't know how many orders there are and we want a list of all a user's order Ids? This will work:

SELECT DocId, User.Id, User.Orders.ItemId
FROM complex_json;

Result:

docid  id    itemid
ABC    1234  [6789,4352]

Oooh, it returns an array of ItemIds. Pretty cool. One of Hive's nice features.
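Selecting User.Orders.ItemId from an array<struct> column acts like a projection: take one field from every element, keeping their order. Here is the same idea sketched in Go, with structs that mirror the complex_json schema (Doc, Order, parseDoc and itemIDs are illustrative names):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order mirrors struct<ItemId:int, OrderDate:string>.
type Order struct {
	ItemId    int    `json:"ItemId"`
	OrderDate string `json:"OrderDate"`
}

// Doc mirrors the complex_json table schema.
type Doc struct {
	DocId string `json:"DocId"`
	User  struct {
		Id              int    `json:"Id"`
		Username        string `json:"Username"`
		Name            string `json:"Name"`
		ShippingAddress struct {
			Address1 string `json:"Address1"`
			Address2 string `json:"Address2"`
			City     string `json:"City"`
			State    string `json:"State"`
		} `json:"ShippingAddress"`
		Orders []Order `json:"Orders"` // array<struct<...>>
	} `json:"User"`
}

func parseDoc(line string) (Doc, error) {
	var d Doc
	err := json.Unmarshal([]byte(line), &d)
	return d, err
}

// itemIDs is the analogue of selecting User.Orders.ItemId: one field
// taken from every element of the array, in order.
func itemIDs(d Doc) []int {
	ids := make([]int, 0, len(d.User.Orders))
	for _, o := range d.User.Orders {
		ids = append(ids, o.ItemId)
	}
	return ids
}

func main() {
	d, err := parseDoc(`{"DocId":"ABC","User":{"Id":1234,"Username":"sam1234","Name":"Sam","ShippingAddress":{"Address1":"123 Main St.","Address2":"","City":"Durham","State":"NC"},"Orders":[{"ItemId":6789,"OrderDate":"11/11/2012"},{"ItemId":4352,"OrderDate":"12/12/2012"}]}}`)
	if err != nil {
		panic(err)
	}
	fmt.Println(d.DocId, d.User.Id, itemIDs(d)) // ABC 1234 [6789 4352]
}
```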

Finally, does the openx JsonSerDe require me to define the whole schema? Or what if I have two JSON docs (say version 1 and version 2) where they differ in some fields? How constraining is this Hive schema definition?

Let's add two more JSON entries to our JSON document - the first has no orders; the second has a new "PostalCode" field in Shipping Address.

{
    "DocId": "ABC",
    "User": {
        "Id": 1235,
        "Username": "fred1235",
        "Name": "Fred",
        "ShippingAddress": {
            "Address1": "456 Main St.",
            "Address2": "",
            "City": "Durham",
            "State": "NC"
        }
    }
}

{
    "DocId": "ABC",
    "User": {
        "Id": 1236,
        "Username": "larry1234",
        "Name": "Larry",
        "ShippingAddress": {
            "Address1": "789 Main St.",
            "Address2": "",
            "City": "Durham",
            "State": "NC",
            "PostalCode": "27713"
        },
        "Orders": [
            {
                "ItemId": 1111,
                "OrderDate": "11/11/2012"
            },
            {
                "ItemId": 2222,
                "OrderDate": "12/12/2012"
            }
        ]
    }
}

Collapsed version:

{"DocId":"ABC","User":{"Id":1235,"Username":"fred1235","Name":"Fred","ShippingAddress":{"Address1":"456 Main St.","Address2":"","City":"Durham","State":"NC"}}}
{"DocId":"ABC","User":{"Id":1236,"Username":"larry1234","Name":"Larry","ShippingAddress":{"Address1":"789 Main St.","Address2":"","City":"Durham","State":"NC","PostalCode":"27713"},"Orders":[{"ItemId":1111,"OrderDate":"11/11/2012"},{"ItemId":2222,"OrderDate":"12/12/2012"}]}}


Add those records to complex.json and reload the data into the complex_json table.

Now try the query:

SELECT DocId, User.Id, User.Orders.ItemId
FROM complex_json;

It works just fine and gives the result:

docid  id    itemid
ABC    1234  [6789,4352]
ABC    1235  null
ABC    1236  [1111,2222]

Any field not present will just return null, as Hive normally does even for non-JSON formats.

Note that we cannot query for User.ShippingAddress.PostalCode because we haven't put it on our Hive schema. You would have to revise the schema and then reissue the query.
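Go's encoding/json behaves in a similarly forgiving way, which makes it a handy mental model here (an analogy, not the SerDe's code). Keys missing from a record leave the field at its zero value, which for a slice is nil, the closest thing to Hive's null. Keys the struct doesn't declare are silently ignored, so the struct can be a partial schema naming only what the query needs. partialDoc and userOrders are made-up names:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// partialDoc declares only what the query touches; everything else in
// the JSON (Username, ShippingAddress with its PostalCode, ...) is
// ignored on decode.
type partialDoc struct {
	DocId string `json:"DocId"`
	User  struct {
		Id     int `json:"Id"`
		Orders []struct {
			ItemId int `json:"ItemId"`
		} `json:"Orders"`
	} `json:"User"`
}

// userOrders returns the user's Id and item ids; ids is nil when the
// record has no Orders key, much like Hive's null.
func userOrders(line string) (int, []int, error) {
	var d partialDoc
	if err := json.Unmarshal([]byte(line), &d); err != nil {
		return 0, nil, err
	}
	if d.User.Orders == nil {
		return d.User.Id, nil, nil
	}
	ids := make([]int, 0, len(d.User.Orders))
	for _, o := range d.User.Orders {
		ids = append(ids, o.ItemId)
	}
	return d.User.Id, ids, nil
}

func main() {
	lines := []string{
		`{"DocId":"ABC","User":{"Id":1235,"Username":"fred1235","Name":"Fred","ShippingAddress":{"Address1":"456 Main St.","Address2":"","City":"Durham","State":"NC"}}}`,
		`{"DocId":"ABC","User":{"Id":1236,"Username":"larry1234","Name":"Larry","ShippingAddress":{"Address1":"789 Main St.","Address2":"","City":"Durham","State":"NC","PostalCode":"27713"},"Orders":[{"ItemId":1111,"OrderDate":"11/11/2012"},{"ItemId":2222,"OrderDate":"12/12/2012"}]}}`,
	}
	for _, l := range lines {
		id, ids, err := userOrders(l)
		if err != nil {
			panic(err)
		}
		fmt.Println(id, ids) // "1235 []" (a nil slice), then "1236 [1111 2222]"
	}
}
```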


/* ---[ A tool to automate creation of Hive JSON schemas ]--- */

One feature missing from the openx JSON SerDe is a tool to generate a schema from a JSON document. Creating a schema for a large, complex, highly nested JSON document is quite tedious.

I've created a tool to automate this: https://github.com/midpeter444/hive-json-schema.
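The tool itself is at the link above. Purely to illustrate the core idea, a recursive walk over a decoded document that emits a Hive type for each value, here is a Go sketch. It is not the tool's code, and its type mapping is my own assumption: whole numbers become int (or bigint if they don't fit in 32 bits), other numbers double, booleans boolean, strings and nulls string, and an array takes its element type from its first element. Because Go maps don't preserve key order, fields come out sorted alphabetically.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
	"sort"
	"strings"
)

// sortedKeys returns the keys of m alphabetically, since Go map
// iteration order is deliberately unpredictable.
func sortedKeys(m map[string]any) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// hiveType infers a Hive type for one decoded JSON value.
func hiveType(v any) string {
	switch x := v.(type) {
	case map[string]any:
		var fields []string
		for _, k := range sortedKeys(x) {
			fields = append(fields, k+":"+hiveType(x[k]))
		}
		return "struct<" + strings.Join(fields, ",") + ">"
	case []any:
		if len(x) == 0 {
			return "array<string>" // assumption: nothing to inspect
		}
		return "array<" + hiveType(x[0]) + ">" // assumption: homogeneous arrays
	case json.Number:
		if n, err := x.Int64(); err == nil {
			if n >= math.MinInt32 && n <= math.MaxInt32 {
				return "int"
			}
			return "bigint"
		}
		return "double"
	case bool:
		return "boolean"
	default: // string or null
		return "string"
	}
}

// schemaFor decodes one JSON document and renders a Hive column list.
func schemaFor(doc string) (string, error) {
	dec := json.NewDecoder(strings.NewReader(doc))
	dec.UseNumber() // lets hiveType tell 1234 apart from 12.5
	var top map[string]any
	if err := dec.Decode(&top); err != nil {
		return "", err
	}
	var cols []string
	for _, k := range sortedKeys(top) {
		cols = append(cols, "  "+k+" "+hiveType(top[k]))
	}
	return strings.Join(cols, ",\n"), nil
}

func main() {
	cols, err := schemaFor(`{"Foo":"ABC","Bar":"20090101100000","Quux":{"QuuxId":1234,"QuuxName":"Sam"}}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("CREATE TABLE json_serde (\n%s\n)\nROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe';\n", cols)
}
```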

Sunday, March 3, 2013

Signing and Promoting your Clojure libraries on Clojars

Phil Hagelberg, the creator and primary maintainer of Leiningen, has been advocating that Clojurians sign their Clojure libraries for the releases repository in Clojars. By itself, this isn't sufficient to keep malicious code from wreaking havoc on public code repositories, but it is a necessary first step. Phil has talked about his ideas on how to get to a more complete model of security in a couple of places:


/* ---[ Signing your Clojure libraries ]--- */

My first experience deploying a signed jar to Clojars was a little rocky, so I'm providing this how-to report to help others (including future me).

I have only done this on (Xubuntu) Linux, but I imagine it will work fairly similarly on Macs. I'm not sure about Windows, as I seem to have constant trouble getting Clojure and Windows to play nicely together. I have used the GPG that comes with msysgit on Windows, so that will probably work with these instructions as well, but I haven't tried it.


/* ---[ STEP 1: Generate GPG Keys ]--- */

Clojars security is based on PGP keys, so you need to have a PGP public/private key pair. GnuPG (GPG) is the generally recommended tool for that.

If you already have GPG installed and can't remember if you've already created a keyset, try this first:

gpg --list-keys

If you see your name and email in the list, then you have. If not, generate them with:

gpg --gen-key

Accepting the defaults you are prompted with is fine. See this article for details on this step. When completed this will create your public key ring and secret/private key ring:

$ ls ~/.gnupg
pubring.gpg  pubring.gpg~  random_seed  secring.gpg  trustdb.gpg


/* ---[ STEP 2: Publish your public GPG key to a keyserver ]--- */

By publishing your public key, others can download it and verify that your signed library is in fact signed by you.

To publish your key you will need to get its ID.

$ gpg --list-keys
/home/midpeter444/.gnupg/pubring.gpg
------------------------------------
pub   2048R/5414B325 2012-11-12
uid                  Michael Peterson <myemail@fubar.com>

The 8 characters after the '/' on the "pub" row are your key's (short) ID. Now publish it:

$ gpg --send-key 5414B325

If you don't specify a keyserver, it will choose the GnuPG keyserver. If you want to target a specific keyserver, use the --keyserver option, e.g. gpg --keyserver <keyserver-url> --send-key 5414B325.
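If you want to script this step, the short ID can be pulled out of the pub line. A hedged Go sketch, assuming the older gpg output format shown above (key size, algorithm letter, '/', then 8 hex digits); newer GnuPG releases format this line differently by default, so check your own output first. shortKeyID is an illustrative name:

```go
package main

import (
	"fmt"
	"regexp"
)

// pubLine matches lines like "pub   2048R/5414B325 2012-11-12" and
// captures the 8 hex digits after the slash (the short key ID).
var pubLine = regexp.MustCompile(`^pub\s+\d+[A-Za-z]/([0-9A-Fa-f]{8})\b`)

// shortKeyID returns the short key ID from one line of gpg --list-keys
// output, or "" if the line is not a pub line in this format.
func shortKeyID(line string) string {
	m := pubLine.FindStringSubmatch(line)
	if m == nil {
		return ""
	}
	return m[1]
}

func main() {
	fmt.Println(shortKeyID("pub   2048R/5414B325 2012-11-12")) // 5414B325
}
```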


/* ---[ STEP 3: Add your GPG key to your Clojars account ]--- */

When you sign up for Clojars, there is a section in your Profile for adding two keys: 1) an SSH public key and 2) a PGP public key. The SSH key is for secure transport of the library from your system to the Clojars repo via scp. It is not related to signing your jars.

Your library will be signed with your PGP private key that resides only on your system. That signature indicates that the owner of the private key (the one paired with the public key you just published) signed this code artifact. It allows someone else to know who signed it and whether the code artifact has been changed since it was signed and deployed.

By having your PGP public key on Clojars, you allow Clojars to verify that one of its members signed the artifact. This check happens when you promote your release to the release repo (more on that below).

Note: Clojars is not a keyserver, so putting it there will not allow others to verify the signature. That is why in step 2 we published it to a public keyserver.

To add your public key to Clojars you create an "ASCII-armored" version of the binary public key, which you generate with:

gpg --armor --export your@email.here

Once you have it, what exactly do you paste into the Clojars text box? The BEGIN and END delimiter lines and everything in between, like so:

-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1.4.11 (GNU/Linux)

mQENBFChf/ABCAC/2nK75NwOsg7nkI5NNTCqBMk5DMX0JWu17EZoii/6vH88KlTm
0xeIHwv3leMZbtjqTNzFPfGh5xQo7zH+Y2CBPG8gq9QKv9aB587vuzwCtN/uaP6Z
mjQlafc5HK8gn5PMULWJC0V46g+y39g8bDSEZDInGFFWF7kCOXMcsJnNuoXWbajz
WwV8lcr56EKeenRS3lV4GWd/W+aSjCkaq1SM+9XP3qZYC9lOuaYfkzxfTsf5hpvG
wfTJVOaaPDtfhefgzrK6+znvMC1TsKMKU8bpX7u9WaHn9jD24UE6idzSn84uPuNK
5Jms4r7r6y+kfMSrWK0KUH+Gp0Bs+6kVu6S1ABEBAAG0J01pY2hhZWwgUGV0ZXJz
b24gPG1wZXRlcnNvbjJAZ21haWwuY29tPokBOAQTAQIAIgUCUKF/8AIbAwYLCQgH
AwIGFQgCCQoLBBYCAwECHgECF4AACgkQeBe9+DhXuBiGzAf/aC43wc/TrNSMeWkN
6X92YpPu8SYh1bcDOEm7FvBSWZg/NSf4VBNqP6TXjobIGfSX8hFGrgrkB/ZDMY6N
Ec9UxpnhVC2gOn9TZzOCNfbvN4SAcBWm9vfABEQxIcXsOXEGxLWsW3FSeK2fp5Iv
S19eQ6Z6N2jw/H6xLpd5Zrvw4vAROOVKiYNkQKkqU95hqJQz+9xPOBwDIIL2isQ2
qd0fgDryue7D31XJ/Qrwxa++I70ew4u3TqYboUAL6aTIAxSGmMlbk2CDvVusRUw5
lrN7qxWejq61Qlhx+l9xEEBcq5HflQZpFENn95xT6l6IiLiiEWT4Gju4EwZz+CUO
pe99QrkBDQRQoX/wAQgAq6SDCbXHh6GKFnb1as0zzlngwv6MiA5eaY+83qOgeXov
UVOZBQU8vBmVuF/3Pd5Q7asTOy+40sBYcuCwsMvtXPX0s7A0pdSn5A7DFelVRM5y
oQheASDCtlnp1xpL/8GTr/YuYlQSgC5zqcv23FatrKQ5ljPDV+tbe40T0HQ1491x
g+QPmnS4jofcOGBJ/AcAPAXU17zEiip/JmDOGfpvAf+igRNW5nyGfCkkrHeAaovR
tkqMMtq3YZBBrfgYIuGOZYzIz/lOCDyVb6QP2B/rn6ub5UeB0oYJa98uW7Zmx1vn
ZIPgtbDFRoAj2NV1JEAgmZABcYYQVVpRuvyEC+94FwARAQABiQEfBBgBAgAJBQJQ
oX/wAhsMAAoJEHgXvfg4V7gYfcMH/3hiqNPHlb9FxY4p8gIj6JWdj++CXXjRg4Re
4QWP/JvRH5v4z8DLstcJmezgerHyFqSb7ylo108qONW+x9Q1tNRe+ey9YOeg4581
tdXLMPaGjU0jz5aCKnKQR7LJjOTS4SPPU4dYURDUUkmKgU4tmbQVdkXyT45rCh6b
tB655w1aYSLbA93E3DKkdqoN1gCTlwzsiayLsu1kiWSUopOlPKcwLjyo1OpRC2ph
3T7RuF+whq/NQ8SYSz6GgWh8tSMt/SDpJ5/YOveyH7iAuwcL4pNgGYSjAPklSolp
UZwJPsLOqDSxnlc7RKwX9hsdDL7tybYAX2P7BOGpoNDeN1ZMIEA=
=Kyc/
-----END PGP PUBLIC KEY BLOCK-----


/* ---[ STEP 4: Prepare your project and its metadata ]--- */

With Clojars you can publish SNAPSHOTs or releases. The latter can be "promoted" if your project.clj meets all of these criteria:

  • you cannot have the word SNAPSHOT in your version
  • you should have your license filled in
  • you need to have the :scm section filled in
    • you can either do this manually, as in the example below
    • or, in theory, lein can do this for you automatically if you are using GitHub and the remote is named origin (though I've had issues even in that case)

Here is an example project.clj:


 (defproject thornydev/go-lightly "0.4.0"
   :description "Clojure library to facilitate CSP concurrent programming based on Go concurrency constructs"
   :url "https://github.com/midpeter444/go-lightly"
   :license {:name "Eclipse Public License"
             :url "http://www.eclipse.org/legal/epl-v10.html"}
   :profiles {:dev {:dependencies [[criterium "0.3.1"]]}}
   :dependencies [[org.clojure/clojure "1.5.0"]]
   :scm {:name "git"
         :url "https://github.com/midpeter444/go-lightly"})


/* ---[ STEP 5: Commit your code ]--- */

Make sure you have committed all your changes into Git (or Hg, SVN or whatever SCM you are using). Tag the release if you are so inclined and optionally push it to GitHub or your remote or central hosting server.


/* ---[ STEP 6: Deploy to Clojars ]--- */

From the top of your project directory, enter:

lein deploy clojars

In my case, my gpg-agent prompted me twice for my GPG passphrase and then the deploy happened.

When you do this, lein will create a pom and a jar and upload them to Clojars. That pom.xml should include SCM information that looks like this:


  <scm>
    <tag>12f653361a88c4df14</tag>
    <url>https://github.com/midpeter444/go-lightly</url>
  </scm>

The tag there should be the SHA1 of the last commit (in the case of Git). Note: Don't confuse it with a "tag" that you create with "git tag".

If the deploy was successful, your jar should be signed and (possibly) ready for Promotion.


/* ---[ STEP 7: Check whether your jar was signed ]--- */

Create a new lein project and make your deployed library one of its dependencies. Then in that new project run:

$ lein deps :verify
:signed [criterium "0.3.1"]
:unsigned [enlive "1.0.1"]
:signed [org.clojure/tools.macro "0.1.1"]
:signed [org.clojure/clojure "1.5.0"]
:bad-signature [thornydev/go-lightly "0.4.0"]

You can see that some are signed and some are not. Obviously, you want yours to say :signed. If it says :unsigned, then you are probably either using Lein 1 or you didn't generate your GPG keys. If it says :bad-signature, then something got corrupted on the Clojars server. In my case above, I promoted and tried to redeploy, which uncovered a bug in lein/clojars that caused some files to be overwritten when they shouldn't have been. This issue should be fixed soon. If you do have that problem, delete your local copy from your ~/.m2 directory and contact someone on the #leiningen IRC channel.


/* ---[ Optional STEP 8: Promote to release status ]--- */

If you are eligible to promote to release status, you will see a "Promote" button on your Clojars page. If you are not, you may be missing SCM information, which is what happened to me recently.

Note that once you promote, you can no longer deploy to that version, so make sure you're ready to make it immutable. After that, you can only add new versions.

Sunday, January 13, 2013

Go Concurrency Constructs in Clojure, part 4: idioms and tradeoffs

The Go approach to concurrent software can be characterized as: Don't communicate by sharing memory, share memory by communicating.... You use the channel to pass the data back and forth between the Go routines and make your concurrent program operate that way.

--Rob Pike, Google IO 2012 conference talk

Programmers know the benefits of everything and the tradeoffs of nothing.

--Rich Hickey, Strangeloop 2011 talk "Simple Made Easy"


/* ---[ Functional Clojure Idioms ]--- */

In the preface to Eloquent Ruby, Russ Olsen relates how, after he taught a Ruby class, one of his students complained that his Ruby programs tended to end up looking like his Java programs. I remember having that same experience when I first learned Ruby in the early 2000s. In fact, I set up conventions for myself in Ruby to try to "force" it to be more like Java. I hadn't fully grokked that changing languages does not mean just learning the syntax and libraries. It means adopting the idioms, the approaches and even the constraints that the designer put into the language and that have arisen in the language community. It often means changing the way you think. That is certainly true of Clojure, perhaps more than any language I've ever learned. (Side note: I haven't learned Haskell yet!)

Go is intended primarily to be a systems programming language, with a strong focus on writing concurrent server programs. While it does include some more "modern" functional features, such as closures and first-class functions, it is not a functional programming language.

These blog posts and the go-lightly library are my attempt to think about how to adopt a Go-like CSP concurrency programming style into Clojure. But we should also think about when to adopt this style of programming. I don't have an answer yet and I'm writing this library to explore this area.

The Go channel model is a message-passing model, which you could view as a poor man's Actor model, something Rich Hickey considered for the Clojure language and decided to leave out. He outlines his reasons on the clojure.org/state page (see the "Message Passing and Actors" section in particular).

In the quotes at the top of this post, Pike says that Go's model is to share memory by communicating, rather than the other way around. Hickey argues that the message-passing model is a complex model. Remember that "complex", in the Clojure community, is an objective measure of how entwined things are. Sharing memory by communicating is more complex because you have to coordinate entities, particularly if you have blocking waits. If one depends directly on the other, you have an entangled system. Coordinating multiple entities can be difficult, and blocking operations can lead to deadlocks. If you use timeouts to overcome potential deadlocks, then you have to add special logic to your code to deal with them. Sharing memory with immutable values or STM-protected values is often a simpler, less complected model.

Go (synchronous) channels are for synchronizing threads or routines. When you need to synchronize in other languages you have constructs like CountDownLatches, CyclicBarriers, waiting on a future, a join in a fork-join model or, at the lowest level, mutexes and semaphores. The synchronous channel provides an easier model that also allows message passing. But remember that easy is not necessarily simple [footnote 1], and consider the tradeoffs.
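To make the CountDownLatch comparison concrete, here is a minimal Go sketch (sumSquares is a hypothetical name, not part of go-lightly). Several goroutines each hand one result over a single unbuffered channel, and the receiving loop doubles as the join: it cannot finish until every worker has delivered, so synchronization and message passing happen in the same step.

```go
package main

import "fmt"

// sumSquares starts one goroutine per input and uses a single unbuffered
// channel both to carry each result and to synchronize: the receive loop
// acts like a join (or a CountDownLatch counting down len(nums) times).
func sumSquares(nums []int) int {
	results := make(chan int) // unbuffered: each send waits for a receive
	for _, n := range nums {
		go func(n int) {
			results <- n * n
		}(n)
	}
	total := 0
	for range nums {
		total += <-results // blocks until some worker hands over its value
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 30
}
```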


/* ---[ Channels as Queues ]--- */

Go buffered channels, on the other hand, are not synchronous communication tools. They are queues for asynchronous workflows. By decoupling producer(s) and consumer(s), they are less complected. Hickey, in his Simple Made Easy talk, has a table listing paired concepts where one is more complex and the other simple. On his chart, Actors are juxtaposed with queues: Actors are complex, queues are simple. And in the 2012 Clojure conj keynote, Hickey stated that queues have been underemphasized in the Clojure community so far.

Thus, as far as channels go, the asynchronous buffered ones are more idiomatic in Clojure than synchronous channels. In fact, an async concurrent queue is used in the Clojure Programming book's webcrawler example. For contrast, I implemented this webcrawler example using go-lightly.

On the other hand, from what I've seen, synchronous channels are very idiomatic in Go, and perhaps even preferred over buffered channels. That is the impression I've gotten from watching Pike's talks and reading a few threads on the golang mailing list. For example, see this thread where one participant says:

Go channels can be asynchronous, but most of the time that's not
what you want. When communicating between goroutines running on the
same machine a synchronous send/recv improves program flow. Synchronous
channels have a lot of advantages by making program flow predictable
and easier to think about.

I'll leave it there as an open question deserving careful thought.


/* ---[ Making channels more idiomatic ]--- */

As I've been doing various example programs with go-lightly, I've noticed that the code structure can be more imperative than functional, in part because channels are not composable data structures. You can't pass a channel to map, reduce or filter, since it does not implement the ISeq interface.

To remedy this, I've added four functions to go-lightly that let you treat a channel like a seq when retrieving values from it.

The first two functions convert the current values on a channel to a seq or a vector without removing them from the channel. The latter two functions remove or drain the channel either immediately (non-lazy) or as you read from it in a lazy fashion.


(channel->seq chan)
"Takes a snapshot of all values on a channel without removing the values from the channel. Returns a (non-lazy) seq of the values."


(channel->vec chan)
"Takes a snapshot of all values on a channel without removing the values from the channel. Returns a vector of the values."


(drain chan)
"Removes all the values on a channel and returns them as a non-lazy seq."


(lazy-drain chan)
"Lazily removes values from a channel. Returns a Cons lazy-seq until it reaches the end of the channel (as determined by getting a nil value when asking for the next value on the channel)."


All the sequences end once a nil value is pulled off the queue, which represents the end of the queue. Because lazy-drain is lazy, if something else is added to the queue before the end is reached, it will read that additional value, whereas the non-lazy drain will not.
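For comparison, the eager-versus-lazy distinction can be sketched in Go on a buffered channel. This is an analog, not go-lightly code: drain and lazyDrain are hypothetical names, and a receive that would block plays the role of pulling nil off an empty queue. (A Go channel cannot be inspected without removing values, so there is no channel->seq equivalent here.)

```go
package main

import "fmt"

// drain removes every value currently buffered in ch and returns them.
// It stops at the first receive that would block (the analog of hitting
// nil on an empty queue) or when ch is closed and empty.
func drain(ch chan int) []int {
	var out []int
	for {
		select {
		case v, ok := <-ch:
			if !ok { // closed and empty: nothing more will arrive
				return out
			}
			out = append(out, v)
		default: // nothing buffered right now
			return out
		}
	}
}

// lazyDrain returns a pull function; each call removes at most one value.
// Because nothing is read until asked for, values added later are still
// seen, as long as they arrive before the channel runs dry.
func lazyDrain(ch chan int) func() (int, bool) {
	return func() (int, bool) {
		select {
		case v, ok := <-ch:
			return v, ok
		default:
			return 0, false
		}
	}
}

func main() {
	ch := make(chan int, 10)
	for i := 0; i < 3; i++ {
		ch <- i
	}
	eager := drain(ch) // [0 1 2]; ch is now empty

	for i := 100; i < 103; i++ {
		ch <- i
	}
	next := lazyDrain(ch)
	first, _ := next() // removes only 100
	ch <- 200          // arrives before the lazy drain reaches the end
	lazy := []int{first}
	for v, ok := next(); ok; v, ok = next() {
		lazy = append(lazy, v)
	}
	fmt.Println(eager, lazy) // [0 1 2] [100 101 102 200]
}
```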

A REPL session will illustrate how these work.

First let's look at channel->seq using a buffered channel:


  ;; define a channel with capacity of 7
  user=> (def ch (go/channel 7))
  #'user/ch
  user=> (dotimes [i 6] (.put ch i))
  nil
  user=> ch
  #<LinkedBlockingQueue [0, 1, 2, 3, 4, 5]>

  ;; grab the values into a non-lazy seq
  user=> (def cseq (go/channel->seq ch))
  #'user/cseq
  user=> cseq
  (0 1 2 3 4 5)
  user=> (type cseq)
  clojure.lang.ArraySeq

  ;; the values have not been removed from the channel
  user=> ch
  #<LinkedBlockingQueue [0, 1, 2, 3, 4, 5]>

  ;; if a value is removed from the channel the seq is not affected
  user=> (.take ch)
  0
  user=> ch
  #<LinkedBlockingQueue [1, 2, 3, 4, 5]>
  user=> cseq
  (0 1 2 3 4 5)

channel->vec behaves the same way except it returns a vector, not a seq.

Next let's look at the drain functions using a buffered channel:


  user=> (def ch (go/channel 7))
  #'user/ch
  user=> (dotimes [i 6] (.put ch i))
  nil
  user=> ch
  #<LinkedBlockingQueue [0, 1, 2, 3, 4, 5]>

  ;; calling drain returns a seq of all the values on the
  ;; channel and removes them
  user=> (def dseq (go/drain ch))
  #'user/dseq
  user=> (type dseq)
  clojure.lang.IteratorSeq
  user=> dseq
  (0 1 2 3 4 5)
  user=> ch
  #<LinkedBlockingQueue []>

  ;; add more elements to the queue; the seq is not affected
  user=> (dotimes [i 6] (.put ch (+ 100 i)))
  nil
  user=> ch
  #<LinkedBlockingQueue [100, 101, 102, 103, 104, 105]>
  user=> dseq
  (0 1 2 3 4 5)

  ;; now let's lazily drain the queue into a lazy-seq Cons
  user=> (def zseq (go/lazy-drain ch))
  #'user/zseq
  user=> (type zseq)
  clojure.lang.Cons

  ;; realize the first three elements - takes only those
  ;; off the channel
  user=> (take 3 zseq)
  (100 101 102)
  user=> ch
  #<LinkedBlockingQueue [103, 104, 105]>

  ;; take more than are on the channel - get only what's available
  user=> (take 100 zseq)
  (100 101 102 103 104 105)
  ;; the channel is now empty
  user=> ch
  #<LinkedBlockingQueue []>

  ;; what if we try to take/read them again? They are still
  ;; in the lazy-seq since it caches the results
  user=> (take 100 zseq)
  (100 101 102 103 104 105)

  ;; we can use higher order functions - composability!
  user=> (map str (filter odd? zseq))
  ("101" "103" "105")



These functions also work with synchronous channels, but are not as useful. In particular, lazy-drain faces a race condition with producers that try to transfer multiple consecutive values, as shown below:


  ;; create a synchronous channel
  user=> (def c (go/channel))
  #'user/c

  ;; queue up 6 values to be put onto the queue but
  ;; only one can go on at a time waiting for a consumer
  user=> (go/go (dotimes [i 6] (.transfer c i)))
  #<core$future_call$reify__6110@5d47522a: :pending>
  user=> c
  #<LinkedTransferQueue [0]>

  ;; channel->vec and channel->seq will grab one value since
  ;; a producer is waiting for a consumer
  user=> (go/channel->vec c)
  [0]
  user=> c
  #<LinkedTransferQueue [0]>

  ;; drain also takes only the first value and also removes it
  ;; from the channel, allowing the next val to be put on the channel
  user=> (go/drain c)
  (0)
  user=> c
  #<LinkedTransferQueue [1]>

  ;; lazy-drain looks like it works!
  user=> (take 2 (go/lazy-drain c))
  (1 2)
  user=> c
  #<LinkedTransferQueue [3]>

  ;; but it has a race condition with the producer, so may
  ;; not get everything we "queued" up to transfer
  user=> (go/lazy-drain c)
  (3 4)
  user=> c
  #<LinkedTransferQueue [5]>


/* ---[ Next ]--- */

I've now created a go-lightly wiki with fairly extensive documentation and I've implemented a number of example applications using go-lightly.

A couple of things you may want to look into if you find this topic interesting:

  • I've added formal abstractions for the Channel types in go-lightly. Channel, BufferedChannel and TimeoutChannel all implement the GoChannel protocol.
  • As mentioned above, I have done a go-lightly centric implementation of a simple web crawler app based on the example at the end of Ch. 4 from the O'Reilly Clojure Programming book. This will provide a good contrast between the two concurrency approaches.
  • I have added the ability to preferentially read from one or more channels in a select or selectf.
  • I implemented Pike's Chinese Whispers example in go-lightly to see how many "Go routines" could be spawned in Clojure compared to Go. This is certainly an area where the JVM is less powerful than Go.


/* ---[ Resources and Notes ]--- */


[1] If you've spent much time in the Clojure community, you know I'm referring to the distinction Hickey drew in his Simple Made Easy presentation between easy, a subjective concept, and simple, an objective one. If you haven't watched it, well, listen to Bodil.

Blog entries in this series:

The Clojure go-lightly library on GitHub: https://github.com/midpeter444/go-lightly

Saturday, January 12, 2013

Go Concurrency Constructs in Clojure, part 3: why go-lightly?

There's a legendary example called the concurrent prime sieve, which is kind of an amazing thing. It was the first truly beautiful concurrent program I think I ever saw.

--Rob Pike, Google IO 2012 conference talk


/* ---[ Why go-lightly? ]--- */

In part 1 and part 2 of this blog series, I introduced the basics of Go channels, Go routines and the Go select statement. I then walked through initial implementations of these ideas in the go-lightly library and how to use the lamina channel and facilities to do Go-style CSP concurrent programming.

If the lamina library, which is 2+ years old now (thus reasonably mature and stable) and under active development, can be used for this, why am I proposing a new library? Well, I might have built one anyway just to get familiar with CSP-style programming and improve my Clojure skills, but ultimately I do think there is good justification for considering a new library focused on just this.

The lamina library is fundamentally focused on asynchronous event-driven programming. Since dealing with callbacks can get messy and is hard to structure in a functional way, the core construct and central metaphor of Zach Tellman's approach to async programming is a channel that is used for putting and pulling events. Since a key focus of async event-driven programming is to avoid blocking, there are very few blocking operations in the lamina library. One place where blocking is provided is that you can choose to wait when pulling a value out of a channel. This is the part we've seen in use to emulate Go channels.

However, the primary use case for lamina channels is an event queue, which means you want it to be unbounded and non-blocking, especially for events being put onto the queue. Thus, lamina uses a Java ConcurrentLinkedQueue underneath.

Go channels, however, come in two flavors: bounded, blocking queues of size 0 (every put has to have a corresponding take) and bounded, asynchronous queues of a size you specify in the make function. The lamina channel really maps to neither, though in some scenarios it can be used for async queues or blocking queues where you need to block on read (but not write).

As I discussed in the first blog entry, Java's java.util.concurrent package already provides these Go channel types and even more variations on them. The bounded, blocking queue maps to a SynchronousQueue or a TransferQueue (if you only use the transfer and take methods). The bounded, asynchronous queue maps to a LinkedBlockingQueue created with a fixed capacity.

Thus, go-lightly proposes to wrap these Java concurrent queues, specifically to support Go-style CSP concurrency semantics.


/* ---[ Why bounded blocking queues? ]--- */

So what is a use case where I really need a bounded blocking queue?

First, from here on I will use the Go terminology: channel (or Go channel) refers to a blocking queue of size 0, and buffered channel refers to a queue with a capacity you choose, on which sends do not block until it is full.

Rephrasing the question - when would I need a channel and not a buffered channel? With a buffered channel you "fire-and-forget" and let some other thread pluck it off the buffered channel when it's ready.

A channel, on the other hand, is a synchronization mechanism between threads/routines, similar to a CountDownLatch, CyclicBarrier or the join of a fork-join model, except that you not only synchronize threads but also pass messages between them, so it is a synchronizing communication tool.
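The difference between the two kinds of channel shows up directly in Go when you try to send without waiting. In this small sketch (trySend is a hypothetical helper), a select with a default clause attempts each send: on an unbuffered channel the send cannot complete because no receiver is waiting, while a buffered channel accepts values until its capacity is used up.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it succeeded.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // the send would have had to wait
	}
}

func main() {
	unbuffered := make(chan int)  // Go "channel": capacity 0
	buffered := make(chan int, 2) // Go "buffered channel": capacity 2

	// No receiver is waiting, so a send on the unbuffered channel cannot complete.
	fmt.Println(trySend(unbuffered, 1)) // false

	// The buffered channel accepts sends until its capacity is used up.
	fmt.Println(trySend(buffered, 1), trySend(buffered, 2), trySend(buffered, 3)) // true true false
}
```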


/* ---[ Beautiful concurrency ]--- */

The golang site provides an example of a concurrent prime sieve algorithm that, as implemented, requires blocking channels. If you were to use a lamina channel or a buffered channel, some threads could run far ahead of the others, unnecessarily consuming memory and wasting CPU cycles.

This is the "first truly beautiful concurrent program" Pike referred to in his Google IO 2012 talk.

Let's look at the Go implementation from the Golang website first:

The Generate and Filter functions absolutely need to synchronize - when they push data onto a channel, they need to wait until the consumer (a chained filter function or main) is ready to pull it off.
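The Generate/Filter pipeline described above can be sketched as follows. It follows the structure of the well-known sieve on the Go website, but the function names are lowercased and firstPrimes is a hypothetical wrapper added so the result can be checked, so treat it as a sketch rather than a copy of that program. Every channel is unbuffered, which is what keeps each stage in lockstep with its consumer.

```go
package main

import "fmt"

// generate sends 2, 3, 4, ... on ch. Each send blocks until the next stage
// is ready to receive, so the generator never runs ahead.
func generate(ch chan<- int) {
	for i := 2; ; i++ {
		ch <- i
	}
}

// filter copies values from in to out, dropping multiples of prime.
func filter(in <-chan int, out chan<- int, prime int) {
	for {
		i := <-in
		if i%prime != 0 {
			out <- i
		}
	}
}

// firstPrimes daisy-chains one filter goroutine per prime found.
// The goroutines are simply left blocked when it returns.
func firstPrimes(n int) []int {
	ch := make(chan int) // unbuffered
	go generate(ch)
	primes := make([]int, 0, n)
	for len(primes) < n {
		prime := <-ch
		primes = append(primes, prime)
		next := make(chan int)
		go filter(ch, next, prime)
		ch = next
	}
	return primes
}

func main() {
	fmt.Println(firstPrimes(10)) // [2 3 5 7 11 13 17 19 23 29]
}
```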

Here is a Clojure version using go-lightly:

Happily, the implementations are pretty much the same line-for-line.


/* ---[ Next ]--- */

In the next blog entry, I will contrast the Go CSP concurrency model to the Clojure concurrency model and add some functions to allow channels to interoperate with the Clojure seq abstraction.


/* ---[ Resources ]--- */

Both of the prime sieve examples are available in the GitHub go-lightly repo:

Blog entries in this series:

Saturday, January 5, 2013

Go Concurrency Constructs in Clojure, part 2: select

"The select statement is a key part of why concurrency is built into Go as features of the language, rather than just a library. It's hard to do control structures that depend on libraries."

-Rob Pike, 2012 Google IO Conference

In the first blog entry of this series, I introduced some simple examples of the CSP (Communicating Sequential Processes) model of concurrency that is built into the Go language. I'm blogging my investigation of how we might leverage this style of concurrent programming in Clojure.

The key benefit of the CSP approach is that you can use normal sequential semantics and control flows that are easy to reason about while building concurrent flows and processes. Channels are used to communicate and synchronize processes to bring control and deterministic behavior to an otherwise non-deterministic concurrent environment. We can do this without locks or other low-level constructs that are hard to reason about. The CSP constructs are built on top of those low-level primitives (or at least compare-and-swap mechanisms), but they are hidden from the application developer.


/* ---[ A construct to wait for the next available channel ]--- */

Go comes with a ready-made control structure called select. It provides a shorthand way to specify how to deal with multiple channels, and it allows for timeouts and non-blocking behavior (via a "default" clause). It looks like a switch/case statement in C-based languages, but it differs in that all of the channel cases are evaluated: rather than picking the first case in source order, it chooses among the ones that are ready.

Let's look at an example (adapted from Pike's 2012 Google IO talk):


 select {
 case v1 := <-c1:
     fmt.Printf("received %v from c1\n", v1)
 case v2 := <-c2:
     fmt.Printf("received %v from c2\n", v2)
 }

This select wraps two channels. It evaluates both channels and there are four possible scenarios:

  1. c1 is ready to give a message, but c2 is not. The message from c1 is read into the variable v1 and the code clause for that first case is executed.
  2. c2 is ready to give a message, but c1 is not. v2 then is assigned to the value read from c2 and its code clause is executed.
  3. Both c1 and c2 are ready to give a message. One of them is randomly chosen to execute and the other does not execute. Note this means that you cannot depend on the order your clauses will be executed in.
  4. Neither c1 nor c2 is ready to give a message. The select blocks until the first one is ready, at which point it reads from that channel and executes the corresponding code clause.
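Scenario 3 is easy to observe. In this sketch (countChoices is a hypothetical name), both channels are filled before every select, so both cases are always ready; tallying the chosen case over many rounds shows that neither clause is preferred by its position in the source.

```go
package main

import "fmt"

// countChoices runs a select over two channels that are both ready and
// tallies which case was chosen each time.
func countChoices(rounds int) (fromC1, fromC2 int) {
	c1 := make(chan string, 1)
	c2 := make(chan string, 1)
	for i := 0; i < rounds; i++ {
		c1 <- "one"
		c2 <- "two"
		select {
		case <-c1:
			fromC1++
			<-c2 // empty the other channel so both start full next round
		case <-c2:
			fromC2++
			<-c1
		}
	}
	return fromC1, fromC2
}

func main() {
	a, b := countChoices(1000)
	fmt.Println(a, b) // roughly even; the exact split varies from run to run
}
```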

Select statements can also have a default clause to make them non-blocking:


 select {
 case v1 := <-c1:
     fmt.Printf("received %v from c1\n", v1)
 case v2 := <-c2:
     fmt.Printf("received %v from c2\n", v2)
 default:
     fmt.Println("no channel was ready to communicate")
 }

If neither channel is ready, the select executes the default clause and returns immediately.
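Wrapped in a function, the default clause turns select into a one-shot poll. This is a small sketch with a hypothetical poll helper; it reports which channel produced the value, or ok == false when nothing was ready.

```go
package main

import "fmt"

// poll tries c1 and c2 once. The default clause makes the select return
// immediately with ok == false when neither channel has a value ready.
func poll(c1, c2 <-chan int) (v int, from string, ok bool) {
	select {
	case v = <-c1:
		return v, "c1", true
	case v = <-c2:
		return v, "c2", true
	default:
		return 0, "", false
	}
}

func main() {
	c1 := make(chan int, 1)
	c2 := make(chan int, 1)

	_, _, ok := poll(c1, c2)
	fmt.Println(ok) // false: nothing is ready

	c2 <- 42
	v, from, ok := poll(c1, c2)
	fmt.Println(v, from, ok) // 42 c2 true
}
```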

Finally, select statements can also have a timeout:


 for {
     select {
     case v1 := <-c1:
         fmt.Printf("received %v from c1\n", v1)
     case v2 := <-c2:
         fmt.Printf("received %v from c2\n", v2)
     case <-time.After(1 * time.Second):
         fmt.Println("You're too slow!")
         return
     }
 }

In this example, the select is wrapped in an infinite loop, which will stop the first time any one round takes longer than 1 second to read from either channel. But we can also set a timeout on the loop as a whole:


 timeout := time.After(1 * time.Second)
 for {
     select {
     case v1 := <-c1:
         fmt.Printf("received %v from c1\n", v1)
     case v2 := <-c2:
         fmt.Printf("received %v from c2\n", v2)
     case <-timeout:
         fmt.Println("Time's up!")
         return
     }
 }

Now the loop will always cease after 1 second and in that one second it will read as many times as possible from either channel.

Here is an example of using selects with timeouts in a Go program:


/* ---[ Implementing select in Clojure ]--- */

Let's evaluate some of the ways we could emulate or implement the behavior of select in Clojure. While Go does have closures, treats functions as first class entities and deemphasizes object-orientation and inheritance, Go is not a functional language. So how should something like select be done in Clojure? What is the essence of what it accomplishes?

Let's first turn to the Racket language, a Lisp that is a descendant of Scheme. It has events built into the language. I am not deeply knowledgeable about Racket, but from the research I've done, the analog of select in Racket is sync. The sync function takes one or more "synchronizable events", blocks until the first one is ready and returns that result:


 (let ([msg (sync evt1 evt2 evt3)])
   ;; do something with the first message result here
   msg)

As with Go's select, Racket's sync will choose to read from one of the events at random if more than one is ready.

Notice that the Racket version does not take a code block to execute for each event. In functional programming, it is preferable and more natural to return a value from an operation. Go's select is truly a control structure (in the C language sense of the word) - it does not return a value.

So let's implement Racket's sync in Clojure.

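In order to implement select/sync in Clojure using the Java Queue libraries we used in the previous blog entry, we need to be able to check, without blocking, whether any of the queues has a value ready. That is why I selected the TransferQueue over the SynchronousQueue: a SynchronousQueue has no internal capacity, so its peek method always returns null, whereas a LinkedTransferQueue can expose a value that a producer is waiting to transfer.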

Next we have to decide what to name it. sync is already taken in clojure.core and has a specific and important enough meaning in Clojure that it is best avoided. select is also used (it is a function in the clojure.set namespace), but since it is not in clojure.core, I'll go with it in my go-lightly namespace.

My initial implementation to get started is a simple one - it will check all the channels to see if any are ready and if not, do a short sleep. To do the check, it uses the .peek method of TransferQueue, since it neither blocks nor throws an exception if the queue is empty.

You pass select one or more channels and it immediately filters for those that already have a ready value. If there are any it picks one of those ready ones at random, dequeues the value and returns it. Only the one value is dequeued, so the other channels remain untouched.

If none are ready, it will "probe" the channels between short sleeps to get the first value it can find. This is an unsophisticated implementation, but it works for simple uses. (I'll provide a usage example after we add timeouts and "defaults" next.)


/* ---[ Adding "default" and timeouts to Clojure select ]--- */

The default clause in Go's select statement is a short circuit to not block if no channels are ready. Since Clojure's select is not a control structure, the most natural choice is to add another function, which I've called select-nowait.

As before, it takes one or more channels (as a varargs list) and an optional sentinel keyword value. If no channels are ready, select-nowait returns the sentinel keyword (if provided) or nil.


 user=> (select-nowait ch1 ch2 ch3 :bupkis)
 :bupkis

For timeouts, the Go example above shows that they come in two flavors: a timeout per round (timer starts each time you call select) or a timeout for a "conversation" that could involve multiple rounds of selecting the next value.

Let's take these one at a time, as they have different solutions in my implementation. For a per-select-call timeout, I've created a select-timeout function that takes a timeout (in milliseconds) as the first argument.


 ;; returns a value from one of the channels if it can
 ;; be read within 1 sec.  Otherwise it times out and
 ;; returns :go-lightly/timeout
 user=> (select-timeout 1000 ch1 ch2 ch3)
 :go-lightly/timeout

For an overall timeout, I provide two options.

First, following the pattern in Alexey Kachayev's example of doing this with the lamina library, we build a channel that will hold a timeout sentinel value once the timer goes off. Use the go-lightly timeout-channel factory fn and then pass that timeout channel to the select function.

In order for the timeout-channel to be effective, you have to keep calling select until you hit the timeout. Also, the current implementation of select doesn't preferentially look at the timeout channel first and select it over other channels if it is ready, but I'll be fixing that later in the series.

You can also pass a timeout-channel into select-timeout if you want both types of timers running.

Second, I've added a general purpose with-timeout macro to the go-lightly.core library that wraps any arbitrary set of statements in a timeout.

Go here if you want to see the full implementation of these timeout methods.

All of these options are shown in this Clojure go-lightly example implementation of the Go "boring" select example:

Note: the channels here are no longer raw LinkedTransferQueues - they are go-lightly GoChannel type entities. See the go-lightly wiki for a detailed explanation.


/* ---[ Emulating Go's select in lamina ]--- */

lamina's analog to select is its join operation, which basically routes the output of multiple lamina channels into a single channel:


 user=> (use 'lamina.core)
 nil
 user=> (def ch1 (channel))
 #'user/ch1
 user=> (def ch2 (channel))
 #'user/ch2
 user=> (def ch3 (channel))
 #'user/ch3
 user=> (join ch1 ch3)
 true
 user=> (join ch2 ch3)
 true
 user=> [ch1 ch2 ch3]
 [<== [ … ] <== [ … ] <== […]]
 user=> (enqueue ch1 :one)
 :lamina/enqueued
 user=> (enqueue ch2 :two :three)
 :lamina/enqueued
 user=> [ch1 ch2 ch3]
 [<== [ … ] <== [ … ] <== [:one :two :three …]]
 

You can then read from the downstream channel:


 user=> @(read-channel ch3)
 :one
 user=> @(read-channel ch3)
 :two

To create a whole-conversation timeout, you can call the periodically fn, which invokes your fn every 'period' milliseconds and emits the result on a channel. This was the inspiration for go-lightly's timeout-channel.

To create a per-round timeout, you can use either the read-channel* macro or the channel->lazy-seq function, both of which take a per-read timeout.

This program demonstrates these options (and a few others) using lamina (with some helper functions from go-lightly):


/* ---[ Implementing Go's select in Clojure ]--- */

So we can provide Racket's sync functionality in Clojure either by implementing it ourselves or using lamina, but it is not as powerful as Go's select. What if you need to know not only the next value on the channels, but which channel it was read from? In that case, providing a function to execute per channel is a nice model. But to be more or less functional, the select statement still needs to return a value.

Let's hit an important point here: as I quoted at the start of this post, Pike has said that "it's hard to do control structures that depend on libraries". This is true in some languages, but not all, and especially not in Lisps. You can build control structures with macros, or sometimes just with functions, and this is one of the key advantages of Lisp languages.

In the go-lightly library, I've implemented this as selectf and it turns out I didn't need a macro.

Here's an example of using go-lightly's selectf from the sleeping-barbers example app in the go-lightly-examples project:


  (defn barber-shop [clients-ch]
    (let [barber-ch (channel)]
      (loop [shop-state {:free-barbers (init-barber-vector)
                         :waiting-clients []}]
        ;; selectf runs the handler for whichever channel is ready; that
        ;; handler returns the new shop-state, which -> threads into recur
        (-> (selectf
             clients-ch #(client-walked-in % barber-ch shop-state)
             barber-ch  #(barber-available % barber-ch shop-state))
            (recur)))))

selectf takes pairs of arguments where the first member of the pair is a channel (or the :default keyword) and the second member of the pair is a function that takes one argument - the value read from that channel. (A function paired with :default takes no arguments.)

The return value of selectf is whatever the chosen handler fn returns. In the example above, I pass this value to the recur form, which rebinds the shop-state loop binding on each iteration without my having to use an atom to manage state changes.
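The dispatch idea (pair each channel with a handler, run the handler for whichever channel has a value, and return what that handler returns) can be sketched without go-lightly. The select-handlers fn below is hypothetical and deliberately simplified: it uses java.util.concurrent.LinkedBlockingQueue as a stand-in for channels, busy-polls when there is no :default clause, and always prefers the earliest ready queue in argument order rather than choosing among ready cases at random the way Go's select does.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical selectf-style control structure (not go-lightly's code).
;; args alternate: a queue (or :default) followed by its handler fn.
(defn select-handlers
  [& args]
  (let [pairs      (partition 2 args)
        ;; handler paired with :default, if one was supplied (takes no args)
        default-fn (some (fn [[k f]] (when (= :default k) f)) pairs)
        ;; every other pair is [queue handler]
        chan-pairs (remove (fn [[k _]] (= :default k)) pairs)]
    (loop []
      ;; try each queue in order; .poll returns nil at once if it is empty
      (if-let [[f v] (some (fn [[q f]]
                             (when-let [v (.poll ^LinkedBlockingQueue q)]
                               [f v]))
                           chan-pairs)]
        (f v)                  ; the handler's return value is the result
        (if default-fn
          (default-fn)         ; nothing ready: run :default immediately
          (do (Thread/sleep 1) ; no :default: keep polling until a value shows up
              (recur)))))))

;; Usage: thread state through loop/recur via the handler's return value,
;; in the same spirit as barber-shop above.
(let [clients (LinkedBlockingQueue. [:alice :bob])]
  (loop [state {:served []}]
    (if-let [next-state (select-handlers
                          clients  #(update-in state [:served] conj %)
                          :default (constantly nil))]
      (recur next-state)
      state)))
;; => {:served [:alice :bob]}
```

Here the :default handler returning nil is simply this example's way of ending the loop once the queue is drained.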

And here is the implementation of selectf:


 (defn selectf
   "Control structure variable arity fn. Must be an even number of arguments where
   the first is either a GoChannel to read from or the keyword :default. The second
   arg is a function to call if the channel is read from.  Handler fns paired with
   channels should accept one argument - the value read from the channel.  The
   handler function paired with :default takes no args.  If no :default clause is
   provided, it blocks until a value is read from a channel (which could include
   a TimeoutChannel). Returns the value returned by the handler fn."
   [& args]
   (binding [*choose-fn* choose-tuple]
     (let [chfnmap (apply array-map args)   ; {channel-or-:default handler-fn}
           ;; split the keys into the :default keyword (if any) and the channels
           [keywords chans] (partition-bifurcate
                             keyword?
                             (reduce #(conj % %2) [] (keys chfnmap)))
           ;; choice is a [channel value] tuple; when it is nil,
           ;; the :default handler runs instead
           choice (doselect chans nil (first keywords))]

       ;; invoke the associated fn
       (if choice
         ((chfnmap (nth choice 0)) (nth choice 1))
         ((chfnmap (first keywords)))))))

I won't give a full explanation of this implementation and all its helper functions, but notice this piece:


  (let [chfnmap (apply array-map args)
        ...
        ])

That one line is all it takes to turn the argument pairs into a control structure: it builds a map from channels (and :default) to handler fns, and once you have a map in Clojure, the rest of the programming is straightforward.
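Here is that step in isolation, as a small hypothetical helper named pairs->dispatch that uses strings as stand-ins for channels. It builds the channel-to-fn map with (apply array-map args) just as selectf does, then separates out the :default clause with group-by, which is swapped in here for go-lightly's partition-bifurcate helper (whose definition isn't shown in this post).

```clojure
;; Hypothetical helper: strings stand in for channels.
(defn pairs->dispatch
  [args]
  (let [chfnmap (apply array-map args)   ; {channel handler, ...}, insertion order kept
        ;; :default is the only keyword key; everything else is a channel
        {defaults true, chans false} (group-by keyword? (keys chfnmap))]
    {:handlers chfnmap
     :chans    (vec chans)
     :default  (first defaults)}))

(def dispatch
  (pairs->dispatch ["ch-a" inc "ch-b" dec :default (constantly :none)]))

(:chans dispatch)                    ;; => ["ch-a" "ch-b"]
(:default dispatch)                  ;; => :default
(((:handlers dispatch) "ch-b") 10)   ;; => 9, handler looked up by channel
(((:handlers dispatch) :default))    ;; => :none
```

Once the pairs live in a map, picking the handler for a ready channel is a single lookup.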


/* ---[ Next ]--- */

In the next entry we'll implement some more interesting CSP examples in Go and Clojure and think about the pros and cons of using lamina vs. go-lightly.


/* ---[ Resources ]--- */

All of the code in this blog series, including the Go and lamina example code, is in the go-lightly project on GitHub.

Lamina library: https://github.com/ztellman/lamina

The Go examples are from Rob Pike's Google I/O 2012 talk, "Go Concurrency Patterns".

Alexey Kachayev wrote down the Go code that Pike used in his Google I/O 2012 presentation, which otherwise doesn't seem to have been made available, and published it as gists. They won't compile out of the box, so I've been modifying them, but I wanted to link to his originals: https://gist.github.com/3124594

Alexey also then brainstormed on ways to implement these examples in Clojure using the lamina library. Those gists are at: https://gist.github.com/3146759

Links to this blog series: