This is the second post in our series on designing and developing an example IoT application with Apache Kafka to illustrate typical design and implementation considerations and patterns. In the previous blog, we introduced our Instaclustr “Kongo” IoT Logistics Streaming Demo Application. The code for Version 1 of the Kongo application was designed as an initial stand-alone, tightly-coupled prototype to demonstrate the functional behaviour. It won’t cope with any failures and won’t scale beyond a single server. In this post, our goal is to re-engineer the Kongo application to introduce Apache Kafka so that it is more reliable and scalable.
To get Kongo ready for Kafka we decided to make an intermediate design change to make it more loosely coupled. This involved (1) adding explicit event types, and (2) adding an Event Bus to introduce topics, so that objects can publish events to topics and subscribe to and consume events from them. See Version 2 of the Kongo application for the code details.
1. Event Types
Kongo has two main types of events: sensor events and RFID events. Sensor events are produced by warehouses and trucks and represent measurements of metrics at a particular time and place. RFID events are produced by RFID readers when goods are moved through a warehouse dock to and from trucks, and are either UNLOAD or LOAD events. We added classes for RFIDLoadEvent and RFIDUnloadEvent, which are initially just containers for the data <time, warehouseKey, goodsKey, truckKey>. Both classes have the same fields but differ in semantics (i.e. the truck is either loaded or unloaded).
We also added a Sensor event class with <time, warehouse or truck location, metric, value> fields. The simulation loop (Simulate.java) was changed so that explicit RFID Load and Unload and Sensor event objects are now created. But what should we do with them?
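As a rough illustration of the event types described above, here is a minimal sketch of the three classes as plain immutable data holders. The field types, the shared RFIDEvent base class, and the doc and tag field names on Sensor are assumptions made for this sketch; the actual Kongo classes may well differ.

```java
// Illustrative sketch only: field names and types are assumptions,
// not the actual Kongo classes.

// Measurement of one metric at one location at one time.
final class Sensor {
    final long time;      // simulation time of the measurement
    final String doc;     // free-text description, e.g. "SENSOR WAREHOUSE"
    final String tag;     // key of the warehouse or truck where it was taken
    final String metric;  // e.g. "temp"
    final double value;

    Sensor(long time, String doc, String tag, String metric, double value) {
        this.time = time;
        this.doc = doc;
        this.tag = tag;
        this.metric = metric;
        this.value = value;
    }
}

// Hypothetical base class holding the fields both RFID event types share.
abstract class RFIDEvent {
    final long time;
    final String warehouseKey;
    final String goodsKey;
    final String truckKey;

    RFIDEvent(long time, String warehouseKey, String goodsKey, String truckKey) {
        this.time = time;
        this.warehouseKey = warehouseKey;
        this.goodsKey = goodsKey;
        this.truckKey = truckKey;
    }
}

// Same fields, different meaning: goods moved from the warehouse onto the truck.
final class RFIDLoadEvent extends RFIDEvent {
    RFIDLoadEvent(long time, String warehouseKey, String goodsKey, String truckKey) {
        super(time, warehouseKey, goodsKey, truckKey);
    }
}

// Same fields, different meaning: goods moved from the truck into the warehouse.
final class RFIDUnloadEvent extends RFIDEvent {
    RFIDUnloadEvent(long time, String warehouseKey, String goodsKey, String truckKey) {
        super(time, warehouseKey, goodsKey, truckKey);
    }
}
```

Keeping load and unload as separate types lets a handler be selected by event type alone, even though the data carried is identical.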
2. Loose-Coupling
An earlier attempt at a loosely-coupled communication system (Source: Wikimedia)
To get the application ready for Kafka we need to make it loosely coupled in a way that will hopefully make it easier to distribute across multiple processes and servers. As an initial step towards this, we decided to introduce a publish-subscribe eventing pattern. Typically this means creating one or more event queues, defining what will happen when each type of event is received, setting up subscriptions to topics, and sending events to the correct event queues. We used the Google Guava EventBus to implement this. EventBus allows publish-subscribe-style communication between components in the same process (but isn’t a general pub-sub inter-process mechanism).
2.1 Sensor Events: One or Many Topics?
We tackled the sensor events first. The initial design was for a single sensor topic with all the sensor events (from warehouses and trucks) posted to it. E.g. in the simulate loop:
Sensor sensor = new Sensor(time, "SENSOR WAREHOUSE", warehouseKey, "temp", value);
sensorTopic.post(sensor);
In the original prototype code, there is an inefficient and tightly coupled call to a method to find all the Goods at the location of the sensor and check all the rules. Introducing loose coupling means that the sensor event producers don’t need to have any knowledge of what happens to the events anymore. But something does!
The initial loosely-coupled design registered all Goods objects with the topic when the Goods were first created. I.e.
EventBus sensorTopic = new EventBus("sensorTopic");
// register every Goods object as a listener on the single sensor topic
for (Goods goods : allGoods.values())
    sensorTopic.register(goods);
The EventBus register method registers listener methods on the object passed in the argument. Listener methods must have a @Subscribe annotation and are specific to event types that will be posted. For example, the listener method for the Goods objects looks like this:
// subscribe Goods object listener to Sensor events
@Subscribe
public void sensorEvent(Sensor sensor)
{
    System.out.println("GOT SENSOR EVENT! Object=" + tag + ", event=" + sensor.toStr());
    // only check the rules if this Goods object is at the sensor's location
    if (goodsInLocation(sensor.tag)) {
        String v = violatedSensorCatRules(sensor);
        // etc
    }
}
This worked, but had the obvious problem that every sensor event was sent to every Goods object, which then had to check if it could ignore the event or not (based on location). Here’s a diagram showing this “many-to-many” pattern:
This won’t scale to millions of Goods objects. One of the expected benefits of pub-sub is to enable consumers to subscribe to only the subset of events that is relevant to them. A simple improvement was therefore to have multiple EventBuses (topics), one topic per location, and to subscribe each Goods object only to the topic for its current location. Here’s the code for publishing sensor events to the correct topic. For the full code see GitHub.
// Find the topic corresponding to the warehouse location
// and post the sensor event to only that topic
EventBus topic = topics.get(warehousekey);
Sensor sensor = new Sensor(time, "SENSOR WAREHOUSE", warehousekey, "temp", value);
topic.post(sensor);
This refinement worked well, and is highly scalable, even with increasing numbers of topics (e.g. 1000 warehouses + 2000 trucks = 3000 topics) and Goods objects (millions). Here’s the diagram for this pattern:
In theory there is a further refinement based on the observation that not all Goods objects care about every sensor metric (although eventually the code might not really be loosely coupled anymore). It may therefore be possible to further refine the subscriptions by metric type.
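The one-topic-per-location pattern can be sketched without Guava to show the routing idea on its own. This is only a stand-in under stated assumptions: LocationTopics, subscribe and post are hypothetical names, listeners are simplified to callbacks that receive a sensor value, and the real code uses one EventBus per location instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Stand-in for "one EventBus per location": a map from location key to the
// listeners subscribed at that location. All names here are hypothetical.
final class LocationTopics {
    private final Map<String, List<Consumer<Double>>> topics = new HashMap<>();

    // Subscribe a listener to the topic of exactly one location.
    void subscribe(String locationKey, Consumer<Double> listener) {
        topics.computeIfAbsent(locationKey, k -> new ArrayList<>()).add(listener);
    }

    // Deliver a sensor value only to listeners at that location.
    // Returns how many listeners received it.
    int post(String locationKey, double value) {
        List<Consumer<Double>> listeners =
                topics.getOrDefault(locationKey, Collections.emptyList());
        for (Consumer<Double> listener : listeners) {
            listener.accept(value);
        }
        return listeners.size();
    }
}

class LocationTopicsDemo {
    public static void main(String[] args) {
        LocationTopics topics = new LocationTopics();
        topics.subscribe("warehouse-1", v -> System.out.println("goods-A saw temp " + v));
        topics.subscribe("truck-7", v -> System.out.println("goods-B saw temp " + v));

        // Only the listener subscribed at warehouse-1 is invoked.
        int delivered = topics.post("warehouse-1", 21.5);
        System.out.println("delivered to " + delivered + " listener(s)");
    }
}
```

In this sketch the cost of a post depends on the number of subscribers at one location, not on the total number of Goods objects, which is the point of the refinement.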
This isn’t yet a complete solution, as Goods objects move around, and we need to run the co-location rules to check if Goods of different categories are permitted to travel in the same truck. As they change locations, the subscriptions must be dynamically updated. We haven’t implemented RFID event topics and handlers yet, and as this is where the Goods movement events come from, this is obviously the next step.
2.2 RFID Events
The design for RFID events was to have an EventBus/topic for each of load and unload events, with the handlers defined in the RFIDLoadEvent and RFIDUnloadEvent classes (in hindsight 2 topics weren’t really necessary given that we had different RFID event types):
rfidLoadTopic = new EventBus("load");
rfidUnloadTopic = new EventBus("unload");
RFIDLoadEvent loadHandler = new RFIDLoadEvent();
rfidLoadTopic.register(loadHandler);
RFIDUnloadEvent unloadHandler = new RFIDUnloadEvent();
rfidUnloadTopic.register(unloadHandler);
What should the handlers do for each event type? For RFIDUnloadEvent the logic is as follows:
RFIDUnloadEvent: Move the Goods object from a truck to a warehouse
1. Get the key of the Goods to unload from the event.
2. Find the goods object given the Goods key.
3. Find the location topic that the Goods key is currently registered with (truck).
4. Unregister the goods from that topic.
5. Find the location topic of the warehouse.
6. Register the goods object with that topic.
What this sequence of steps achieves is that the Goods object will stop receiving sensor events from the truck location, and start receiving sensor events at the warehouse location.
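The unload steps above amount to moving one subscription from the truck topic to the warehouse topic. A minimal sketch of that bookkeeping follows, assuming Goods are identified by string keys and using a hypothetical SubscriptionRegistry in place of the per-location EventBus instances (with Guava, the two marked lines would be unregister and register calls on the relevant buses).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical registry tracking which location topic each Goods key is
// subscribed to. It stands in for the per-location EventBus instances.
final class SubscriptionRegistry {
    private final Map<String, Set<String>> goodsByLocation = new HashMap<>();
    private final Map<String, String> locationByGoods = new HashMap<>();

    // Register a Goods key with the topic of a location.
    void register(String goodsKey, String locationKey) {
        goodsByLocation.computeIfAbsent(locationKey, k -> new HashSet<>()).add(goodsKey);
        locationByGoods.put(goodsKey, locationKey);
    }

    // Unload handler: move the subscription from the truck to the warehouse.
    void handleUnload(String goodsKey, String warehouseKey) {
        // Find the location topic the Goods is currently registered with (the truck).
        String current = locationByGoods.get(goodsKey);
        if (current != null) {
            goodsByLocation.get(current).remove(goodsKey); // unregister from truck topic
        }
        register(goodsKey, warehouseKey);                   // register with warehouse topic
    }

    // Goods keys currently subscribed at a location (empty if none).
    Set<String> subscribersAt(String locationKey) {
        return goodsByLocation.getOrDefault(locationKey, Collections.emptySet());
    }
}
```

After handleUnload("goods-1", "warehouse-1"), a sensor event posted for the truck would no longer reach goods-1, while one posted for warehouse-1 would.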
The RFIDLoadEvent handler is slightly more complex as it must also deal with co-located Goods rules checking:
RFIDLoadEvent: Move the Goods object from a warehouse to a truck, and check for co-location rules violations
1. Get the key of the Goods to load from the event.
2. Find the goods object given the Goods key.
3. Find the location topic that the Goods key is currently registered with (warehouse).
4. Unregister the goods from that topic.
5. Find the location topic of the truck.
6. Create colocatedCheckEvent <time, goods, truck>.
7. Post this event to the truck location topic.
8. Register the goods object with the truck location topic.
Steps 1-5 and 8 mirror the steps for the Unload events, but steps 6 and 7 are critical: they construct a new event type (colocatedCheckEvent) and post it to the truck location topic. This results in all the Goods objects that have already been loaded onto the truck checking their rules to see whether they are happy with the new Goods object being loaded.
The Goods class has a new method handler, colocatedRulesEvent(ColocatedCheckEvent event), which gets the Goods object to be loaded, and checks the co-location rules between “this” object and the object to be loaded. Note that we are now sending two event types to sensor topics, and each type has a different handler.
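To make the co-location check concrete, here is a minimal sketch of the post-then-register sequence for a load. The rule representation (a set of categories a Goods refuses to travel with), the TruckTopic class, and returning violations as strings are all assumptions for illustration; Kongo's real rules and the EventBus wiring are not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Event posted to the truck topic when new Goods are about to be loaded.
final class ColocatedCheckEvent {
    final long time;
    final Goods goods;     // the Goods object being loaded
    final String truckKey;

    ColocatedCheckEvent(long time, Goods goods, String truckKey) {
        this.time = time;
        this.goods = goods;
        this.truckKey = truckKey;
    }
}

// Hypothetical Goods: a category plus the categories it may not travel with.
final class Goods {
    final String key;
    final String category;
    final Set<String> forbiddenWith;

    Goods(String key, String category, Set<String> forbiddenWith) {
        this.key = key;
        this.category = category;
        this.forbiddenWith = forbiddenWith;
    }

    // Handler run by Goods already on the truck ("this") against the incoming
    // Goods. Returns a violation message, or null if the rules are satisfied.
    String colocatedRulesEvent(ColocatedCheckEvent event) {
        Goods incoming = event.goods;
        if (forbiddenWith.contains(incoming.category)) {
            return key + " (" + category + ") cannot travel with "
                    + incoming.key + " (" + incoming.category + ")";
        }
        return null;
    }
}

// Stand-in for a truck location topic and its registered Goods.
final class TruckTopic {
    private final String truckKey;
    private final List<Goods> registered = new ArrayList<>();

    TruckTopic(String truckKey) {
        this.truckKey = truckKey;
    }

    // Post the check event to Goods already on the truck, then register the
    // incoming Goods. Returns any violations that were reported.
    List<String> load(long time, Goods incoming) {
        ColocatedCheckEvent event = new ColocatedCheckEvent(time, incoming, truckKey);
        List<String> violations = new ArrayList<>();
        for (Goods onTruck : registered) {
            String violation = onTruck.colocatedRulesEvent(event);
            if (violation != null) {
                violations.add(violation);
            }
        }
        registered.add(incoming);
        return violations;
    }
}
```

In this sketch, posting the event before registering the incoming Goods means it never receives its own check event, which may be one reason the steps are ordered that way.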
I initially (accidentally) tried a simpler solution which just re-used the sensor location topics and added the RFID event handlers to the Goods objects. This also worked, but required each object to check if it was the intended recipient of the load/unload event. In practice it is inefficient, as every Goods object except the one being moved would unnecessarily receive the event and then just ignore it.
What’s still missing? We probably need to do something with the rules violations (sensor and co-location). The obvious thing would be to have a violation topic and publish the violation events to it.
Did these design changes make it easier to migrate the Kongo application to Kafka? Find out in the next blog.
A grammatical note: Goods is both plural and singular!
That makes sense, as using Good for a singular Goods (which I have caught myself doing) could cause confusion, being an ethical judgement (in contrast to “Bad”).
https://www.wordhippo.com/what-is/the-singular-of/goods.html