Introducing bitpub
Bitpub is a project I started about two months ago. One of its main goals is to build something that uses RabbitMQ. In essence, bitpub fetches Bitcoin ticker data from various sources and publishes it to a RabbitMQ topic exchange. Please read the bitpub README for how to run the publisher and consumer.
Polling
The project uses core.async to poll the sources for ticker data. The supported
sources are CampBx, Bitstamp, Vircurex, BTC-e, and BTC China. The polling
logic, shown below, issues a GET request to a given ticker URL to fetch the
market data and places the response on the out channel. If no process takes
the value from the out channel within a specified time, it discards the value
and polls the ticker URL again. This keeps the data that a consumer receives
reasonably up to date. The GET request itself also has a timeout; if it
elapses, the request is retried. All of this polling logic runs inside a go
block, which is similar to a goroutine in Go. As a result, the polling is done
on a thread pool, so multiple feeds can be constructed and polled concurrently.
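;; Assumes (:require [clojure.core.async :as as
;;                    :refer [chan go-loop alts! <! close!]]).
;; http-get is expected to return a channel that yields the response.
(defn create-ticker-feed
  [ticker-url & {:keys [get-timeout async-put-timeout park-time-fn] :as params}]
  (let [out (chan)]
    (go-loop []
      (let [time-out    (as/timeout get-timeout)
            ticker-poll (http-get ticker-url)
            [v c]       (alts! [time-out ticker-poll])]
        (cond
          (= c ticker-poll)
          (let [[_ ch] (alts! [[out v] (as/timeout async-put-timeout)])]
            ;; If no one is consuming the out channel, the put times out
            ;; and the URL is polled again right away.
            (if (= ch out)
              (do
                ;; The value was consumed: park for a little bit
                ;; before polling again.
                (<! (as/timeout (park-time-fn)))
                (recur))
              (recur)))

          ;; HTTP GET timeout: c is the timeout channel that already fired,
          ;; so close! is a no-op here; retry the request.
          (= c time-out)
          (do (close! c)
              (recur)))))
    out))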
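The core of the polling loop is a race between a result channel and a timeout channel. The following is a minimal, self-contained sketch of that pattern, not bitpub's actual code: fake-get and take-or-timeout are hypothetical names introduced here, and fake-get merely simulates a slow HTTP request.

```clojure
(require '[clojure.core.async :as as :refer [go <! alts!!]])

;; Hypothetical stand-in for an HTTP request: returns a channel that
;; yields `value` after `delay-ms` milliseconds.
(defn fake-get [delay-ms value]
  (go (<! (as/timeout delay-ms))
      value))

;; Waits for a value on `result-ch`, giving up after `timeout-ms`.
;; Returns the value, or :timed-out if the timeout channel wins the race.
(defn take-or-timeout [result-ch timeout-ms]
  (let [time-out (as/timeout timeout-ms)
        [v c]    (alts!! [result-ch time-out])]
    (if (= c time-out) :timed-out v)))
```

With these definitions, (take-or-timeout (fake-get 10 :ticker) 1000) should return :ticker, while (take-or-timeout (fake-get 1000 :ticker) 10) should return :timed-out. The sketch uses the blocking alts!! so it can be tried at a REPL outside a go block.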
There are a few options for the create-ticker-feed function. They are all
related to timing, because different sources have different timing restrictions
on polling. These restrictions are in place to prevent abuse: if the go block
polls a source too frequently, it may get banned by the source.
The options are:
:get-timeout - The time to wait for a reply to the GET request before retrying (in ms).
:async-put-timeout - The time to wait for a consumer to take the value (in ms).
:park-time-fn - A function that returns an integer (in ms). The integer is used as the park time before the go block polls the ticker URL again. This is dictated by the source’s timing restriction.
To create a feed from, say, CampBx:
(create-ticker-feed
  "http://campbx.com/api/xticker.php"
  ;; The GET request times out after 30s
  :get-timeout 30000
  ;; If no process consumes from the out channel within 10s, re-poll
  :async-put-timeout 10000
  ;; Wait at least 1s between polls;
  ;; rand-int introduces some variance
  :park-time-fn #(+ 1000 (rand-int 1500)))
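The park-time function in the CampBx example adds a random offset to a fixed base delay. As a small sketch, the same idea can be wrapped in a helper; jittered-park-time and campbx-park-time are hypothetical names and are not part of bitpub.

```clojure
;; Hypothetical helper: builds a zero-argument park-time function that
;; returns base-ms plus a random jitter in the range [0, jitter-ms).
(defn jittered-park-time [base-ms jitter-ms]
  (fn [] (+ base-ms (rand-int jitter-ms))))

;; Same behaviour as #(+ 1000 (rand-int 1500)):
;; every call returns an integer from 1000 up to, but not including, 2500.
(def campbx-park-time (jittered-park-time 1000 1500))
```

The resulting function could be passed as :park-time-fn. The jitter keeps several feeds from polling in lockstep, while the base delay respects the source's minimum interval.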
Publishing feeds
The return value of create-ticker-feed is a channel. From that channel, a
consumer can take the market data and do whatever it needs with it. In the
publisher’s case, whenever it receives market data, it publishes the data to a
topic exchange on a RabbitMQ instance. Actually, the publisher I implemented
does a little bit more: all the data it receives from the channel is first
passed through a function (using mapv<) that cleans up the market data. One
example of such a function is transforming all the string keys into Clojure
keywords.
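As an illustration of such a clean-up step, here is a minimal sketch that turns string keys into keywords for every value flowing through a channel. It does not reproduce the publisher's code: it uses a transducer on the output channel as a substitute technique, and clean-ticker and cleaned-feed are hypothetical names.

```clojure
(require '[clojure.core.async :as as]
         '[clojure.walk :as walk])

;; Hypothetical clean-up function: recursively converts string map keys
;; into keywords.
(defn clean-ticker [ticker]
  (walk/keywordize-keys ticker))

;; Returns a new channel carrying every value from `feed` after it has
;; been passed through clean-ticker.
(defn cleaned-feed [feed]
  (as/pipe feed (as/chan 1 (map clean-ticker))))
```

A consumer would then read from (cleaned-feed feed) instead of the raw feed, so a map such as {"last" "650.00"} arrives as {:last "650.00"}.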
Subscribing to feeds
As described earlier, the data is published to a RabbitMQ topic exchange. Each
feed has a routing key of the form ticker.$feedname. So, to subscribe to the
CampBx feed, you need to subscribe to ticker.campbx. Moreover, if you want to
subscribe to all feeds, you need to subscribe to ticker.#.
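To make the wildcard behaviour concrete, here is a small sketch of how a topic exchange compares a binding key with a routing key: words are separated by dots, * matches exactly one word, and # matches zero or more words. This is an illustrative re-implementation, not RabbitMQ's or bitpub's code, and topic-matches? is a hypothetical name.

```clojure
(require '[clojure.string :as str])

;; Recursively matches a sequence of binding-key words against a sequence
;; of routing-key words.
(defn- match-words [pattern words]
  (cond
    (empty? pattern)        (empty? words)
    ;; "#" either matches nothing, or swallows one word and tries again.
    (= "#" (first pattern)) (or (match-words (rest pattern) words)
                                (and (seq words)
                                     (match-words pattern (rest words))))
    (empty? words)          false
    ;; "*" matches any single word; otherwise the words must be equal.
    (or (= "*" (first pattern))
        (= (first pattern) (first words)))
    (match-words (rest pattern) (rest words))
    :else                   false))

;; True when a queue bound with `binding-key` would receive a message
;; published with `routing-key`.
(defn topic-matches? [binding-key routing-key]
  (boolean (match-words (str/split binding-key #"\.")
                        (str/split routing-key #"\."))))
```

Under these rules, the binding key ticker.# matches ticker.campbx and ticker.bitstamp, while ticker.campbx matches only the CampBx feed.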
Bitpub comes with a basic consumer that subscribes to all feeds and dumps all
the data to STDOUT.
Final words
One of the other reasons I started bitpub, besides learning about RabbitMQ, was to build a notification system for Bitcoin prices at different exchanges, so that users can get notifications when the price of Bitcoin rises above or falls below thresholds they define. Another use of bitpub is to let trading bots get the market data and decide how to trade.
Unfortunately, life and work got in the way, so I haven’t started on the rest of the components of the notification system. That is also why I haven’t written anything about bitpub until months later. Anyway, try it and play around with it.