Pyroclast Roaming & Clojure SDK Open Availability

Michael Drogalis - 07/11/2017

I’m pleased to announce the open availability of Pyroclast Roaming, along with its complete Clojure SDK. Roaming is a container for the Pyroclast event streaming platform. Outfitted with our internal streaming engine, Roaming is able to interpret streaming programs written with its SDKs and execute them as Pyroclast would in a fully distributed, production-grade environment.

Roaming’s Clojure SDK is data-driven, so idiomatic Clojure comes naturally. In this post, we’ll walk through how to get started with Roaming right now.

Get the container

The first thing you’ll need is the Roaming Docker image. Pull it, then start a container with port 9700 exposed.

$ docker pull pyroclastio/roaming:0.1.2

$ docker run -it -p 9700:9700 pyroclastio/roaming:0.1.2

Wait a moment for the uberjar to launch. Once it echoes its configuration, you’re ready to rock. Let’s try an example.

Make a new Clojure project

As one does, make a new Clojure project using your favorite build tool. Add the following as a dependency:

[io.pyroclast/pyroclast-clojure "0.1.6"]
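If your favorite build tool happens to be Leiningen, the dependency goes in project.clj. The sketch below is a minimal example; the Clojure version is an assumption, since the post doesn’t say which versions the SDK supports.

```clojure
;; project.clj (Leiningen). The org.clojure/clojure version is an assumption,
;; not a documented requirement of the SDK.
(defproject my.project "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [io.pyroclast/pyroclast-clojure "0.1.6"]])
```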

Example 1: Streaming ETL

For our first example, we’ll work through a simple streaming ETL pipeline. We begin by importing all the Roaming namespaces with functions we might use. The vast majority of the API directly mirrors Clojure core, so the function names should do what you think they do if you use Clojure day-to-day.

(ns my.project
  (:require [clojure.test :refer :all]
            [pyroclast-clojure.v1.roaming.client :as roaming]
            [pyroclast-clojure.v1.roaming.seq :as pseq]
            [pyroclast-clojure.v1.roaming.string :as string]
            [pyroclast-clojure.v1.roaming.math :as math]
            [pyroclast-clojure.v1.roaming.time :as time]
            [pyroclast-clojure.v1.roaming.service :as s]
            [pyroclast-clojure.v1.roaming.filters :as f]
            [pyroclast-clojure.v1.roaming.aggregations :as a]
            [pyroclast-clojure.v1.roaming.coerce :as c]
            [pyroclast-clojure.v1.roaming.topic :as t]))

Next, we define a configuration map. The only key needed is :endpoint, the address of Roaming, which we started locally in Docker on port 9700.

(def config {:endpoint "http://127.0.0.1:9700"})

The last thing we’ll define before our service code is the sample record set that we want Roaming to stream through. To keep things simple, we’ll supply a single record. Data sets must be a sequence of maps.

(def records [{"sentence" "Pyroclast Roaming is a tool virtualizes Pyroclast's runtime."}])

Now that the initial setup is done, let’s define a service that reads events off an input topic. These events represent sentences. We’ll split the sentences into their individual words, split those words into their own events, then clean them up by lowercasing them and stripping out non-alphanumeric characters. We’ll stream the results to another topic for further processing.

Most functions in the SDK take a “source key” as their first argument after the service: the key in the incoming event to apply the function to. Similarly, most function signatures end with an optional arguments map that accepts a “destination key” (:dst), which says where to put the result of that function. When it is omitted, the result simply replaces the value at the source key.
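To make the source/destination convention concrete, here is a plain-Clojure sketch of it applied to a single event map. apply-at is a hypothetical helper, not part of the SDK, and leaving the source key untouched when :dst is given is an assumption about the SDK’s behavior.

```clojure
(ns key-convention-sketch
  (:require [clojure.string :as str]))

(defn apply-at
  "Hypothetical helper modeling the SDK convention: apply f to the value at
  src-key. Without :dst, the result replaces src-key; with {:dst k}, the
  result is written to k and src-key is left as-is (an assumption)."
  ([event src-key f]
   (apply-at event src-key f {}))
  ([event src-key f {:keys [dst]}]
   (assoc event (or dst src-key) (f (get event src-key)))))
```

For example, (apply-at {"word" "Roaming"} "word" str/lower-case) returns {"word" "roaming"}, while passing {:dst "lower"} keeps "word" and adds "lower".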

(def service (-> (s/new-service)
                 (t/input-topic "input")
                 (string/split-whitespace "sentence")
                 (pseq/explode "sentence" {:dst "word"})
                 (string/replace "word" "[^a-zA-Z0-9]" "")
                 (string/lower-case "word")
                 (t/output-topic "output")))
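If you want to check the expected output by hand, the same transformation can be modeled with ordinary sequence functions over a vector of maps. This is a sketch, not the SDK: explode here is a hypothetical stand-in that emits one event per element and drops the source key, which matches the output asserted in the test that follows but is an assumption about pseq/explode in general.

```clojure
(ns etl-sketch
  (:require [clojure.string :as str]))

(defn explode
  "Hypothetical stand-in for pseq/explode: one event per element of the
  sequence at src, stored under dst, with src removed."
  [event src dst]
  (map #(-> event (dissoc src) (assoc dst %)) (get event src)))

(defn run-etl
  "Plain-Clojure model of the word-splitting service."
  [records]
  (->> records
       ;; split-whitespace: replace the sentence with a vector of tokens
       (map #(update % "sentence" (fn [s] (str/split (str/trim s) #"\s+"))))
       ;; explode: one event per token, stored under "word"
       (mapcat #(explode % "sentence" "word"))
       ;; replace: strip non-alphanumeric characters
       (map #(update % "word" (fn [w] (str/replace w #"[^a-zA-Z0-9]" ""))))
       ;; lower-case
       (map #(update % "word" str/lower-case))))
```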

Finally, let’s add a test to ensure the service does what it’s supposed to do.

(deftest test-word-splitting
  (let [simulation (roaming/simulate! config service records)]
    (is (:success? simulation))
    (is (= [{"word" "pyroclast"}
            {"word" "roaming"}
            {"word" "is"}
            {"word" "a"}
            {"word" "tool"}
            {"word" "virtualizes"}
            {"word" "pyroclasts"}
            {"word" "runtime"}]
           (get-in simulation [:result :output-records])))))

Voila, streaming ETL in plain Clojure. By invoking a subsequent API call at the end of our function chain, we can directly run this service in a distributed environment on Pyroclast’s cloud, without Docker.

Streaming ETL is fun, but what about rolling data up over time?

Example 2: Streaming aggregations

Suppose our records are being streamed from IoT sensors emitting temperature readings and other reports. Our first goal is to convert the readings from Fahrenheit to Celsius, round them to two decimal places, and output the data elsewhere. Let’s see what that looks like.

(def temperature-records
  [{"sensor-id" "1" "event-type" "reading" "value" "50.24" "unit" "fahrenheit"}
   {"sensor-id" "2" "event-type" "reading" "value" "48.37" "unit" "fahrenheit"}
   {"sensor-id" "3" "event-type" "reading" "value" "52.98" "unit" "fahrenheit"}
   {"sensor-id" "2" "event-type" "ping"}
   {"sensor-id" "1" "event-type" "reading" "value" "49.53" "unit" "fahrenheit"}
   {"sensor-id" "3" "event-type" "reading" "value" "51.13" "unit" "fahrenheit"}
   {"sensor-id" "2" "event-type" "reading" "value" "50.03" "unit" "fahrenheit"}
   {"sensor-id" "1" "event-type" "ping"}
   {"sensor-id" "2" "event-type" "reading" "value" "50.01" "unit" "fahrenheit"}])

(def service (-> (s/new-service)
                 (t/input-topic "sensor-events")
                 (f/= "event-type" "reading")       ;; Only interested in temperature readings.
                 (c/parse-vals {"value" "double"})  ;; Parse strings into doubles.
                 (math/minus "value" 32)            ;; Fahrenheit to celsius math.
                 (math/divide "value" 1.8)
                 (pseq/assoc-in ["unit"] "celsius") ;; Update the event to reflect the new unit.
                 (math/round-decimals "value" 2)    ;; Tidy up our decimals.
                 (t/output-topic "as-celsius")))
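To see what each step computes, the same filter, parse, convert, and round steps can be sketched in plain Clojure. The helper names are hypothetical, and using Math/round (nearest value, ties rounded up) is an assumption about how math/round-decimals rounds.

```clojure
(ns celsius-sketch)

(defn f->c
  "Fahrenheit to Celsius: subtract 32, then divide by 1.8."
  [f]
  (/ (- f 32) 1.8))

(defn round-decimals
  "Round x to the given number of decimal places using Math/round."
  [x places]
  (let [scale (Math/pow 10.0 (double places))]
    (/ (Math/round (double (* x scale))) scale)))

(defn convert-readings
  "Model of the service: keep only readings, parse the string value,
  convert it to Celsius, tag the new unit, and round to two decimals."
  [events]
  (->> events
       (filter #(= "reading" (get % "event-type")))
       (map (fn [e]
              (let [f (Double/parseDouble (get e "value"))]
                (assoc e
                       "value" (round-decimals (f->c f) 2)
                       "unit" "celsius"))))))
```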

Reasonably straightforward. Before we start aggregating, let’s test the I/O of this service to spot-check our logic.

(deftest test-streaming-temperature-sensor-readings
  (let [simulation (roaming/simulate! config service temperature-records)]
    (is (:success? simulation))
    (is (= [{"sensor-id" "1" "event-type" "reading" "value" 10.13 "unit" "celsius"}
            {"sensor-id" "2" "event-type" "reading" "value" 9.09 "unit" "celsius"}
            {"sensor-id" "3" "event-type" "reading" "value" 11.66 "unit" "celsius"}
            {"sensor-id" "1" "event-type" "reading" "value" 9.74 "unit" "celsius"}
            {"sensor-id" "3" "event-type" "reading" "value" 10.63 "unit" "celsius"}
            {"sensor-id" "2" "event-type" "reading" "value" 10.02 "unit" "celsius"}
            {"sensor-id" "2" "event-type" "reading" "value" 10.01 "unit" "celsius"}]
           (get-in simulation [:result :output-records])))))

Celsius readings rounded to 2 decimal places. Wonderful. Breaking apart services into modular components is a good practice, and since it’s all data under the hood without any macros or other tricks, it’s easy to factor out the unit conversion code.

(defn f-to-c [service k]
  (-> service
      (math/minus k 32)
      (math/divide k 1.8)
      (pseq/assoc-in ["unit"] "celsius")))

(def service (-> (s/new-service)
                 (t/input-topic "sensor-events")
                 (f/= "event-type" "reading")
                 (c/parse-vals {"value" "double"})
                 (f-to-c "value")                   ;; Replaces inline math.
                 (math/round-decimals "value" 2)
                 (t/output-topic "as-celsius")))
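Why does f-to-c slot into the thread-first chain so easily? Any function that takes a service value and returns a new one composes like any other Clojure function. The sketch below models a service as a plain map holding a vector of steps; that representation and every name in it are hypothetical, chosen only to illustrate the point, and are not the SDK’s internals.

```clojure
(ns service-as-data-sketch)

;; Hypothetical stand-ins for SDK builder functions. Each takes a service
;; map and returns a new map with one step appended. This is NOT the SDK's
;; real representation, only an illustration of composing plain data.
(defn new-service [] {:steps []})

(defn- add-step [service step]
  (update service :steps conj step))

(defn minus [service k n] (add-step service {:op :minus :key k :arg n}))
(defn divide [service k n] (add-step service {:op :divide :key k :arg n}))
(defn set-unit [service unit] (add-step service {:op :assoc-in :path ["unit"] :value unit}))

;; A user-defined component is just another service -> service function.
(defn f-to-c [service k]
  (-> service
      (minus k 32)
      (divide k 1.8)
      (set-unit "celsius")))

(def inline-service
  (-> (new-service) (minus "value" 32) (divide "value" 1.8) (set-unit "celsius")))

(def factored-service
  (-> (new-service) (f-to-c "value")))
```

In this model both threads produce equal values, so swapping the inline steps for f-to-c cannot change what the service does.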

Looks better. But what about those aggregations? In this next portion, we replace the output topic with an aggregation. Here, we take the minimum, maximum, and average of all readings, grouped by sensor ID. A global window indicates that we are interested in all events, regardless of when they happened. More on that later!

(def service
  (-> (s/new-service)
      (t/input-topic "sensor-events")
      (f/= "event-type" "reading")
      (c/parse-vals {"value" "double"})
      (f-to-c "value")
      (a/aggregations
       [(a/min "min-reading" "value" (a/globally-windowed))
        (a/max "max-reading" "value" (a/globally-windowed))
        (a/average "avg-reading" "value" (a/globally-windowed))]
       "sensor-id")))

The aggregations function executes a collection of aggregates over a stream of events. Now for the test. Each aggregate is available by name, and its values are grouped by the key “sensor-id”.

(deftest test-aggregated-temperature-reads-by-id
  (let [simulation (roaming/simulate! config service temperature-records)

        min-reading (get-in simulation [:result :aggregates "min-reading"])
        max-reading (get-in simulation [:result :aggregates "max-reading"])
        avg-reading (get-in simulation [:result :aggregates "avg-reading"])]

    (is (:success? simulation))

    (is (= {"1" {"value" 9.738888888888889}
            "2" {"value" 9.094444444444443}
            "3" {"value" 10.627777777777778}}
           min-reading))

    (is (= {"1" {"value" 10.133333333333335}
            "2" {"value" 10.016666666666667}
            "3" {"value" 11.655555555555553}}
           max-reading))

    (is (= {"1" {"value" 9.936111111111112}
            "2" {"value" 9.705555555555556}
            "3" {"value" 11.141666666666666}}
           avg-reading))))
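The expected values above can be cross-checked with a plain-Clojure model of a globally windowed, grouped aggregation: convert every reading, group by sensor-id, and reduce each group. aggregate-by-sensor is a hypothetical helper that mirrors the shape of the result map in the test, not the SDK’s implementation, and its floating-point results may differ in the last digits.

```clojure
(ns aggregation-sketch)

(defn f->c [f] (/ (- f 32) 1.8))

(defn aggregate-by-sensor
  "Hypothetical model of min/max/average over a global window (all events),
  grouped by sensor-id. Returns {aggregate-name {sensor-id {\"value\" x}}}."
  [events]
  (let [readings  (->> events
                       (filter #(= "reading" (get % "event-type")))
                       (map #(update % "value" (fn [v] (f->c (Double/parseDouble v))))))
        groups    (group-by #(get % "sensor-id") readings)
        ;; Apply an aggregate function to each sensor's list of values.
        summarize (fn [agg]
                    (into {} (for [[id evs] groups]
                               [id {"value" (agg (map #(get % "value") evs))}])))]
    {"min-reading" (summarize #(apply min %))
     "max-reading" (summarize #(apply max %))
     "avg-reading" (summarize #(/ (reduce + %) (count %)))}))
```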

A materialized view is produced that’s representative of the requested aggregation. For our final example, let’s factor time into the equation.

Example 3: Time-windowed aggregations

It’s often useful to aggregate based on timestamps that are attributes of the incoming data. For this example, we’ll make a new sample data set.

(def records
  [{"page" "/console"
    "browser" "chrome"
    "country" "usa"
    "timestamp" "2017-07-11T16:37:08.000-00:00"}
   {"page" "/console"
    "browser" "chrome"
    "country" "br"
    "timestamp" "2017-07-11T16:34:02.000-00:00"}
   {"page" "/store"
    "browser" "chrome"
    "country" "usa"
    "timestamp" "2017-07-11T16:48:15.000-00:00"}
   {"page" "/console"
    "browser" "firefox"
    "country" "usa"
    "timestamp" "2017-07-11T16:01:53.000-00:00"}
   {"page" "/store"
    "browser" "chrome"
    "country" "br"
    "timestamp" "2017-07-11T16:05:35.000-00:00"}
   {"page" "/console"
    "browser" "chrome"
    "country" "usa"
    "timestamp" "2017-07-11T15:20:08.000-00:00"}
   {"page" "/about"
    "browser" "chrome"
    "country" "can"
    "timestamp" "2017-07-11T16:21:01.000-00:00"}
   {"page" "/console"
    "browser" "firefox"
    "country" "usa"
    "timestamp" "2017-07-11T16:42:15.000-00:00"}
   {"page" "/console"
    "browser" "chrome"
    "country" "usa"
    "timestamp" "2017-07-11T16:24:59.000-00:00"}
   {"page" "/about"
    "browser" "chrome"
    "country" "usa"
    "timestamp" "2017-07-11T16:37:08.000-00:00"}
   {"page" "/console"
    "browser" "firefox"
    "country" "br"
    "timestamp" "2017-07-11T16:33:14.000-00:00"}
   {"page" "/console"
    "browser" "firefox"
    "country" "usa"
    "timestamp" "2017-07-11T16:37:03.000-00:00"}
   {"page" "/store"
    "browser" "chrome"
    "country" "can"
    "timestamp" "2017-07-10T15:24:08.000-00:00"}])

To aggregate with event time, we’ll need to parse our string timestamps into milliseconds since the Unix epoch. We use parse-datetime with the specified format to get that job done, then aggregate into 15-minute fixed windows over that value, counting the instances.

(def service
  (-> (s/new-service)
      (t/input-topic "page-views")
      (time/parse-datetime "timestamp" "YYYY-MM-dd'T'HH:mm:ss.SSSZ" {:dst "parsed-time"})
      (a/aggregations
       [(a/count "windowed-page-views" (a/fixed-windows-of 15 "minutes" "parsed-time"))])))

Running a test shows the aggregate sliced into 15-minute windows, each with lower and upper timestamp bounds (in epoch milliseconds) and the number of events that fell into that range.

(deftest test-page-views-over-fixed-windows
  (let [simulation (roaming/simulate! config service records)]
    (is (:success? simulation))
    (is (= [{"bounds" {"lower" 1499790600000 "upper" 1499791499999} "value" 6}
            {"bounds" {"lower" 1499791500000 "upper" 1499792399999} "value" 1}
            {"bounds" {"lower" 1499788800000 "upper" 1499789699999} "value" 2}
            {"bounds" {"lower" 1499786100000 "upper" 1499786999999} "value" 1}
            {"bounds" {"lower" 1499789700000 "upper" 1499790599999} "value" 2}
            {"bounds" {"lower" 1499699700000 "upper" 1499700599999} "value" 1}]
           (get-in simulation [:result :aggregates "windowed-page-views"])))))
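The window arithmetic can be sketched in plain Clojure with java.time. We assume fixed windows are aligned to the Unix epoch (lower bound = timestamp minus timestamp mod 15 minutes), which is consistent with the bounds in the test above; the helper names are hypothetical, and this sketch returns windows in no particular order.

```clojure
(ns fixed-window-sketch
  (:import (java.time OffsetDateTime)))

;; 15 minutes expressed in milliseconds (900000).
(def window-ms (* 15 60 1000))

(defn parse-ms
  "Parse an ISO-8601 timestamp with an offset into epoch milliseconds."
  [^String s]
  (.toEpochMilli (.toInstant (OffsetDateTime/parse s))))

(defn window-bounds
  "Epoch-aligned fixed window containing ts. The upper bound is inclusive,
  one millisecond before the next window starts."
  [ts]
  (let [lower (- ts (mod ts window-ms))]
    {"lower" lower "upper" (+ lower window-ms -1)}))

(defn count-by-window
  "Count timestamps per fixed window; result order is unspecified."
  [timestamps]
  (->> timestamps
       (map (comp window-bounds parse-ms))
       frequencies
       (map (fn [[bounds n]] {"bounds" bounds "value" n}))))
```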

A final variation on this example groups events by country, and then by browser, still over 15-minute windows:

(def service
  (-> (s/new-service)
      (t/input-topic "page-views")
      (time/parse-datetime "timestamp" "YYYY-MM-dd'T'HH:mm:ss.SSSZ" {:dst "parsed-time"})
      (a/aggregations
       [(a/count "windowed-page-views" (a/fixed-windows-of 15 "minutes" "parsed-time"))]
       ["country" "browser"])))

Pyroclast automatically sub-groups based on each specified category: country, and then browser.

(deftest test-page-views-over-grouped-windows
  (let [simulation (roaming/simulate! config service records)]
    (is (:success? simulation))
    (is (= {"usa" {"chrome" [{"bounds" {"lower" 1499790600000 "upper" 1499791499999} "value" 2}
                             {"bounds" {"lower" 1499791500000 "upper" 1499792399999} "value" 1}
                             {"bounds" {"lower" 1499786100000 "upper" 1499786999999} "value" 1}
                             {"bounds" {"lower" 1499789700000 "upper" 1499790599999} "value" 1}]
                   "firefox" [{"bounds" {"lower" 1499788800000 "upper" 1499789699999} "value" 1}
                              {"bounds" {"lower" 1499790600000 "upper" 1499791499999} "value" 2}]}
            "br" {"chrome" [{"bounds" {"lower" 1499790600000 "upper" 1499791499999} "value" 1}
                            {"bounds" {"lower" 1499788800000 "upper" 1499789699999} "value" 1}]
                  "firefox" [{"bounds" {"lower" 1499790600000 "upper" 1499791499999} "value" 1}]}
            "can" {"chrome" [{"bounds" {"lower" 1499789700000 "upper" 1499790599999} "value" 1}
                             {"bounds" {"lower" 1499699700000 "upper" 1499700599999} "value" 1}]}}
           (get-in simulation [:result :aggregates "windowed-page-views"])))))
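The nested result is what you get by grouping on each key in turn and applying the aggregate at the innermost level. A minimal recursive sketch follows; nested-group is a hypothetical helper, and in the test above the leaf function would be the 15-minute windowed count rather than the plain count used here.

```clojure
(ns nested-grouping-sketch)

(defn nested-group
  "Group events by each key in ks in turn (outermost first), then apply
  leaf-fn to every innermost group of events."
  [events ks leaf-fn]
  (if (empty? ks)
    (leaf-fn events)
    (into {} (for [[k group] (group-by #(get % (first ks)) events)]
               [k (nested-group group (rest ks) leaf-fn)]))))
```

Applied to the page-view records above with ["country" "browser"] and count, this yields {"usa" {"chrome" 5 "firefox" 3} "br" {"chrome" 2 "firefox" 1} "can" {"chrome" 2}}, matching the per-group totals of the windowed counts in the test.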

Running in the Pyroclast Cloud

Finally, we come full circle to where our journey began. These exact programs, unmodified, are able to run inside Pyroclast’s cloud environment. When services run inside Pyroclast, they can connect to real topics and blast through millions of events per second at low latency. These services hook into our robust streaming framework, which seamlessly handles unexpected failures. This capability is truly exciting. Join our beta program to get access to the full environment today.