Bitpub was a project I started about two months ago. One of the main goals of the project is to build something that utilizes RabbitMQ. In essence, bitpub fetches Bitcoin ticker data from various sources, and publishes them to a RabbitMQ’s topic exchange. Please read the README of Bitpub on how to run the publisher and consumer.

Polling

The project uses core.async to poll the sources for ticker data. The supported sources are CampBx, Bitstamp, Vircurex, BTC-e, and BTC China. The logic of the polling, shown below, will poll a given ticker URL to GET the market data. It will place the data returned from the GET request into the out channel. If no process is consuming the out channel, it will wait for a specified time and poll the ticker URL again. This ensures the data that a consumer receives are kept somewhat up-to-date. There is a timeout for the initial GET request. If the GET request timeout has elapsed, it will retry again. All of these polling logics are performed inside a go block, which is similar to goroutines in Go. As a result, the polling is done in a thread pool. So, multiple feeds can be constructed, and poll concurrently.

(defn create-ticker-feed
  [ticker-url & {:keys [get-timeout async-put-timeout park-time-fn] :as params}]
  (let [out (chan)]
    (go-loop []
      (let [time-out (as/timeout get-timeout)
            ticker-poll (http-get ticker-url)
            [v c] (alts! [time-out ticker-poll])]
        (cond
          (= c ticker-poll) (let [[_ ch] (alts! [[out v] (as/timeout async-put-timeout)])]
                              ;; If no-one is consuming the out channel, polls
                              ;; the url again after some time.
                              (if (= ch out)
                                (do
                                  ;; park for a little bit before polling again
                                  (<! (as/timeout (park-time-fn)))
                                  (recur))
                                (recur)))
          ;; HTTP GET timeout
          (= c time-out) (do (close! c)
                             (recur)))))
    out))

There are a few options for create-ticket-feed function. They are all related to timings, because different sources have different timing restrictions when polling. The timing restrictions are set in place, in order to prevent abuse. If the go block polls a source too frequently, it may get banned by the source. The options are:

:get-timeout
The time it waits for a reply from the GET request before retrying (in ms).
:async-put-timeout
The time it waits for a consumer to consume the value (in ms).
:park-time-fn
A function that returns an integer (in ms). The integer will be used as the park time before the go block re-polls the ticker url again. This is dictated by the source’s timing restriction.

To create a feed from, say, CampBx.

(create-ticker-feed
  "http://campbx.com/api/xticker.php"
  ;; GET request timed out after 30s
  :get-timeout 30000
  ;; If no process is consuming the out channel, re-poll
  :async-put-timeout 10000
  ;; Wait at least 1s between each poll
  ;; rand-int is to introduce some variance
  :park-time-fn #(+ 1000 (rand-int 1500)))

Publishing feeds

The return value for create-ticker-feed is a channel. From that channel, a consumer can acquire the market data and then perform whatever action it needs to perform. In the publisher’s case, whenever it receives any market data, it will publish the data out to a topic exchange of a RabbitMQ instance. Actually, the publisher I implemented does a little bit more. All the data that it receives from the channel get passed through a function (using mapv<). The function is used to clean up the market data. One example for such function is to transform all the string keys into Clojure keywords.

Subscribing to feeds

As described earilier, the data will be published to a RabbitMQ’s topic exchange. Each feed will have the routing key of the form ticker.$feedname. So, to subscribe to the CampBx feed, you will need to subscribe to ticker.campbx. Moreover, if you want to subscribe to all feeds, then you will need to subscribe to ticker.#. Bitpub comes a basic consumer the subscribe to all feeds, and dump all the data out to STDOUT.

Final words

One of the other reasons why I started bitpub, besides learning about RabbitMQ, was to build a notification system for Bitcoin prices at different exchanges. So, users can get notifications when the prices of Bitcoin rise above or fall below some thresholds defined by the users. Another usage of bitpub is to allow trading bot to get the market data and make decision on how to trade.

Unfortunately, life and work get in the way, so I haven’t started on the rest of the components of the notification system. And, that’s why I haven’t written anything about bitpub until months later. Anyway, try it and play around with it.