Using 0MQ for Clojure and Ruby Interop

Now that you have ZeroMQ working with Clojure, some introductory examples are in order. The demonstrations below show how simple it is to bridge the gap between programming languages by using ZeroMQ, focusing on connecting Clojure and Ruby.

##Pub/Sub Pattern

Our first example is the pub/sub pattern. Our code will broadcast messages from Ruby which our Clojure process can subscribe to. One interesting capability of 0MQ is that we can subscribe to “channels” — that is, we’ll only receive messages which start with a given query.1

#!/usr/bin/env ruby -wKU
# pub.rb
require 'rubygems'
require 'zmq'

zmq = ZMQ::Context.new
socket = zmq.socket(ZMQ::PUB)
socket.bind "tcp://127.0.0.1:5555"

# A REPL of sorts.  Messages entered here
# will get sent to our Clojure process.
while true
  '> '.display
  
  input = gets
  break unless input
  
  input.each do |msg|
    socket.send(msg)
  end
end

exit 0
; src/my-project/sub.clj
(ns my-project.sub
  (:use [org.zeromq.clojure :as zmq]))

(defn- string-to-bytes [s] (.getBytes s))
(defn- bytes-to-string [b] (String. b))
(defn- on-thread [f]
  (doto (Thread. #^Runnable f) 
    (.start)))

(defn launch-subscriber [query]
  (on-thread
   #((let [ctx (zmq/make-context 1)
	   socket (zmq/make-socket ctx zmq/+sub+)]
       (zmq/connect socket "tcp://127.0.0.1:5555")
       ; Note the dot-syntax! clojure-zmq is a bit behind
       ; on the Java API, so you have to fall back to straight
       ; Java interop to subscribe to a pub "channel".
       (.subscribe socket (string-to-bytes query))
       (while true
	 (println (bytes-to-string (zmq/recv socket))))))))

; Subscribing to the "foo" channel.  We'll only see messages prefixed with foo.
(launch-subscriber "foo")

After launching the Clojure process, hop into the shell:2

$ ruby pub.rb
> Hello There
# No entry in the Clojure output
> foo Hello There
# prints "foo Hello There" in the Clojure output

##Request/Response Pattern The request/response technique allows us to delegate work from our master program to a worker process. In this particular example, we’re sending lists of values from our Ruby process to our Clojure process, which adds them and sends back the result.

#!/usr/bin/env ruby -wKU
# req.rb

require 'rubygems'
require 'zmq'

zmq = ZMQ::Context.new
socket = zmq.socket(ZMQ::REQ)
socket.bind "tcp://127.0.0.1:5556"

while true
  '> '.display
  
  input = gets
  break unless input
  
  input.each do |msg|
    socket.send(msg)
  end
  puts socket.recv
end

exit 0
; src/my-project/rep.clj
(ns my-project.rep
  (:use [org.zeromq.clojure :as zmq])
  (:use [clojure.contrib str-utils]))

(defn- string-to-bytes [s] (.getBytes s))
(defn- bytes-to-string [b] (String. b))
(defn- on-thread [f]
  (doto (Thread. #^Runnable f) 
    (.start)))

(defn handler [request]
  (let [request (bytes-to-string request)
	values (map
		#(Integer/parseInt %)
		(re-seq #"\d+" request))
	output (str (apply + values))]
    (println (apply
	      str
	      (str-join " + " values)
	      " = "
	      output))
    output))

(defn make-adder []
  (on-thread
   #((let [ctx (zmq/make-context 1)
	   socket (zmq/make-socket ctx zmq/+rep+)]
       (zmq/connect socket "tcp://127.0.0.1:5556")
       (while true
	 (let [request (zmq/recv socket)
	       result (handler request)]
	   (zmq/send- socket (string-to-bytes result))))))))

(make-adder)

Fire up the Clojure process, and hop back into the shell:

$ ruby req.rb
> 2 2
4
# The Clojure process will print "2 + 2 = 4"
> 8 8
16
# The Clojure process will print "8 + 8 = 16"

You can find more ZeroMQ patterns on the official cookbook page.

  1. This could be useful for using 0MQ for method dispatch, e.g. this socket will accept “add” messages and sum the terms in a message, whereas another socket will accept “log” messages and log the message to file. 

  2. If you launched the process through lein and swank-clojure, your output will appear in the console which started lein swank

blog comments powered by Disqus