All I want for Christmas is…a new real-time stream processing technology:
ClickHouse®!
I wonder what happens if I stack these blocks?! (Source: Adobe Stock)
My first Christmas Special blog was one of my first ever Apache Kafka blogs, An Apache Kafka® Christmas Tree Light Simulation. Around this time last year, I wrote my second Christmas Special blog, An Apache Kafka® and RisingWave Stream Processing Christmas Special.
This blog is also a Christmas Special—a sequel to last year’s episode!
Introduction
I’ve been interested in the problems of temporal reasoning, stream and event processing, ML over streaming data, etc., for a while now. (In fact, thinking back, I realized it’s exactly 40 years since I finished my MSc research into autonomous robot machine learning in a toy blocks world.)
One of the features of this toy blocks world was that the robot had to learn causal laws, over time, about the interaction between itself (a simulated robot arm and simulated vision) and a simulated blocks world (with simulated gravity, a center of gravity for block stacking, falling blocks, etc.).
I used a combination of Prolog (a logic programming language) and temporal logic (Allen’s temporal logic) to enable temporal reasoning—i.e. what events always/never occurred before, at the same time as, or after other events, etc.
Ten years later I snuck temporal logic into the Australian Broadcasting Corporation’s new D-Cart system (a digital radio studio); I designed and implemented extensions to the UNIX-like file system to enable storage, search, and retrieval of multiple media types alongside the audio data.
A temporal file system made sense as it could handle finding transcriptions that related to a portion of audio and other tricks that I’m pretty sure the broadcasters (or probably even the other developers) didn’t know existed under the console.
As usual with such old computer technology, there isn’t much trace of it these days. There is a jogger wheel in the Powerhouse Museum, and this Electronics Australia issue reports that each D-Cart had 12×1.2GB disk drives (compare that to the first-generation iPod from 2001, with 5GB and capacity for 1,000 songs)—I do recall disk space being at a premium!
Time to fast forward a few years (ok, decades) and see what the state of the art is for real-time temporal data processing now.
Santa’s Elves’ toy packing problem
(Source: Adobe Stock)
Here’s a recap of the predicament that Santa got himself into last year:
Santa’s Elves are normally very productive, but recently something has gone amiss in Santa’s workshop! The “toy” Elves and the “box” Elves have stopped talking to each other and have set up 2 separate production lines.
Instead of a single streamlined production system with the toys and boxes being matched on 1 conveyor, there are now 2 conveyor belts. The “toy” conveyor is run by one group of Elves, and they put random toys on for packing. The “box” conveyor is run by a different group of Elves, and they just toss random boxes onto it.
Why does this matter?
Well, each toy has a unique “toyType” and requires a specific box matching that type for it to be packed in—each toyType has a custom box matching its size, shape, instructions, etc., so you can’t use a random box for a given toy.
The packing Elves therefore need some help to match toys and boxes. We assume they pick a toy off the toy conveyor and have to find a matching box from the box conveyor as fast as possible, then place the toy in the box on another conveyor belt which takes it to Santa’s sack for delivery.
But time is critical! Because of the speed of the process, there’s a maximum grace period of 1 minute for each packing Elf to find a matching box; otherwise, the unmatched toys and unused boxes fall off the end of the conveyor belts, an alarm goes off, and the whole process grinds to an almighty halt!
Last year we helped Santa and Elves out by building a couple of demonstration applications to help match toys and boxes across real-time streams:
- An Apache Kafka® Streams solution: using Kafka + Kafka Streams DSL
- A RisingWave solution: using Kafka + RisingWave
Kafka Streams DSL is the approach I’ve used most in the past, and I’m way more proficient with it than with SQL-based approaches. RisingWave is an open source stream processing database that relies on materialized views and cloud native storage for speed and scalability, and is programmed with a SQL variant.
So, what’s new this year? NetApp Instaclustr recently launched a brand new service: Instaclustr managed ClickHouse®.
I know nothing (much) about ClickHouse, but after reading this introduction by my DevRel colleague Hichem Kenniche, I decided it was worth trying out—it’s optimized for real-time analytics over columns, using SQL, and has multiple ways of integrating with Kafka (including Kafka® Connect and the Kafka table engine).
But first, a caveat: for this blog I will not be integrating it with Kafka; the data will be static, and the queries will run on demand rather than continuously, as would be expected in a stream processing pipeline.
Why did I start with this approach? Actually, I did try to jump in at the deep end and build a real-time Kafka+ClickHouse pipeline, but realized it was too tricky given my lack of experience with both ClickHouse and SQL—I needed a gentler introduction.
The easiest way to try out ClickHouse is to sign up for a no obligation (and no credit card) 30-day free trial of our managed services. The support documentation on creating and configuring a ClickHouse cluster is here.
Following the documentation, I easily spun up a developer-sized ClickHouse cluster on Instaclustr on AWS in a few minutes with the default settings. Further documentation on using ClickHouse is available, and examples and details are available on the Instaclustr console.
The easiest way to connect is with a web browser, using the URL, username, and password available from the cluster console; alternatively, you can connect with the clickhouse client (see the example after the download steps below).
You can download ClickHouse and try it yourself (and I did, and it works). It’s open source, and you will need to do this to get the ClickHouse client anyway:
- Download: curl https://clickhouse.com/ | sh
- Start the server: ./clickhouse server
- Start the client: ./clickhouse client
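For a managed cluster, connecting with the client looks something like the following (the hostname, user, and password below are placeholders; use the values from your cluster’s console, and check the console for the actual port, as 9440 is just the conventional secure native protocol port):

./clickhouse client --host <cluster-hostname> --port 9440 --secure --user <user> --password '<password>'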
Step 1: Create the toys and boxes conveyors
First, we need to create two tables, representing the two conveyor belts or input streams we used in the problem last year. Note that I don’t know SQL and asked the very helpful ChatGPT for assistance with the more complex ClickHouse SQL—it’s pretty good!
Let’s create the toys and boxes tables:
CREATE TABLE toys
(
    `timestamp` DateTime,
    `toyType` UInt32,
    `toyId` UInt32
)
ENGINE = MergeTree
ORDER BY timestamp

CREATE TABLE boxes
(
    `timestamp` DateTime,
    `toyType` UInt32,
    `boxId` UInt32
)
ENGINE = MergeTree
ORDER BY timestamp
Each toy/box has a timestamp (when it was put on the conveyor belt), the type of toy (toyType) that it either is (for a toy) or can hold (for a box; we imagine toys have different sizes, shapes, etc.), and a unique identifier (toyId or boxId). The toys and boxes will be matched over the toyType id.
Merging trees and engines?!
The merging of trees and an engine in a wood-burning locomotive. (Source: Adobe Stock)
Here’s the first mystery: what is an ENGINE and MergeTree?
ClickHouse has multiple table engines for different purposes: MergeTree is a family of engines (including one simply called MergeTree), and there are also Log engines, Integration engines, and Special engines! MergeTree is evidently good for speed and large amounts of data.
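If you’re curious which engines your server supports, you can ask it directly; the built-in system.table_engines table lists them (a quick check I tried, not something you need for this exercise):

-- list the table engines available on this server
SELECT name
FROM system.table_engines
ORDER BY name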
Step 2: Populate the toy and box production lines
Given that I didn’t want to use ClickHouse with Kafka just yet, I was a bit worried about how I was going to populate the toys and boxes tables. It was pretty easy to use SQL to do this, and ClickHouse has support for lots of random functions and data generation examples. I used the following SQL to generate 2 minutes’ worth of random data:
INSERT INTO toys
SELECT now() - floor(randUniform(0, 120)), floor(randUniform(0, 100)), number
FROM numbers(300)

INSERT INTO boxes
SELECT now() - floor(randUniform(0, 120)), floor(randUniform(0, 100)), number
FROM numbers(300)
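As a quick sanity check (my own addition, not part of the original walkthrough), you can confirm how many rows landed and their time range; run the same query against boxes too:

-- check row count and timestamp range of the generated data
SELECT count() AS total, min(timestamp) AS earliest, max(timestamp) AS latest
FROM toys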
It’s a match! (Source: Adobe Stock)
Step 3: Find all matching toys and boxes
Given that the SQL was getting harder, it was time to turn to the guru (ChatGPT). Here’s what I said:
I have two tables in ClickHouse, toys with columns timestamp (DateTime), toyType (UInt32) and toyId (UInt32), and boxes with columns timestamp (DateTime), toyType (UInt32) and boxId (UInt32). Write me a query to return toys and boxes with the same toyType.
As I expected, the answer involved a “JOIN” (one reason I needed help):
SELECT
    t.timestamp AS toyTimestamp,
    t.toyType,
    t.toyId,
    b.timestamp AS boxTimestamp,
    b.boxId
FROM toys t
JOIN boxes b ON t.toyType = b.toyType
This query performs an inner join on the toyType column, ensuring that only rows with matching toyType values from both tables are returned.
This query worked perfectly with basically the same functionality I obtained last year from Kafka Streams and RisingWave. The output is a table with columns: row, toyTimestamp, toyType, toyId, boxTimestamp and boxId:
│ 2024-12-05 11:25:29 │ 41 │ 89 │ 2024-12-05 11:26:39 │ 121 │
│ 2024-12-05 11:25:29 │ 41 │ 89 │ 2024-12-05 11:26:43 │ 143 │
From this you can tell that it’s working ok: for each toy(Id) there are some matching boxes, along with extra information such as the toyType they match on and the timestamps. But we have more than one match…
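To quantify just how many candidate boxes each toy attracts, here’s a quick aggregation (a sketch of mine, not part of last year’s solutions):

-- top 5 toys by number of candidate boxes
SELECT t.toyId, count() AS candidateBoxes
FROM toys t
JOIN boxes b ON t.toyType = b.toyType
GROUP BY t.toyId
ORDER BY candidateBoxes DESC
LIMIT 5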
(Source: Adobe Stock)
Step 4: Only find matches within 60 seconds
So far, I had ignored the 60 second time constraint that was present in the original problem (boxes and toys start falling off the conveyors if the Elves can’t find a match within 60s), so next I asked ChatGPT to add a 60s time limit for considering potential matches:
SELECT
    t.timestamp AS toyTimestamp,
    t.toyType,
    t.toyId,
    b.timestamp AS boxTimestamp,
    b.boxId
FROM toys t
JOIN boxes b
    ON t.toyType = b.toyType
    AND ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60
This worked as expected and reduced the number of returned matches.
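If you’d rather not eyeball the rows, a one-off comparison using ClickHouse’s countIf (my own sketch) shows the reduction directly:

-- compare unconstrained vs time-constrained match counts
SELECT
    count() AS allMatches,
    countIf(ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60) AS matchesWithin60s
FROM toys t
JOIN boxes b ON t.toyType = b.toyType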
Step 5: At most one match per toy
However, maybe a better solution is to only produce at most one match per toy per 60s period, so I asked ChatGPT to refine it again and also report the time difference:
WITH RankedMatches AS
(
    SELECT
        t.timestamp AS toyTimestamp,
        t.toyType,
        t.toyId,
        b.timestamp AS boxTimestamp,
        b.boxId,
        ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) AS timeDifferenceInSeconds,
        ROW_NUMBER() OVER (PARTITION BY t.toyType
            ORDER BY ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp))) AS rowNum
    FROM toys t
    JOIN boxes b
        ON t.toyType = b.toyType
        AND ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60
)
SELECT
    toyTimestamp,
    toyType,
    toyId,
    boxTimestamp,
    boxId,
    timeDifferenceInSeconds
FROM RankedMatches
WHERE rowNum = 1
This query uses a Common Table Expression (CTE) named RankedMatches to assign a row number to each match, partitioned by toyType and ordered by the time difference. The outer query then filters to keep only the first row (rowNum = 1) for each toyType. (Note that because the partition is by toyType rather than toyId, this strictly gives at most one match per toyType; partitioning by t.toyId instead would give one match per individual toy.)
Here are the first 2 rows of the result; the last column is the time difference in seconds:
│ 2024-12-05 11:26:19 │ 16 │ 221 │ 2024-12-05 11:26:13 │ 21 │ 6 │
│ 2024-12-05 11:27:05 │ 37 │ 14 │ 2024-12-05 11:27:10 │ 169 │ 5 │
Step 6: Run the query “now”
I haven’t fully addressed the question of when/how often the query is run, so as an interim experiment I decided to modify it to run “now”, so that only the last 60s of data is considered for matches:
WITH RecentMatches AS
(
    SELECT
        t.timestamp AS toyTimestamp,
        t.toyType,
        t.toyId,
        b.timestamp AS boxTimestamp,
        b.boxId,
        ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) AS timeDifferenceInSeconds,
        ROW_NUMBER() OVER (PARTITION BY t.toyType
            ORDER BY ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp))) AS rowNum
    FROM toys t
    JOIN boxes b
        ON t.toyType = b.toyType
        AND ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60
    WHERE t.timestamp >= now() - INTERVAL 60 SECOND
)
SELECT
    toyTimestamp,
    toyType,
    toyId,
    boxTimestamp,
    boxId,
    timeDifferenceInSeconds
FROM RecentMatches
WHERE rowNum = 1
This query does the following:
- The RecentMatches CTE filters the toys table to include only rows from the last 60 seconds.
- It joins the toys and boxes tables on toyType and ensures the timestamps are within 60 seconds of each other.
- The ROW_NUMBER() function assigns a unique row number to each match, partitioned by toyType and ordered by the time difference.
- The outer query filters to keep only the first match (rowNum = 1) for each toyType.
This should give you at most one match per toyType within the last 60 seconds.
Repopulating the toys and boxes data and running this query repeatedly demonstrates that it works—the number of matches starts high and drops to zero once 60s has elapsed.
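A compact way to watch the decay (a stripped-down variant I used for checking, not the full query) is to repeatedly run just the count of recent matches:

-- count matches involving toys from the last 60 seconds
SELECT count() AS recentMatches
FROM toys t
JOIN boxes b
    ON t.toyType = b.toyType
    AND ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60
WHERE t.timestamp >= now() - INTERVAL 60 SECOND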
Echidnas rolled up as a ball can easily tumble (Source: Adobe Stock)
Step 7: Tumbling time windows
So far, the way I’ve handled (or ChatGPT has handled) the relative time calculation is just using math. I wondered if, like Kafka Streams and RisingWave, ClickHouse has Time Window Functions?
It does! It has tumble and hop.
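Before wiring them into the matching query, it’s worth seeing what these functions return on their own. Assuming the signatures from the ClickHouse time window function docs, this shows the tumbling and hopping window bounds containing the current time:

-- window bounds for the current time:
-- tumble takes (time, window size); hop takes (time, slide, window size)
SELECT
    now() AS eventTime,
    tumbleStart(now(), INTERVAL 60 SECOND) AS tumbleWindowStart,
    tumbleEnd(now(), INTERVAL 60 SECOND) AS tumbleWindowEnd,
    hopStart(now(), INTERVAL 30 SECOND, INTERVAL 60 SECOND) AS hopWindowStart,
    hopEnd(now(), INTERVAL 30 SECOND, INTERVAL 60 SECOND) AS hopWindowEnd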
So, I asked ChatGPT to change the SQL to use these:
WITH WindowedMatches AS
(
    SELECT
        t.timestamp AS toyTimestamp,
        t.toyType,
        t.toyId,
        b.timestamp AS boxTimestamp,
        b.boxId,
        ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) AS timeDifferenceInSeconds,
        ROW_NUMBER() OVER (PARTITION BY t.toyType,
            tumble(t.timestamp, INTERVAL 60 SECOND)
            ORDER BY ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp))) AS rowNum
    FROM toys t
    JOIN boxes b
        ON t.toyType = b.toyType
        AND ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60
)
SELECT
    toyTimestamp,
    toyType,
    toyId,
    boxTimestamp,
    boxId,
    timeDifferenceInSeconds
FROM WindowedMatches
WHERE rowNum = 1
This query does the following:
- The WindowedMatches CTE uses the tumble function to create 60 second windows based on the timestamp from the toys table.
- It joins the toys and boxes tables on toyType and ensures the timestamps are within 60 seconds of each other.
- The ROW_NUMBER() function assigns a unique row number to each match within each 60 second window, partitioned by toyType and ordered by the time difference.
- The outer query filters to keep only the first match (rowNum = 1) for each toyType within each 60 second window.
This should give you at most one match per toyType within each 60 second window.
It was a bit hard to understand the results, so I asked ChatGPT to add the window and ORDER BY the windowStart:
WITH WindowedMatches AS
(
    SELECT
        t.timestamp AS toyTimestamp,
        t.toyType,
        t.toyId,
        b.timestamp AS boxTimestamp,
        b.boxId,
        ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) AS timeDifferenceInSeconds,
        tumbleStart(t.timestamp, INTERVAL 60 SECOND) AS windowStart,
        tumbleEnd(t.timestamp, INTERVAL 60 SECOND) AS windowEnd,
        ROW_NUMBER() OVER (PARTITION BY t.toyType,
            tumble(t.timestamp, INTERVAL 60 SECOND)
            ORDER BY ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp))) AS rowNum
    FROM toys t
    JOIN boxes b
        ON t.toyType = b.toyType
        AND ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60
)
SELECT
    toyTimestamp,
    toyType,
    toyId,
    boxTimestamp,
    boxId,
    timeDifferenceInSeconds,
    windowStart,
    windowEnd
FROM WindowedMatches
WHERE rowNum = 1
ORDER BY windowStart
This worked as expected, with all matches per window now grouped together.
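If you just want a per-window summary rather than individual matches, a smaller aggregation (my sketch, not from the original walkthrough) counts matches in each tumbling window:

-- matches per 60 second tumbling window
SELECT
    tumbleStart(t.timestamp, INTERVAL 60 SECOND) AS windowStart,
    count() AS matchesInWindow
FROM toys t
JOIN boxes b
    ON t.toyType = b.toyType
    AND ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60
GROUP BY windowStart
ORDER BY windowStart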
(Source: Adobe Stock)
Step 8: Hopping time windows
Finally, I tried a hopping window.
To use a hop window (also known as a sliding window) in ClickHouse, we can adjust the query to include the hop window functions. Here’s how you can rewrite the query to use a hop window with a 60 second interval and a 30 second slide:
WITH WindowedMatches AS
(
    SELECT
        t.timestamp AS toyTimestamp,
        t.toyType,
        t.toyId,
        b.timestamp AS boxTimestamp,
        b.boxId,
        ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) AS timeDifferenceInSeconds,
        hopStart(t.timestamp, INTERVAL 30 SECOND, INTERVAL 60 SECOND) AS windowStart,
        hopEnd(t.timestamp, INTERVAL 30 SECOND, INTERVAL 60 SECOND) AS windowEnd,
        ROW_NUMBER() OVER (PARTITION BY t.toyType,
            hop(t.timestamp, INTERVAL 30 SECOND, INTERVAL 60 SECOND)
            ORDER BY ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp))) AS rowNum
    FROM toys t
    JOIN boxes b
        ON t.toyType = b.toyType
        AND ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60
)
SELECT
    toyTimestamp,
    toyType,
    toyId,
    boxTimestamp,
    boxId,
    timeDifferenceInSeconds,
    windowStart,
    windowEnd
FROM WindowedMatches
WHERE rowNum = 1
ORDER BY windowStart
This query does the following:
- The WindowedMatches CTE uses the hop function to create sliding windows with a 60 second interval and a 30 second slide based on the timestamp from the toys table.
- It joins the toys and boxes tables on toyType and ensures the timestamps are within 60 seconds of each other.
- The ROW_NUMBER() function assigns a unique row number to each match within each sliding window, partitioned by toyType and ordered by the time difference.
- The outer query filters to keep only the first match (rowNum = 1) for each toyType within each sliding window.
This should give you at most one match per toyType within each sliding window.
But which one is “correct”?
Tumbling windows in ClickHouse are non-overlapping—so the results are unique. Hopping windows in ClickHouse can overlap, so there may be some redundant results. I’ll revisit the correct window semantics later when I try the real-time streaming version out.
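One way to check for that redundancy (a sketch of mine that reuses the hopping-window logic above) is to compare the total number of returned matches with the number of distinct toy/box pairs; if the total is higher, overlapping windows are re-reporting the same pairs:

SELECT
    count() AS totalMatches,
    uniqExact(toyId, boxId) AS distinctPairs
FROM
(
    -- same matching logic as the hopping-window query above
    SELECT
        t.toyId AS toyId,
        b.boxId AS boxId,
        ROW_NUMBER() OVER (PARTITION BY t.toyType,
            hop(t.timestamp, INTERVAL 30 SECOND, INTERVAL 60 SECOND)
            ORDER BY ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp))) AS rowNum
    FROM toys t
    JOIN boxes b
        ON t.toyType = b.toyType
        AND ABS(toUnixTimestamp(t.timestamp) - toUnixTimestamp(b.timestamp)) <= 60
)
WHERE rowNum = 1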
So, has ClickHouse lived up to its reputation for real-time time-series processing? Well, so far, so good: I’ve managed to get a “streaming” problem working with static data, and to use joins and a couple of different time window functions to handle matching over time.
And it looks like ClickHouse has an even richer set of temporal semantics. It has lots of date and time types, time zones, time-based aggregation and batching functions, time and date grouping intervals, time and date functions, and window functions including lag, lead, and preceding/following frame operators.
Probably enough temporal semantics to build a time machine, or at least one for real-time data processing!
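As a tiny taste of those window functions, here’s a sketch of mine (using lagInFrame with an explicit frame) that computes the gap in seconds between consecutive toys of the same type:

-- seconds between consecutive toys of the same toyType
-- (the first toy of each type has no predecessor, so its gap is meaningless)
SELECT
    toyType,
    timestamp,
    timestamp - lagInFrame(timestamp, 1) OVER (
        PARTITION BY toyType
        ORDER BY timestamp
        ROWS BETWEEN 1 PRECEDING AND CURRENT ROW
    ) AS secondsSincePreviousToy
FROM toys
ORDER BY toyType, timestamp
LIMIT 10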
(Source: Adobe Stock)
The ClickHouse SQL code examples are available on our GitHub.