What would you do if you received a request like this?
Looks like poor old Santa confused us with a company that produces Instant Forests. But maybe Santa knew I’d been trying out Decision Tree Machine Learning (including Random Forests) on our performance data (see previous blogs on Machine Learning with Spark)?
But then I started thinking. Obviously, Santa is “magic” (what online shop can deliver presents to millions of people on Christmas Eve!), so maybe all I had to do was come up with a simulated virtual tree and Santa could handle the rest?
Where to start? What (in computer science or mathematics) is tree-shaped?
On the first day of Christmas
my true love sent to me:
A Quincunx machine in a Pear Tree
On the second day of Christmas
my true love sent to me:
2 Bean Machines
And a Quincunx machine in a Pear Tree
On the third day of Christmas
my true love sent to me:
3 Galton Boards
2 Bean Machines
and a Quincunx machine in a Pear Tree
On the fourth day of Christmas
my true love sent to me:
4 Pascal’s Triangles
3 Galton Boards
2 Bean Machines
and a Quincunx machine in a Pear Tree
Ok, that’s enough! Ten identical presents are rather boring. A Quincunx machine (also called Bean machine or Galton Board) is a vertical board with nails banged into it in the shape of a Christmas tree.
Here is a clip of one in action.
Balls are dropped from the top (where the “Star” would be on a tree), and bounce either left or right as they hit the nails and drop down to the next level, and so on, until they drop out the bottom. They are collected into bins at the bottom and the height of the bins will eventually approximate a bell curve. Overlaying Pascal’s triangle onto the nails shows the number of different paths that can be taken to get to each bin (as each value is the sum of the two values above it to the left and right, starting with a 1 at the top Star position). The bins towards the middle are reached by more paths and will collect more balls:
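To make the path counting concrete, here’s a small standalone sketch (the class name GaltonPaths is just for illustration) that builds Pascal’s triangle row by row and prints the number of paths to each bin, and therefore each bin’s probability, for a tree where a ball bounces 9 times before landing in a bin:

import static java.lang.Math.pow;

public class GaltonPaths {
    public static void main(String[] args) {
        int bounces = 9;      // e.g. a 10-row tree: a ball bounces 9 times before reaching a bin
        long[] paths = {1};   // row 0: just the "Star"
        for (int n = 1; n <= bounces; n++) {
            long[] next = new long[n + 1];
            for (int k = 0; k <= n; k++) {
                // each value is the sum of the two values above it (Pascal's triangle)
                next[k] = (k > 0 ? paths[k - 1] : 0) + (k < n ? paths[k] : 0);
            }
            paths = next;
        }
        double total = pow(2, bounces); // total number of equally likely left/right paths
        for (int k = 0; k <= bounces; k++) {
            System.out.printf("bin %d: %d paths, probability %.3f%n", k, paths[k], paths[k] / total);
        }
    }
}

The middle bins get the lion’s share of the probability, which is exactly the bell curve the balls pile up into.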
Obviously, I’m not the first person to connect these ideas. Here’s a Pascal’s Christmas Tree simulator, a Quincunx simulation, and a blog that does a full “The 12 Days of Pascal’s Triangular Christmas”!
The Pascal’s Christmas tree simulator (image above) inspired me to use a similar approach for a Christmas tree lights simulator. The tree will be a Galton board, with lights at each nail position (baubles in the above picture). Positions are numbered by row and column in the traditional way, with row 0 at the top and column 0 on the left; the star is therefore (0,0). A light turns ON when a ball hits its nail, stays on for a second, and then turns OFF. Lights will turn on and off (twinkle!) as balls enter from the top and drop down to the bottom. If balls drop in fast enough then multiple lights will be on at once.
Time to open up an early Christmas present (Kafka, which is on the Instaclustr roadmap for 2018) and use it to write a scalable Christmas tree lights simulation. Some imagination may be required (perhaps enhanced with a few glasses of Christmas port).
The design choices I had to make were around the number of producers, consumers and topics, what roles they would have, what the message structure would be, and how to handle time.
We’ll start with a single Kafka Producer to drop balls onto the top of the tree (at the Star), with a new ball arriving every second. The “star” is just a simple Java producer running in a thread which sends a (0,0) message to the “tree” topic, and sleeps for a second before repeating, until the desired number of balls have been released.
Initially, I had thought of using multiple topics to capture the topology of the relationship between the lights, but this would have been too complicated for larger trees. A simpler solution is to use a single topic (called “tree”). I also decided on a very simple message structure, using the Kafka message key as the row value and the message value as the column. Note that for multi-partition topics this may not be ideal, as the key is used for distributing messages across partitions, and the small number of distinct row values may not spread messages evenly.
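Here’s what that message format looks like as a minimal, condensed sketch (the class name TreeMessageExample is just for illustration; the serializer settings and the key = row, value = col convention come straight from the full listing at the end of this post):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class TreeMessageExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(props);
        // key = row, value = col: a ball currently at row 3, column 1 of the tree
        producer.send(new ProducerRecord<>("topicTree", 3, 1));
        producer.close();
    }
}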
Currently the tree only has the top star light illuminated all the time. What’s missing? Gravity! To simulate gravity we need a consumer/producer pair, which I called “Twinkle”. To twinkle, we receive a message (ball location) from the tree topic, randomly decide if the ball will go left or right, and then publish a message consisting of the new location back to the “tree” topic. Will this work? Not really; as it stands it is just an infinite loop. What we really need is a delay queue or explicit timestamp handling, so that processing of the new ball location is delayed until the 1 second ON time has elapsed. A hack for this is to sleep the Twinkle application’s consumer thread so that it only checks for new messages periodically. This is what we have so far.
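Roughly, one Twinkle cycle looks like the fragment below (a sketch only, assuming consumer, producer and rand are set up as in the full Twinkle class in the listing at the end of this post):

// consume ball locations from the tree topic, drop each one row with a random left/right bounce,
// and publish the new location back to the same topic
for (ConsumerRecord<Integer, Integer> record : consumer.poll(1000)) {
    int nextRow = record.key() + 1;
    int nextCol = rand.nextBoolean() ? record.value() + 1 : record.value();
    producer.send(new ProducerRecord<>("topicTree", nextRow, nextCol));
}
// the "hack": only check for new messages once a second (InterruptedException handling omitted)
Thread.sleep(KafkaProperties.DELAY);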
What’s missing? What should we do with balls that reach the bottom of the tree (the last row)? We could just make them vanish (by not sending a message back to the tree topic). Or better (in the spirit of the Galton Board) let’s add a “count” topic and a Count Consumer to add up the number of balls that fall into each bin under the tree:
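In sketch form (again assuming consumer, producer, rand and the counts array are set up as in the full Twinkle and CountConsumer classes in the listing below), the bottom-row check and the counting look roughly like this:

// In Twinkle: if the ball would leave the tree, send it to the count topic instead of back to the tree
int nextRow = record.key() + 1;
if (nextRow >= KafkaProperties.TREE_ROWS) {
    producer.send(new ProducerRecord<>("topicCount", record.key(), record.value()));
} else {
    int nextCol = rand.nextBoolean() ? record.value() + 1 : record.value();
    producer.send(new ProducerRecord<>("topicTree", nextRow, nextCol));
}

// In the Count Consumer: each message on the count topic is a ball in a bin (the value is the column)
for (ConsumerRecord<Integer, Integer> r : consumer.poll(1000))
    counts[r.value()]++;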
The final problem is how to display the state of the tree and lights. The simplest solution is to have another consumer which subscribes to the “tree” topic and changes the state of the correct lights for each new message in the topic (change of ball location). This is possible, and was the approach I tried first. However, the Twinkle and Display consumers have to be in different consumer groups (because of the way the Kafka protocol works, to ensure that they both get every message published to the tree topic), and computing the state and handling timing was tricky:
An improved version computes the state change of the lights in the Twinkle application (step C), and sends a state change message (light location OFF or ON) to the corresponding dedicated topic (Light OFF, Light ON). Every second, the State Display consumer applies all the OFF messages first, then the ON messages, and then prints the tree lights state (in ASCII). Each “*” is a light that is ON, and each “.” is a light that is OFF.
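Here’s a condensed sketch of that update (assuming consumer subscribes to both topics and tree is the boolean[][] light state, as in the StateDisplayConsumer class in the full listing below):

// apply OFF changes first, then ON, so a light turned off and straight back on stays on
ConsumerRecords<Integer, Integer> all = consumer.poll(1000);
for (ConsumerRecord<Integer, Integer> r : all.records("topicOff"))
    tree[r.key()][r.value()] = false;   // light OFF
for (ConsumerRecord<Integer, Integer> r : all.records("topicOn"))
    tree[r.key()][r.value()] = true;    // light ON
// then print tree row by row: '*' for ON, '.' for OFF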
Here’s a sequence that could be printed for a simple tree with 3 rows, and a single ball dropped in at the top:
Time 1: star lights up (0,0)
*
..
...
Time 2: light at (1,1) ON
.
.*
...
Time 3: light at (2,1) ON
.
..
.*.
Time 4: All lights OFF
.
..
...
Here’s an (edited) run with 10 rows and 100 balls:
Welcome to the Instaclustr XMAS Tree Simulator!!!!!
*
..
...
....
.....
......
.......
........
.........
..........
Etc (lots of balls)
*
**
**.
.**.
..**.
..***.
...**..
*..**...
*..*.....
..*.***...
Etc. (no more balls arriving; eventually all lights end up OFF)
.
..
...
....
..*..
..**..
..**...
..**....
...**....
.....**...
A Happy Normally Distributed Xmas! Counts:
col 0 = 1
col 1 = 2
col 2 = 7
col 3 = 19
col 4 = 28
col 5 = 25
col 6 = 14
col 7 = 4
col 8 = 0
col 9 = 0
Total events counted = 100
More interesting Christmas lights with colour could have been simulated by using a richer message structure, e.g. a message number as the key, and a compound value type consisting of (row, col, colour).
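For example (purely hypothetical: the LightEvent class below does not exist in the actual code, and a custom value serializer, e.g. JSON, would be needed in place of the IntegerSerializer used here):

// Hypothetical compound value for coloured lights: key = message number, value = (row, col, colour)
public class LightEvent {
    public final int row;
    public final int col;
    public final String colour;   // e.g. "red", "green", "gold"

    public LightEvent(int row, int col, String colour) {
        this.row = row;
        this.col = col;
        this.colour = colour;
    }
}
// producer.send(new ProducerRecord<>("topicTree", messageNo, new LightEvent(3, 1, "red")));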
Here’s a simplified topology diagram showing just the relationship between producers, topics and consumers.
Is this the best/correct approach for:
- Computing and keeping track of the state of the lights?
- Probably not. Kafka Streams and a KTable (which maintains state) may be better. Here’s a blog.
- Handling time?
- Probably not. As I mentioned, using a delay queue or explicit timestamp handling would be better. Essentially I’m using Kafka as a discrete event simulation (DES) engine, which it isn’t really designed for, but in theory it should work, as all you need is events and timestamps. I (briefly) tried using the different timestamp extractors but had a few issues; I suspect they are designed to work (best) with Kafka Streams. So maybe Santa could get a few elves to add some code for this, perhaps using windows. A rough sketch of the timestamp idea follows below.
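For what it’s worth, here is a rough sketch of the explicit-timestamp idea (my assumption, not something in the actual code: it uses the record’s produce timestamp to wait out the remaining ON time per ball, and processBallPosition is a hypothetical handler):

try {
    ConsumerRecords<Integer, Integer> records = consumer.poll(100);
    for (ConsumerRecord<Integer, Integer> record : records) {
        long age = System.currentTimeMillis() - record.timestamp();
        if (age < KafkaProperties.DELAY)
            Thread.sleep(KafkaProperties.DELAY - age);     // wait out the remaining 1 second ON time
        processBallPosition(record.key(), record.value()); // hypothetical handler for the ball's new location
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}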
Will Santa be happy with this? He should be! Given that it’s built on Kafka it will scale massively. It should be easy to increase the speed of the simulation (i.e. run it flat out), increase the size of the tree, and even simulate a forest of trees, each with a slightly different algorithm and/or starting condition, and with multiple different ways of displaying the tree/lights. There’s also reprocessing (which I mentioned in this blog): because Kafka persists all the messages, consumers can choose which messages to process, so a consumer could display any historic state of the tree lights.
Canberra was in the news a few years ago with a charity fundraising world record for the number of lights on an artificial Christmas tree. What do half a million lights look like?
I just had to see if the Kafka Christmas lights simulator was up to the challenge. It was.
A simulated tree with 500,000 lights and 100,000 balls dropped in ran in 555s, processed over 400 million producer + consumer events, and ran 180 times faster than real time (running flat out), achieving an average throughput of 43 million events per minute. Not bad, with everything (the Kafka broker and the Java code) running on my laptop. Why stop there? 1 million and 2 million lights worked fine, so can I claim the world record for the most lights on a simulated Christmas tree?
Here’s the code (Java). Note that I made a minor change to the Twinkle application to correctly process the Star light. The Star producer now sends a special message (-1, -1) to Twinkle, which interprets it as a special case (an arriving ball with no location yet) and sends a message back to the tree topic with the star location (0,0), plus a (0,0) message to the Light ON topic.
Please check out our Spark Streaming, Kafka and Cassandra Tutorial for details on installing and running Kafka and Zookeeper and visit our GitHub to access all Java Code.
Java Kafka Code
KafkaProperties.java:

package kafkaxmas;

public class KafkaProperties {
    public static final Boolean DEBUG = false;
    public static final Boolean DISPLAY = true;
    public static final String KAFKA_SERVER_URL = "localhost";
    public static final int KAFKA_SERVER_PORT = 9092;
    public static final int KAFKA_PRODUCER_BUFFER_SIZE = 64 * 1024;
    public static final int CONNECTION_TIMEOUT = 100000;
    public static final int DELAY = 1000;
    public static final String TOPIC = "topicTree";
    public static final String TOPIC2 = "topicCount";
    public static final String TOPICOFF = "topicOff";
    public static final String TOPICON = "topicOn";
    public static final String CLIENT_ID = "KafkaXmasTreeDemo";
    public static final int TREE_ROWS = 10;
    public static final int BALLS = 1000;

    private KafkaProperties() {}
}

XmasTree.java:

package kafkaxmas;

/*
 * Top level main program to run Xmas Tree Simulation.
 * start star producer, twinkle consumer/producer, and consumers for count and display.
 */
public class XmasTree {

    public static void main(String[] args) {
        boolean isAsync = args.length == 0 || !args[0].trim().equalsIgnoreCase("sync");
        System.out.println("Welcome to the Instaclustr XMAS Tree Simulator!!!!!");

        // Start balls dropping onto top of tree.
        StarProducer producerThread = new StarProducer(KafkaProperties.TOPIC, isAsync);
        producerThread.start();

        // Start state display consumer. New version with both topics, OFF and ON, passed via args.
        StateDisplayConsumer displayTree = new StateDisplayConsumer(KafkaProperties.TOPICOFF, KafkaProperties.TOPICON);
        displayTree.start();

        // start count consumer, subscribes to TOPIC2
        CountConsumer counts = new CountConsumer(KafkaProperties.TOPIC2);
        counts.start();

        // start twinkle consumer/producer application, subscribe to same topic as the star producer.
        Twinkle twinkleThread = new Twinkle(KafkaProperties.TOPIC);
        twinkleThread.start();

        // Note that even though the star producer eventually stops, the other threads keep running for ever.
    }
}

StarProducer.java:

package kafkaxmas;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

// Producer which drops balls onto the top of the tree.
// Drop one "ball" onto top of tree every DELAY period, stops when number of balls dropped == BALLS.
// Message is of format: key=row, value=col
// Changed to send (-1, -1) rather than the original (0, 0), to indicate that the ball needs to be processed specially by Twinkle.
public class StarProducer extends Thread {
    private final KafkaProducer<Integer, Integer> producer;
    private final String topic;
    private final Boolean isAsync;
    private final Boolean debug = KafkaProperties.DEBUG;
    private final long zeroTime = System.currentTimeMillis();

    public StarProducer(String topic, Boolean isAsync) {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put("client.id", "StarProducer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producer = new KafkaProducer<>(props);
        this.topic = topic;
        this.isAsync = isAsync;
    }

    public void run() {
        int messageNo = 1;
        int maxMessages = KafkaProperties.BALLS;
        while (messageNo <= maxMessages) {
            int row = -1;
            int col = -1;
            long startTime = System.currentTimeMillis();
            if (isAsync) {
                // Send asynchronously
                producer.send(new ProducerRecord<>(topic, row, col), new DemoCallBack(startTime, row, col));
            } else {
                // Send synchronously
                try {
                    producer.send(new ProducerRecord<>(topic, row, col)).get();
                    if (debug) System.out.println("Sent message: (" + row + ", " + col + ")");
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
            ++messageNo;
            long nowTime = System.currentTimeMillis();
            if (debug) System.out.println("Star *** " + (nowTime - zeroTime) + ": " + messageNo);
            try {
                Thread.sleep(KafkaProperties.DELAY);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

class DemoCallBack implements Callback {
    private final long startTime;
    private final int key;
    private final int message;

    public DemoCallBack(long startTime, int key, int message) {
        this.startTime = startTime;
        this.key = key;
        this.message = message;
    }

    public void onCompletion(RecordMetadata metadata, Exception exception) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        if (metadata != null) {
            System.out.println(
                    "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() + "), " +
                    "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
        } else {
            exception.printStackTrace();
        }
    }
}

Twinkle.java:

package kafkaxmas;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;

/*
 * Consumer/producer Pair.
 * Receives from tree topic, sends to count and On/Off state change topics.
 * Simulates gravity by reading an event from the tree topic and transforming it to the next row and randomly left or right col.
 * If event is special (-1, -1) value then a new ball has arrived, send new event (0,0) to tree topic and a Light ON state change.
 * Else
 *   Send current position Light OFF state change.
 *   If on bottom row, then send event to count topic.
 *   Else transform to next row and randomly left or right and send new event to tree topic and light ON state change.
 *
 * What are valid producer configs? https://kafka.apache.org/documentation.html#producerconfigs
 */
public class Twinkle extends ShutdownableThread {
    private final KafkaProducer<Integer, Integer> producer;
    private final KafkaConsumer<Integer, Integer> consumer;
    private final String topic;
    Boolean debug = KafkaProperties.DEBUG;
    int maxRow = KafkaProperties.TREE_ROWS;     // size of tree
    static Random rand = new Random();
    boolean display = KafkaProperties.DISPLAY;  // display ASCII tree or not

    public Twinkle(String topic) {
        super("Twinkle", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "Twinkle");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        consumer.subscribe(Collections.singletonList(this.topic));

        // producer, shared across all output topics
        Properties pprops = new Properties();
        pprops.put("bootstrap.servers", KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        pprops.put("client.id", "TwinkleProducer");
        pprops.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        pprops.put("value.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producer = new KafkaProducer<>(pprops);
    }

    @Override
    public void doWork() {
        int row;
        int col;
        long pollInterval = 1000;
        ConsumerRecords<Integer, Integer> records = consumer.poll(pollInterval);
        for (ConsumerRecord<Integer, Integer> record : records) {
            if (debug) System.out.println("Twinkle got records = " + records.count());
            if (debug) System.out.println("Twinkle: processing record = (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            row = record.key();
            col = record.value();
            if (row == -1) // ball dropped in from the top, put it in Star location (0,0) and turn it ON for a second
            {
                row = 0;
                col = 0;
                // put (0,0) into topic and send ON event
                producer.send(new ProducerRecord<Integer, Integer>(this.topic, row, col));
                if (debug) System.out.println("Twinkle STAR ON + (" + row + ", " + col + ") ON");
                producer.send(new ProducerRecord<Integer, Integer>(KafkaProperties.TOPICON, row, col));
            } else {
                // turn light OFF in current position
                if (debug) System.out.println("Twinkle + (" + row + ", " + col + ") OFF");
                producer.send(new ProducerRecord<Integer, Integer>(KafkaProperties.TOPICOFF, row, col));
                // increment row (gravity!), if row >= maxRow then don't publish back to tree topic, send to count topic instead
                int nextRow = row + 1;
                if (nextRow >= maxRow) {
                    if (debug) System.out.println("Twinkle, ball on maxRow!");
                    // ball drops off bottom, so send event to TOPIC2 for counting
                    producer.send(new ProducerRecord<Integer, Integer>(KafkaProperties.TOPIC2, row, col));
                } else // random pick left or right direction and send new location back to tree topic and ON state change
                {
                    int nextCol = col;
                    // choose left or right bulb
                    if (rand.nextBoolean()) nextCol += 1;
                    if (debug) System.out.println("Twinkle: next " + nextRow + ", " + nextCol);
                    producer.send(new ProducerRecord<Integer, Integer>(this.topic, nextRow, nextCol));
                    if (debug) System.out.println("Twinkle + (" + nextRow + ", " + nextCol + ") ON");
                    producer.send(new ProducerRecord<Integer, Integer>(KafkaProperties.TOPICON, nextRow, nextCol));
                }
            }
        }
        // processed all records obtained in poll above, now sleep for some time so that lights will stay on for a while.
        try {
            Thread.sleep(KafkaProperties.DELAY);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }
}

CountConsumer.java:

package kafkaxmas;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

// consumer to count the number of balls in each bin when they drop off the bottom
public class CountConsumer extends ShutdownableThread {
    private final KafkaConsumer<Integer, Integer> consumer;
    private final String topic;
    private final int maxCols = KafkaProperties.TREE_ROWS;
    private long counts[] = new long[maxCols];
    int balls = KafkaProperties.BALLS;  // number of balls expected
    Boolean debug = KafkaProperties.DEBUG;
    int runningTotal = 0;
    long startTime = System.currentTimeMillis();

    public CountConsumer(String topic) {
        super("XmasTreeCountConsumer", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "XmasTreeCountConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        consumer.subscribe(Collections.singletonList(this.topic));
    }

    @Override
    public void doWork() {
        long pollInterval = 1000;
        ConsumerRecords<Integer, Integer> records = consumer.poll(pollInterval);
        for (ConsumerRecord<Integer, Integer> record : records) {
            if (debug) System.out.println("Count Consumer records = " + records.count());
            if (debug) System.out.println("Count Consumer, Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            if (record.value() < maxCols) {
                counts[record.value()]++;
                runningTotal++;
            }
        }
        if (debug) System.out.println("Count = " + runningTotal);
        // only display counts at end
        long sum = 0;
        for (int i = 0; i < maxCols; i++)
            sum += counts[i];
        if (sum >= balls) {
            System.out.println("A Happy Normally Distributed Xmas! Counts:");
            for (int i = 0; i < maxCols; i++)
                System.out.println("col " + i + " = " + counts[i]);
            System.out.println("Total events counted = " + sum);
            long endTime = System.currentTimeMillis();
            System.out.println("Total time = " + (endTime - startTime));
        }
        try {
            Thread.sleep(KafkaProperties.DELAY);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }
}

StateDisplayConsumer.java:

package kafkaxmas;

import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

/*
 * Consumer for the tree display, receives events from ON and OFF topics and displays ASCII tree.
 * Modified version, single consumer subscribes to 2 topics and uses records method to return events for each topic in turn.
 * Consumer docs: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
 * Consumer config docs: https://kafka.apache.org/documentation.html#consumerconfigs
 */
public class StateDisplayConsumer extends ShutdownableThread {
    private final KafkaConsumer<Integer, Integer> consumer;
    private final Boolean debug = KafkaProperties.DEBUG;
    private final String topic1;
    private final String topic2;
    // all lights off by default (false)
    private final int maxRows = KafkaProperties.TREE_ROWS;
    private final int maxCols = maxRows;
    private final boolean[][] tree = new boolean[maxRows][maxCols];

    public StateDisplayConsumer(String topic1, String topic2) {
        super("XmasTreeStateDisplayConsumer", false);
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "XmasTreeStateDisplayConsumer");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic1 = topic1;
        this.topic2 = topic2;
        consumer.subscribe(Arrays.asList(topic1, topic2));
    }

    @Override
    public void doWork() {
        // compute current state of lights from OFF and ON messages
        // Process events in OFF Topic first so if a light changes from OFF to ON instantaneously it will stay on.
        long pollInterval = 1000;
        // get records for all topics
        ConsumerRecords<Integer, Integer> recordsAll = consumer.poll(pollInterval);
        // get records for OFF topic only
        //ConsumerRecords<Integer, Integer> recordsOFF = (ConsumerRecords<Integer, Integer>) recordsAll.records(KafkaProperties.TOPICOFF);
        Iterable<ConsumerRecord<Integer, Integer>> recordsOFF = recordsAll.records(topic1);
        for (ConsumerRecord<Integer, Integer> record : recordsOFF) {
            if (debug) System.out.println("Display Consumer, OFF Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            // paranoid check in case we had a bigger tree in a previous run and some messages are still hanging around unprocessed.
            if (record.key() < maxRows && record.value() < maxCols)
                tree[record.key()][record.value()] = false;
        }
        // Now process ON topic messages
        Iterable<ConsumerRecord<Integer, Integer>> recordsON = recordsAll.records(topic2);
        //consumer.subscribe(Collections.singletonList(KafkaProperties.TOPICON));
        //ConsumerRecords<Integer, Integer> records2 = consumer.poll(pollInterval);
        for (ConsumerRecord<Integer, Integer> record : recordsON) {
            if (debug) System.out.println("Display Consumer, ON Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            // paranoid check in case we had a bigger tree in a previous run and some messages are still hanging around unprocessed.
            if (record.key() < maxRows && record.value() < maxCols)
                tree[record.key()][record.value()] = true;
        }
        // display tree as ASCII
        for (int i = 0; i < maxRows; i++) {
            int indent = (maxRows / 2) - (i / 2);
            for (int a = 0; a < indent; a++)
                System.out.print(" ");
            for (int j = 0; j <= i; j++) {
                if (tree[i][j]) System.out.print("*");
                else System.out.print(".");
            }
            System.out.println();
        }
        // only display the tree every second
        try {
            Thread.sleep(KafkaProperties.DELAY);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String name() {
        return null;
    }

    @Override
    public boolean isInterruptible() {
        return false;
    }
}