Freshet - CQL based Clojure DSL for Streaming Queries (Draft)
This blog post is still a draft.
Interest on continuous queries on streams of data has increased over the last couple of years due to the need of deriving actionable information as soon as possible to be competitive in the fast moving world. Existing Big Data technologies designed to handle batch processing couldn’t handle today’s near real-time requirements and distributed stream processing systems like Yahoo’s S4, Twitter’s Storm, Spark Streaming and LinkedIn’s Samza were introduced into the fast growing Big Data eco-system to tackle real-time requirements. These systems are robust, fault tolerant and scalable to handle massive volumes of streaming data, but lack first class support for SQL like querying capabilities. All of these frameworks provide high-level programming APIs in JVM compatible languages.
In the golden era of stream processing research, a lot of work has been done on query engines and languages for stream processing. But we have yet to adapt these work on streaming query languages to above mentioned distributed stream processing systems widely in use today.
Also with the transition from batch to real-time Big Data, different architectures were proposed to handle the integration of batch and real-time systems (Lambda Archiecture) as well as to revolutionized the way we built today’s systems (Kappa Architecture). Even though there aren’t any standards (like SQL and Relational Algebra for DBs) on implementing these architectures, Summingbird implements Lambda Architecture based on monoids. Also there are other ways to implement Lambda Architecture such as Spark’s Scala API for streaming and batch processing. Even though it is possible to implement Kappa Architecture manually using above mentioned frameworks, there aren’t any high-level frameworks like Summingbird for this purpose. Freshet tries to fill this gap by adapting continuous query semantics and execution planning methods discussed by Arasu et. al. in their paper The CQL Continuous Query Language: Semantic Foundations and Query Execution to implement Kappa Architecture on top of Apache Samza.
Before going into details about Freshet , it’s important to discuss Kappa Architecture and CQL. These are the fundamental ideas and technologies which Freshet is based on.
Kappa Architecture #
Kappa Architecture which has the notion of – Everything Is A Stream – is proposed as an alternative to Lambda Architecture. In the link above, the author argues that, stream processing is a generalization of data-flow DAGs with support for check-pointing intermediate results and continuous output to the end user. And he emphasizes that we can actually use current distributed stream processing framework like Aapache Samza combine with message queue like Kafka, which retains ordered data, to implement use cases handled by Lambda Architecture. Reprocessing is accomplished by replaying the stream through new versions of stream processing code or for a completely new algorithm.
CQL - Continuous Query Language #
CQL - aka Continuous Query Language - is a SQL- based declarative language for expressing queries over data streams and time varying relations. CQL’s abstract semantics are based on two data types - streams and relations - and three types of operations - stream-to-relations, relation-to-relation and relation-to-stream. In CQL, stream is a infinite bag of tuples and relation is a mapping from time Τ to a finite but unbounded bag of tuples. This special variant of the standard relation is called instantaneous relation in the context of CQL, because a relation R in CQL represent a finite but unbounded bag of elements at a given time instance τ. CQL takes advantage of well understood relational semantics and keeps the language simple and queries compact by introducing minimal changes to SQL.
- Window specification derived from SQL-99 to transform streams to relations
- Three new operators to transform time varying relations into streams.
CQL Sample - Filtering A Stream #
SELECT Rstream(*)
FROM PosSpeedStr [Now]
WHERE speed > 65
CQL uses SQL for relation-to-relation transformations, but relations in CQL are different from relations in SQL. CQL relations vary with time. CQL introduces two new concepts: insert/delete streams, which encode both streams and relations in a unified way, and synopses, which contain state (e.g. a counter or buffer of messages) for an operator.
Another important thing is the fact that we can use traditional database relations in CQL queries which enables us to do things like stream-relation joins common in real world applications.
Freshet #
Let’s come back to Freshet . Freshet is a first step towards a complete implementation of Kappa Architecture based on CQL to support continuous queries. Freshet implements a subset (select, windowing, aggregates) of CQL on top of Apache Samza. Freshet implements RStream and IStream relation-to-stream operators, tuple and time based sliding windows to convert streams to relations and basic relation to relation operators for implementing business logic. Following CQL, Freshet uses insert/delete stream to model instantaneous relations.
As shown in above figure, Freshet is built out of five main logical components.
- Query DSL: Implemented as a Clojure DSL and used to express CQL queries against streams. Queries expressed in Freshet DSL will get compiled in to streaming relation algebra model and then will get converted into an execution plan that consists of a set of operators written as Samza stream tasks connected together as a DAG via Kafka queues.
- Query Compiler: Compile SQL model generated from DSL to intermediate representation which can be converted in to execution plan.
- Execution Planner: Generate execution plans (Samza jobs connected via input, intermediate and output streams to form a DAG) based on intermediate representation and current status of the Freshet cluster.
- Scheduler: Does the actual scheduling of Samza Jobs.
- Query Operators: Samza stream tasks. Implement CQL operators like window, select, aggregate, and view generation operators like rstream, istream. These operators, connected via intermediate streams, perform stream processing according to the query express in Freshet DSL.
Freshet DSL #
Freshet Clojure DSL is inspired by Korma Clojure DSL for SQL. Freshet DSL follows the same style as Korma DSL. There are two main constructs in the current Freshet DSL. defstream and select queries. These are two forms I am planning to support in the initial version. Other constructs will be added later.
defstream #
Used to define a new stream. Streams defined using defstream represent Kafka topics in the current implementation. New modifiers will be added to defstream in future to support different input sources. The most important modifier for defstream is the stream-fields modifier, which modifies the stream definition with a field name/type mapping. Clojure keywords are used to specify field names and these keywords will get converted to strings internally. There are pre-defined keywords for specifying types like :string, :integer. Below is how we define a wikipedia activity stream to use in stream queries.
(defstream wikipedia-activity
(stream-fields [:title :string
:user :string
:diff-bytes :integer
:diff-url :string
:unparsed-flags :string
:summary :string
:is-minor :boolean
:is-unpatrolled :boolean
:is-special :boolean
:is-talk :boolean
:is-new :boolean
:is-bot-edit :boolean
:timestamp :long])
(ts :timestamp))
select #
Used to define select queries over streams. Stream filtering using where and aggregators will be supported in the initial version and joins will be added next. Below is a sample select query which filters a stream.
(select wikipedia-activity
(modifiers :istream)
(window (unbounded))
(where (> :diff-bytes 100)))
Query Execution #
Freshet follows the same execution semantics as CQL and flow of execution is shown below.
Window operator converts input stream into a time varying relation and the time varying relation is encoded as insert/delete stream to make it easy to implement relational operators as streaming operators. The relational part of the query is converted to a DAG of Samza operators, which will operate on insert/delete streams according to the query definition. Finally the stream materializer will materialize the stream according to specification of the original query.
Why Samza #
Freshet chose Samza for its initial implementation mainly because
- Samza is fully integrated with Kafka
- Samza supports and encourages stateful stream processing
- Samza’s local storage is really useful for implementing CQL synopses
Samza’s property based job configuration is the only limitation when it comes to Freshet. A Storm-like topology builder would have come in handy for Freshet-like layers on top of Samza.
Current Status and Future Work #
I am working on bridging the Clojure DSL and the CQL operator layer (Samza stream tasks) currently. I plan to do the initial release within couple of weeks. After the initial release of Freshet, I am planning to contribute to Apache Samza’s Stream Query implementation, which is also based on CQL. Once finished, Freshet can be updated to use Apache Samza’s CQL operators directly, rather than having its own.