Metamorphic: real-time complex event processing for Clojure

Michael Drogalis - 09/17/2017

Distributed Masonry is pleased to open source Metamorphic, a Clojure(Script) complex event processing library. Complex event processing (CEP) is a technique for analyzing an input stream for sequences of events that match patterns. Patterns are expressed as a series of rules, each with properties that describe its position relative to the others. In this post, we’ll explore why we built Metamorphic, what’s special about it, and what’s coming closely behind it in the next 4 weeks. (Impatient? Skip to some examples!)

Motivation

These days at Distributed Masonry, we work on event streaming problems 24/7. Of all the challenges we face, one that keeps recurring is some form of pattern matching over input streams. Pattern matching over event streams is usually discussed in the context of anomaly detection, that is, the ability to detect in real time when something is faulty. Applied well, though, pattern matching can extend far beyond looking for problems in a sea of data. Given a rich event stream to work with, it lets you ask more meaningful questions about your data set and find opportunities in that same sea.

Despite pattern matching’s herculean powers, it remains a curiously difficult problem to solve. To detect successful matches, every in-progress match needs to be scanned and updated when a new event arrives. Each new event may also be the beginning of a fresh match. The elephant in the room is that retaining all the data needed to find patterns over time can consume a prohibitive amount of memory, and the processing power needed to find matches can climb sharply as that state grows. Metamorphic is a robust solution to these problems, built on a strong foundation of academic research.
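
To make that cost concrete, here is a deliberately naive sketch in plain Clojure. It is hypothetical and is not how Metamorphic works internally. It keeps every in-progress match as a vector of consumed events, tries to advance each one on every arrival, and lets any event that satisfies the first stage start a fresh match. The stage predicates, `step`, and `run` are illustrative names only, and there is no time window, so partial matches that never complete are never discarded.

```clojure
(ns example.naive-partials)

;; Hypothetical three-stage pattern: :a, then :b, then :c.
;; Unrelated events between stages are skipped.
(def stages [#(= % :a) #(= % :b) #(= % :c)])

(defn step
  "Advances every in-progress match by one event. Each partial match is a
  vector of the events it has consumed; its length is the index of the
  stage it is waiting on next."
  [{:keys [partials matches]} event]
  (let [;; Every in-progress match must be visited on every arrival.
        advanced (for [p partials]
                   (if ((nth stages (count p)) event)
                     (conj p event)
                     p))
        ;; The same event may also begin a brand new match.
        started  (if ((first stages) event) [[event]] [])
        all      (concat advanced started)
        done?    #(= (count %) (count stages))]
    {:partials (vec (remove done? all))
     :matches  (into matches (filter done? all))}))

(defn run [events]
  (reduce step {:partials [] :matches []} events))

(run [:a :x :b :y :c])
;; => {:partials [], :matches [[:a :b :c]]}
```

Feeding it `(repeat 1000 :a)` leaves 1,000 partial matches in memory, each of which every later event has to visit. Keeping that growth in check is exactly what a purpose-built CEP data structure is for.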

Oh yeah? Let’s see it

For a simple example, we’ll work through a problem we recently solved in the gaming industry. Given a stream of events from a massively multiplayer online game, detect players who complete a particular series of quests within 1 hour of starting the challenge. Players who complete the challenge are immediately awarded 1 hour of a special in-game skill. Let’s step back and examine why this is a hard problem to solve, then we’ll express the solution with Metamorphic.

If you’re not familiar with CEP, your first instinct would probably be to store all events in a database. Upon the arrival of every new event, you’d write the event to the database, then select all relevant events that could constitute a match. This select would need to scan for every event belonging to the player, further constrained to events that occurred within the last hour. When a match occurs, you’d send the award to the player, then mark the events as “completed” so that they’re not picked up again for another match.
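
A minimal in-memory sketch of that approach follows, with a vector standing in for the database table. The `:event/id`, `:player/id`, and `:event/time-ms` keys, as well as every function name, are assumptions made for illustration. The `:scanned` counter records how many rows each arrival reads.

```clojure
(ns example.naive-rescan)

;; A vector stands in for the database table. Events are assumed to carry
;; hypothetical :event/id, :player/id and :event/time-ms (epoch ms) keys.
(def one-hour-ms (* 60 60 1000))

(def stage-preds
  [#(= (:event/action %) :start-trio-challenge)
   #(= (:quest/id %) :deep-atlantis)
   #(= (:quest/id %) :lost-pearl)
   #(= (:quest/id %) :red-stone)])

(defn find-match
  "Returns rows satisfying the stages in order (skipping unrelated rows),
  or nil if the rows contain no match."
  [rows]
  (loop [preds stage-preds, rows rows, acc []]
    (cond
      (empty? preds) acc
      (empty? rows) nil
      ((first preds) (first rows)) (recur (rest preds) (rest rows) (conj acc (first rows)))
      :else (recur preds (rest rows) acc))))

(defn on-event
  "Inserts `event`, re-selects the player's un-completed rows from the last
  hour, and marks any matched rows as completed."
  [{:keys [table scanned matches]} event]
  (let [table (conj table event)
        now   (:event/time-ms event)
        ;; The "select": a full pass over the table on every arrival.
        rows  (filter #(and (= (:player/id %) (:player/id event))
                            (not (:completed? %))
                            (<= (- now (:event/time-ms %)) one-hour-ms))
                      table)
        match (find-match rows)
        ids   (set (map :event/id match))]
    {:table   (if match
                (mapv #(if (ids (:event/id %)) (assoc % :completed? true) %) table)
                table)
     :scanned (+ scanned (count table))
     :matches (if match (conj matches match) matches)}))

(defn run [events]
  (reduce on-event {:table [] :scanned 0 :matches []} events))
```

Because every arrival re-reads the whole table, n events cost 1 + 2 + ... + n = n(n + 1)/2 row reads. For 1,000 events that is already 500,500 reads, before any write contention is considered.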

This solution will work for a little while if your event volume is low enough. Eventually, however, contention on the database becomes high: each of these queries grows more expensive as the table grows, and they end up in a cage match with the other writers trying to put more events into the table. This is a performance nightmare.

Let’s look at how we solve this problem with Metamorphic instead. Metamorphic uses a specialized data structure to minimize memory overhead and avoid performance penalties that a full scan would incur.

(ns metamorphic.quest-analysis
  (:require [metamorphic.api :as m]
            [metamorphic.runtime :as rt]))


;; First, define a set of predicates - one for each stage of the pattern match.
(defn start-challenge? [event history pattern-sequence pattern]
  (= (:event/action event) :start-trio-challenge))

(defn complete-deep-atlantis-quest? [event history pattern-sequence pattern]
  (and (= (:event/action event) :complete-quest)
       (= (:quest/id event) :deep-atlantis)))

(defn complete-lost-pearl-quest? [event history pattern-sequence pattern]
  (and (= (:event/action event) :complete-quest)
       (= (:quest/id event) :lost-pearl)))

(defn complete-red-stone-quest? [event history pattern-sequence pattern]
  (and (= (:event/action event) :complete-quest)
       (= (:quest/id event) :red-stone)))


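;; Next, assemble the predicates into a sequence, describing their positional
;; constraints with `next`, `followed-by`, or `followed-by-any`. Here we use
;; `followed-by`, which lets unrelated events (like item purchases) occur
;; between stages. We also force this pattern match to occur within 1 hour
;; (3,600,000 ms) of its starting time.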
(def pattern-sequence
  (-> (m/new-pattern-sequence "Trio quest timed challenge" {:within 3600000})
      (m/begin "start-challenge" start-challenge?)
      (m/followed-by "stage-1" complete-deep-atlantis-quest?)
      (m/followed-by "stage-2" complete-lost-pearl-quest?)
      (m/followed-by "stage-3" complete-red-stone-quest?)))


;; For this example, we'll push all of our events in via a vector.
;; For a real workload, we'd read them from core.async or another streaming
;; storage service.
(def events
  [{:event/action :log-in
    :event/timestamp #inst "2017-09-16T12:30:45"}

   {:event/action :start-trio-challenge
    :event/timestamp #inst "2017-09-16T12:32:13"}

   {:event/action :buy-item
    :event/timestamp #inst "2017-09-16T12:38:34"
    :item/name "sword"}

   {:event/action :complete-quest
    :event/timestamp #inst "2017-09-16T12:42:56"
    :quest/id :deep-atlantis}

   {:event/action :sell-item
    :event/timestamp #inst "2017-09-16T12:45:41"
    :item/name "sword"}

   {:event/action :complete-quest
    :event/timestamp #inst "2017-09-16T12:47:01"
    :quest/id :lost-pearl}

   {:event/action :complete-quest
    :event/timestamp #inst "2017-09-16T12:53:20"
    :quest/id :red-stone}])


;; Compile a runtime for each pattern sequence. The runtime
;; keeps track of the progression of matches as events spool in.
(def runtime (rt/initialize-runtime pattern-sequence))


;; Tick across each event, passing in the runtime and receiving a
;; new one after each event is evaluated. Time is defined by the user
;; (here, each event's own timestamp), not the wall clock, which allows
;; events to be evaluated retroactively by the time they occurred.
(def advanced-runtime
  (reduce
   (fn [next-rt event]
     (let [opts {:event-time (.getTime (:event/timestamp event))}]
       (rt/evaluate-event next-rt event opts)))
   runtime
   events))


;; Completed matches accumulate in a vector under the :matches key.
(def matches (:matches advanced-runtime))

Matches accumulate under the :matches key, which you can read after every iteration of the runtime. In this example, the pattern matches while skipping the irrelevant events (the log-in and the item purchase and sale), yielding:

[({:event/action :start-trio-challenge
   :event/timestamp #inst "2017-09-16T12:32:13.000-00:00"}

  {:event/action :complete-quest
   :event/timestamp #inst "2017-09-16T12:42:56.000-00:00"
   :quest/id :deep-atlantis}

  {:event/action :complete-quest
   :event/timestamp #inst "2017-09-16T12:47:01.000-00:00"
   :quest/id :lost-pearl}

  {:event/action :complete-quest
   :event/timestamp #inst "2017-09-16T12:53:20.000-00:00"
   :quest/id :red-stone})]
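
The `:within 3600000` option is in milliseconds (60 * 60 * 1000 = 3,600,000), and since the runtime is given each event's own timestamp as `:event-time`, the window is measured in event time rather than wall-clock time. As a quick sanity check on the result above, this small helper (hypothetical, not part of Metamorphic) measures the span of the match from its first event to its last:

```clojure
(ns example.window-check)

;; The :within value from the pattern sequence: 1 hour in milliseconds.
(def within-ms (* 60 60 1000))

(defn elapsed-ms
  "Event-time span of a match, from its first event to its last, in ms.
  A hypothetical helper, not part of Metamorphic."
  [match]
  (- (.getTime (:event/timestamp (last match)))
     (.getTime (:event/timestamp (first match)))))

;; The match shown above.
(def trio-match
  [{:event/action :start-trio-challenge :event/timestamp #inst "2017-09-16T12:32:13"}
   {:event/action :complete-quest :event/timestamp #inst "2017-09-16T12:42:56" :quest/id :deep-atlantis}
   {:event/action :complete-quest :event/timestamp #inst "2017-09-16T12:47:01" :quest/id :lost-pearl}
   {:event/action :complete-quest :event/timestamp #inst "2017-09-16T12:53:20" :quest/id :red-stone}])

(elapsed-ms trio-match)               ;=> 1267000
(< (elapsed-ms trio-match) within-ms) ;=> true
```

The challenge was completed in 21 minutes 7 seconds (1,267,000 ms), well inside the one-hour window.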

This example is simplified from its original use. One notable alteration we made for the sake of readability concerns ownership: here, we assume all events belong to the same player. In a production setting, we strongly isolate match instances across logical tenancies, using clojure.core/group-by to separate them.
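
The per-player isolation described above can be sketched as follows. Both functions are generic over an initial state `init` and a step function `step`. With Metamorphic, `init` would presumably be `(rt/initialize-runtime pattern-sequence)` and `step` a function wrapping `rt/evaluate-event` with the `:event-time` option, assuming (as the `reduce` above suggests) that the runtime is an immutable value that can serve as a shared starting point. The `:player/id` key is hypothetical, since the example events carry no owner.

```clojure
(ns example.per-player)

;; :player/id is a hypothetical ownership key; the example events lack one.

(defn run-per-player
  "Batch variant: partition a finite collection of events by player with
  group-by, then reduce each player's events through its own state."
  [init step events]
  (into {}
        (map (fn [[player-id player-events]]
               [player-id (reduce step init player-events)]))
        (group-by :player/id events)))

(defn step-per-player
  "Streaming variant: keep a map of player-id -> state and advance only the
  state that owns the incoming event, starting from `init` for new players."
  [init step states event]
  (update states (:player/id event) (fnil step init) event))

;; Usage with a stand-in step function that just counts events per player:
(def events [{:player/id "a"} {:player/id "b"} {:player/id "a"}])
(def count-step (fn [n _event] (inc n)))

(run-per-player 0 count-step events)                   ;=> {"a" 2, "b" 1}
(reduce (partial step-per-player 0 count-step) {} events) ;=> {"a" 2, "b" 1}
```

The `group-by` version suits a finite batch. For an unbounded stream, the second function keeps one state per player in a map and advances only the one that owns each incoming event, so one player's events never touch another player's matches.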

Metamorphic incrementally computes and updates existing pattern matches after each event, in a memory- and compute-efficient manner.

Metamorphic runs anywhere

Metamorphic has been specially designed to run as a standalone library, in sharp contrast to traditional implementations of CEP. Typically, CEP libraries are tightly integrated with a stream processor or database and are difficult or impossible to use without one. Metamorphic has no external dependencies and runs readily in ClojureScript.

At this point, you’re probably wondering how Metamorphic can be useful for high-volume event streams if it’s only a library. While Metamorphic doesn’t require an external stream processor, we have, of course, designed it to run on Onyx to pattern match across high-volume streams. onyx-cep has actually already gone public, though we’ll make a separate announcement later in the month once its documentation is further along.

Naturally, pattern matching is already a first-class construct in Pyroclast, where its power can be seamlessly combined with continuous queries and streaming joins.

Next steps

We’re excited to put Metamorphic out for public consumption. Following closely behind it will be a JavaScript port, metamorphic-js, as well as a set of visualization tools for live-debugging pattern matches. We also expect to have onyx-cep tuned for public use in the next 2 weeks.

We hope you find Metamorphic useful. Thanks for reading!