Connect realtime event streaming to any toolset

Michael Drogalis - 07/03/2017

It’s our mission to deliver a platform that combines powerful streaming primitives with a truly great product experience. Realizing the full potential of event stream processing means delighting customers and keeping them extremely productive.

Today, I’m pleased to announce the latest addition to Pyroclast: Roaming. Pyroclast Roaming lets teams wield the full power of our platform from the comfort of their favorite programming language and development workflow.

Pyroclast is a Platform as a Service for building realtime event streaming applications, with a particular focus on handling event state correctly and efficiently. Developers use our APIs to build services that perform common streaming operations, such as enrichment, windowed joins, and materialized views. Each service is launched alongside a scalable backing stream processor and can then be controlled through HTTP endpoints.

When you use Pyroclast Roaming:

  • Automated tests can be written in your own language to verify correctness on your own terms.
  • Changes to services can be managed by your own version control system, in your own storage.
  • Service changes can be tested with Roaming via continuous integration before they’re deployed to Pyroclast.
  • Going distributed on Pyroclast’s high-performance streaming environment is completely seamless.

Under the hood, Roaming comprises a Docker container and language-level SDKs. The container virtualizes Pyroclast’s streaming engine and APIs. Each SDK is tailored for language-idiomatic access to the Roaming container and the full API.

Let’s see an example of what this looks like in practice. True to our heritage, the first SDK implementation of Roaming is in Clojure.

Roaming in action: streaming ETL

For our first example, we’ll build a service that performs streaming ETL. We’ll read messages from a topic called “events” and keep only the events tagged as page views. Of those, we’ll keep only page views whose URL starts with https://pyroclast.io, which excludes subdomains such as api.pyroclast.io. Finally, we’ll stream the remaining records to a new Pyroclast topic for later access.

(def service
  (-> (new-service)
      (input-topic "events")
      (f/= :event :page-view)
      (string-starts-with? [:context :url] "https://pyroclast.io")
      (output-topic "page-views")))

Notice that we use the threading macro (->). Each SDK uses syntax that is idiomatic for its own programming language. The API calls are plain functions that take a service data structure and return an updated one.
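Because each step is just a function from service to service, this builder style needs no special machinery. The sketch below illustrates the shape with plain maps. The function names (empty-service, with-input, with-filter, with-output) and the :steps layout are hypothetical stand-ins, not the Pyroclast SDK’s actual internals.

```clojure
;; Hypothetical builder functions. Each takes a service map and returns
;; a new map, so they chain naturally with the -> threading macro.
;; These are NOT the Pyroclast SDK; they only illustrate the shape.
(defn empty-service []
  {:steps []})

(defn with-input [service topic]
  (assoc service :input topic))

(defn with-filter [service k v]
  ;; Append a declarative equality-filter step; nothing executes yet.
  (update service :steps conj {:op :filter= :key k :value v}))

(defn with-output [service topic]
  (assoc service :output topic))

(def sketch-service
  (-> (empty-service)
      (with-input "events")
      (with-filter :event :page-view)
      (with-output "page-views")))

;; sketch-service is an ordinary, inspectable Clojure value:
;; {:steps [{:op :filter= :key :event :value :page-view}]
;;  :input "events"
;;  :output "page-views"}
```

In this sketch the service stays plain data until something executes it, so it can be printed, compared, or checked into version control like any other value.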

Next, let’s define some sample records that we’ll give to Roaming. Roaming will take our service and these records, and return what happens when the records flow through it at runtime.

(def records
  [{:user-id "sally"
    :event :page-view
    :timestamp "2017-06-29T16:13:36.696-00:00"
    :context {:url "https://pyroclast.io/console"}}

   {:user-id "fred"
    :event :page-view
    :timestamp "2017-06-28T16:02:21.696-00:00"
    :context {:url "https://pyroclast.io/topics"}}

   {:user-id "mary"
    :event :log-out
    :timestamp "2017-06-29T16:13:36.696-00:00"
    :context {:reason "idle-time-expired"}}

   {:user-id "john"
    :event :page-view
    :timestamp "2017-06-29T16:14:36.696-00:00"
    :context {:url "https://api.pyroclast.io"}}

   {:user-id "fred"
    :event :page-view
    :timestamp "2017-06-28T16:04:21.696-00:00"
    :context {:url "https://pyroclast.io/console"}}

   {:user-id "fred"
    :event :page-view
    :timestamp "2017-06-28T16:08:21.696-00:00"
    :context {:url "https://pyroclast.io/console"}}

   {:user-id "sally"
    :event :page-view
    :timestamp "2017-06-20T18:54:00.696-00:00"
    :context {:url "https://pyroclast.io/services"}}])

Finally, we’ll write a test that asserts what happens when we execute our service. The configuration map points at the local port where the Roaming Docker container listens. Because the container runs locally, Roaming does not require an internet connection.

(deftest filter-page-views-test
  (testing "Only main site page view events pass through."
    (let [config {:endpoint "http://127.0.0.1:12000"}
          expected [{:user-id "sally"
                     :event "page-view"
                     :timestamp "2017-06-29T16:13:36.696-00:00"
                     :context {:url "https://pyroclast.io/console"}}
                    {:user-id "fred"
                     :event "page-view"
                     :timestamp "2017-06-28T16:02:21.696-00:00"
                     :context {:url "https://pyroclast.io/topics"}}
                    {:user-id "fred"
                     :event "page-view"
                     :timestamp "2017-06-28T16:04:21.696-00:00"
                     :context {:url "https://pyroclast.io/console"}}
                    {:user-id "fred"
                     :event "page-view"
                     :timestamp "2017-06-28T16:08:21.696-00:00"
                     :context {:url "https://pyroclast.io/console"}}
                    {:user-id "sally"
                     :event "page-view"
                     :timestamp "2017-06-20T18:54:00.696-00:00"
                     :context {:url "https://pyroclast.io/services"}}]
          result (simulate! config service records)]
      (is (:success? result))
      (is (= expected (get-in result [:result :output-records]))))))

Our test passes and returns in about 10 milliseconds on my laptop.
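To cross-check the expected output without the container, the two filters can be approximated in plain Clojure. This is a hypothetical local model, not how Roaming executes services, and the helper name main-site-page-view? is illustrative. It also makes the subdomain case explicit: "https://api.pyroclast.io" does not start with "https://pyroclast.io", which is why John’s record is dropped.

```clojure
;; A local, plain-Clojure approximation of the ETL service's two filters.
;; Not Roaming's implementation; it only mirrors the filter semantics.
(require '[clojure.string :as str])

(defn main-site-page-view?
  "True when the record is a page view whose URL starts with
  https://pyroclast.io (so subdomains like api.pyroclast.io fail)."
  [record]
  (and (= :page-view (:event record))
       (str/starts-with? (str (get-in record [:context :url]))
                         "https://pyroclast.io")))

(def sample-records
  [{:user-id "sally" :event :page-view
    :context {:url "https://pyroclast.io/console"}}
   {:user-id "mary" :event :log-out
    :context {:reason "idle-time-expired"}}
   {:user-id "john" :event :page-view
    :context {:url "https://api.pyroclast.io"}}])

;; Keep only the records that pass both filters.
(def kept (filterv main-site-page-view? sample-records))
;; kept holds only Sally's record: Mary's is not a page view, and
;; John's URL is on the api subdomain.
```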

Roaming in action: materialized views

Streaming ETL is cool, but let’s try something more advanced: aggregations. Instead of sending all events to a topic as output, we’ll roll up events into fixed windows of one-hour duration based on their timestamp. Inside each one-hour window, we’ll get a count of the number of events grouped by user ID.

(def service
  (-> (new-service)
      (input-topic "events")
      (f/= :event :page-view)
      (string-starts-with? [:context :url] "https://pyroclast.io")
      (parse-datetime :timestamp "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
      (aggregate-together
       [{:name "hourly-page-views"
         :aggregate (a/count (fixed-windows-of 1 :hour :timestamp))}]
       :user-id)))

In this example, we make two additions. First, we parse the string timestamp into milliseconds since the Unix epoch. Second, we aggregate under the name “hourly-page-views”. Aggregates are named because a service can host several of them, and each can be queried individually by name.
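For reference, converting a timestamp like the ones in our records into milliseconds since the Unix epoch takes only a few lines. The stack trace later in this post shows Pyroclast doing this with clj-time (Joda-Time); this sketch swaps in the JDK’s built-in java.time API instead, and the helper name timestamp->epoch-millis is hypothetical.

```clojure
;; Convert an ISO-8601 timestamp with an offset into epoch milliseconds.
;; Uses java.time (JDK 8+) in place of clj-time/Joda-Time, purely to keep
;; this sketch free of external dependencies.
(import '(java.time OffsetDateTime))

(defn timestamp->epoch-millis [^String ts]
  (-> (OffsetDateTime/parse ts)   ; parses date, time, fraction, and offset
      (.toInstant)                ; normalizes to an instant on the UTC timeline
      (.toEpochMilli)))           ; milliseconds since 1970-01-01T00:00:00Z

(timestamp->epoch-millis "1970-01-01T00:00:01.000-00:00")
;; => 1000
```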

(deftest aggregate-page-views-test
  (testing "1 hour fixed aggregates grouped by user ID."
    (let [config {:endpoint "http://127.0.0.1:12000"}
          expected {:aggregates
                    {"hourly-page-views"
                     {:contents
                      {"sally"
                       [{:bounds ["2017-06-29T16:00:00.000-00:00"
                                  "2017-06-29T16:59:59.999-00:00"]
                         :value 1}

                        {:bounds ["2017-06-20T18:00:00.000-00:00"
                                  "2017-06-20T18:59:59.999-00:00"]
                         :value 1}]

                       "fred"
                       [{:bounds ["2017-06-28T16:00:00.000-00:00"
                                  "2017-06-28T16:59:59.999-00:00"]
                         :value 3}]}}}}
          result (simulate! config service records)]
      (is (:success? result))
      (is (= expected (select-keys (:result result) [:aggregates]))))))

Roaming returns a map keyed by group identifier (user ID), where each value is that user’s list of windowed aggregates. For our data set, Sally visited in two separate one-hour windows, once in each. All three of Fred’s page views fell in the same one-hour window. Notice that Sally’s last record is back-dated more than a week behind the others. This doesn’t confuse Pyroclast, because it bases its windowing on the event timestamp, not on the time at which Pyroclast first saw the record.
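The windowing arithmetic behind these results is simple enough to reproduce by hand. The sketch below assigns each record to a fixed one-hour window based on its own timestamp (event time), then counts records per user per window. It assumes the records have already been filtered to main-site page views, represents each window as an inclusive [start end] pair of epoch milliseconds, and uses hypothetical helper names; it models the semantics and is not Pyroclast’s implementation.

```clojure
(import '(java.time OffsetDateTime))

(def hour-ms (* 60 60 1000))

(defn epoch-millis
  "Parses an ISO-8601 timestamp with offset into epoch milliseconds."
  [^String ts]
  (-> (OffsetDateTime/parse ts) (.toInstant) (.toEpochMilli)))

(defn window-bounds
  "Inclusive [start end] of the fixed window of width-ms containing t."
  [t width-ms]
  (let [start (- t (mod t width-ms))]
    [start (+ start width-ms -1)]))

(defn hourly-counts
  "Counts records per user per fixed one-hour window. Windows come from
  each record's own :timestamp, so arrival order does not matter."
  [records]
  (reduce (fn [acc {:keys [user-id timestamp]}]
            (let [bounds (window-bounds (epoch-millis timestamp) hour-ms)]
              (update-in acc [user-id bounds] (fnil inc 0))))
          {}
          records))

;; The five main-site page views from our sample records.
(def page-views
  [{:user-id "sally" :timestamp "2017-06-29T16:13:36.696-00:00"}
   {:user-id "fred"  :timestamp "2017-06-28T16:02:21.696-00:00"}
   {:user-id "fred"  :timestamp "2017-06-28T16:04:21.696-00:00"}
   {:user-id "fred"  :timestamp "2017-06-28T16:08:21.696-00:00"}
   {:user-id "sally" :timestamp "2017-06-20T18:54:00.696-00:00"}])

(def counts (hourly-counts page-views))
;; Fred: one window (2017-06-28 16:00:00.000 to 16:59:59.999) with count 3.
;; Sally: two windows, on 2017-06-29 and back-dated 2017-06-20, count 1 each.
```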

Idiomatic usage

Roaming has been designed to keep each SDK thin, so every programming language can present the API in its most idiomatic form. In Clojure, that means functional composition. If we want to write several services that only consider page views on the main site, we can write a function that takes a service, applies the necessary steps, and returns the service. That function can then be reused across services.

;; Only main page visits.
(defn only-main-site-page-views [service]
  (-> service
      (filters/= :event :page-view)
      (filters/string-starts-with? [:context :url] "https://pyroclast.io")))

;; Roll up 1 hour intervals.
(defn hourly-page-views [service timestamp]
  (agg/aggregate-together
   service
   [{:name "hourly-page-views"
     :aggregate (agg/count (agg/fixed-windows-of 1 :hour timestamp))}]
   :user-id))

;; Shareable timestamp parser.
(defn parse-timestamp [service timestamp]
  (t/parse-datetime service timestamp "yyyy-MM-dd'T'HH:mm:ss.SSSZ"))

;; Reuse our functions.
(-> (s/new-service)
    (topic/input-topic "events")
    (only-main-site-page-views)
    (parse-timestamp :timestamp)
    (hourly-page-views :timestamp))

Exceptions and debugging

Sometimes you’re going to make mistakes. You might mistype a key name or reference a nonexistent field. When a record causes an exception, Roaming halts execution and returns the exception, the original record that triggered the failure, and the call site where the failure occurred.

{:success? false
 :reason "simulation-threw-exception"
 :offending-record
 {:user-id "532553"
  :event "page-view"
  :timestamp "not a timestamp!"
  :context {:url "https://pyroclast.io/console"}}
 :offending-function "io.pyroclast.task-bundles.parse-datetime/parse-datetime"
 :exception-cause "clojure.lang.ExceptionInfo: Invalid format: \"not a timestamp!\""
 :stack-trace
 ["org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:945)"
  "clj_time.format$parse.invokeStatic(format.clj:160)"
  "clj_time.format$parse.invoke(format.clj:156)"
  "io.pyroclast.task_bundles.parse_datetime$parse_datetime$fn__58062.invoke(parse_datetime.cljc:37)"
  "io.pyroclast.functions$update_keys.invokeStatic(functions.cljc:7)"
  "io.pyroclast.functions$update_keys.invoke(functions.cljc:6)"
  "io.pyroclast.task_bundles.parse_datetime$parse_datetime.invokeStatic(parse_datetime.cljc:33)"
  "io.pyroclast.task_bundles.parse_datetime$parse_datetime.invoke(parse_datetime.cljc:32)"]}
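The halt-and-report behavior described above can be modeled in a few lines of plain Clojure. This is a hypothetical sketch, not Roaming’s implementation: run-step and the sample parse-count step are illustrative names, and only the report keys are borrowed from the output shown above.

```clojure
;; Hypothetical model of halt-on-first-exception reporting.
(defn run-step
  "Applies f to each record in order. Stops at the first exception and
  returns a report naming the step and the record that triggered it."
  [step-name f records]
  (reduce (fn [acc record]
            (try
              (update acc :output-records conj (f record))
              (catch Exception e
                ;; reduced short-circuits the reduce: no later records run.
                (reduced {:success? false
                          :reason "simulation-threw-exception"
                          :offending-record record
                          :offending-function step-name
                          :exception-cause (str e)}))))
          {:success? true :output-records []}
          records))

(defn parse-count
  "Example step: converts a record's :count string into a long."
  [record]
  (update record :count #(Long/parseLong %)))

(def report
  (run-step "parse-count" parse-count
            [{:id 1 :count "10"} {:id 2 :count "not a number"} {:id 3 :count "7"}]))
;; (:success? report)         => false
;; (:offending-record report) => {:id 2 :count "not a number"}
```

Stopping at the first failure, rather than skipping bad records, keeps the report focused on a single record and a single call site.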

Go hosted & distributed when you’re ready

Roaming is great for local development. Services run in a deterministic, single-threaded environment optimized for fast launch times. At some point, you’ll want to run your service in a production-grade environment. Of course, we make this easy. Using the SDK, you can install your service on Pyroclast proper, where it runs in fully distributed mode. Your service needs no changes to make this move. Simply instantiate the client with your API keys and deploy.

(create-service! config "page-views-roll-up" service)

(deploy! config "page-views-roll-up" "production")

Try it today

Roaming is available in our private beta program. Sign up to access the beta today.