In Part 3 of my Cadence blog series, we introduced the Drone Delivery Demo application, focusing on the Drone Workflow. In this blog we’ll look at the Drone Delivery application from the perspective of the Order Workflows, understand how the Drone and Order Workflows interact, discover some extra Cadence+Kafka integration patterns, and explore some new Cadence features (e.g. retries, side effects, queries, and continue as new).
1. The Drone Delivery Application Architecture
As we explore the Drone Delivery application further, we’ll use this high-level architecture diagram to see what’s going on in more detail. The top 2 swimlanes are the Apache Kafka® components, and the bottom 2 swimlanes are the Cadence Workflows. I’ve simplified the workflow steps for clarity. Looking at the bottom swimlane, each Drone Workflow maps to an actual physical drone, with steps corresponding to drone delivery lifecycle events.
From the perspective of Orders, the numbered steps reflect the Order/Delivery lifecycle events as follows:
1. Create new Order
2. Create new Order Workflow
3. Order ready for delivery
4. Drone waiting for an order
5. Drone allocated an Order
6. Order picked up by Drone and on the way to delivery; location updated while in flight
7. Order delivered and checked; order completed
Now let’s look at the Order flow in more detail.
2. New Order
A customer orders something from the Drone Delivery App to start the Order and Drone delivery process (Source: Shutterstock)
The first step in the Order lifecycle is a customer actually ordering something from a drone delivery app. We assume this triggers a Create New Order event which is placed on the Kafka New Orders topic. This brings us to the first of our new Cadence+Kafka integration patterns, the “Start New Cadence Workflow from Kafka” pattern (1) below.
This pattern is simple: an independent Kafka consumer runs forever, picking up each new order from the New Orders topic and, using a Cadence client, creating and starting a new Order Workflow instance. Here’s some example code:
```java
WorkflowClient workflowClient = WorkflowClient.newInstance(
        new WorkflowServiceTChannel(ClientOptions.newBuilder().setHost(host).setPort(7933).build()),
        WorkflowClientOptions.newBuilder().setDomain(domainName).build());

Properties kafkaProps = new Properties();
try (FileReader fileReader = new FileReader("consumer2.properties")) {
    kafkaProps.load(fileReader);
} catch (IOException e) {
    e.printStackTrace();
}

// uses a unique group
kafkaProps.put("group.id", "newOrder");

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
    consumer.subscribe(Collections.singleton(newordersTopicName));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.print("Consumer got new Order WF creation request! ");
            System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %s",
                    record.topic(), record.partition(), record.offset(), record.key(), record.value()));
            String orderName = record.value().toString();
            OrderWorkflow orderWorkflow = workflowClient.newWorkflowStub(OrderWorkflow.class);
            System.out.println("Starting new Order workflow!");
            WorkflowExecution workflowExecution = WorkflowClient.start(orderWorkflow::startWorkflow, orderName);
            System.out.println("Started new Order workflow! Workflow ID = " + workflowExecution.getWorkflowId());
        }
    }
}
```
So this is an example of running Cadence code in a Kafka consumer. We’ve seen something similar before, in Part 2, where we sent a signal to a running Cadence Workflow from a Kafka consumer. The difference is that in this example we start a new Cadence Workflow rather than signal an existing one.
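For comparison, here’s a minimal sketch of that Part 2 pattern, using the same kind of consumer loop. Using the record key as the target workflow ID and the signal method name signalOrder are assumptions for illustration only:

```java
// Sketch: signal an already-running workflow instead of starting a new one.
for (ConsumerRecord<String, String> record : records) {
    String workflowId = record.key(); // assumes the record key carries the target workflow ID
    // Look up a stub bound to an existing workflow instance by ID
    OrderWorkflow orderWorkflow = workflowClient.newWorkflowStub(OrderWorkflow.class, workflowId);
    orderWorkflow.signalOrder(record.value()); // delivers a signal; no new workflow is started
}
```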
3. The Order Workflow
The Order Workflow is straightforward. Once started, it generates random order and delivery locations (guaranteed to be within the drone’s delivery-and-return-to-base range), sends a message to Kafka in an activity to say the order is ready for delivery (see below), updates its location state in a loop (received via signals from the Drone Workflow), and exits once it reaches the “orderComplete” state. Other activities are plausible, including checking for delivery violations and sending location updates to Kafka for analysis and mapping.
```java
public static class OrderWorkflowImpl implements OrderWorkflow {

    @Override
    public String startWorkflow(String name) {
        System.out.println("Started Order workflow " + name + ", ID=" + Workflow.getWorkflowInfo().getWorkflowId());

        // Order creates fake order and delivery locations,
        // randomly generated but within range of Drones
        startLocation = Workflow.sideEffect(LatLon.class,
                () -> DroneMaths.newDestination(baseLocation, 0.1, maxLegDistance));
        System.out.println("Order WF startLocation = " + startLocation.toString());
        deliveryLocation = Workflow.sideEffect(LatLon.class,
                () -> DroneMaths.newDestination(startLocation, 0.1, maxLegDistance));
        System.out.println("Order WF deliveryLocation = " + deliveryLocation.toString());

        // A real activity - request a drone - wraps a Kafka producer
        activities.readyForDelivery(name);

        boolean delivered = false;
        String endState = "orderComplete";

        while (!delivered) {
            Workflow.await(() -> !newState.isEmpty());
            System.out.println("order " + name + " got signal = " + newState);
            updates.add(newState);
            if (newState.equals(endState)) {
                delivered = true;
                System.out.println("Order WF exiting!");
            }
            lastState = newState;
            newState = "";
        }
        return "Order " + name + " " + endState;
    }
}
```
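The loop above waits on newState, which is set by a signal from the Drone Workflow. That signal plumbing isn’t shown in the listing, so here’s a minimal sketch of what it might look like; the signal method name (updateState) and the way the Drone Workflow learns the Order’s workflow ID are assumptions, so the demo code may differ:

```java
public interface OrderWorkflow {
    @WorkflowMethod
    String startWorkflow(String name);

    // Signal method the Drone Workflow calls to push state/location updates
    @SignalMethod
    void updateState(String state);
}

// In OrderWorkflowImpl, the signal handler just records the new state:
@Override
public void updateState(String state) {
    this.newState = state; // picked up by Workflow.await() in the delivery loop
}

// In the Drone Workflow, signal the Order Workflow via an external stub
// (assumes the Drone knows the Order's workflow ID):
OrderWorkflow order = Workflow.newExternalWorkflowStub(OrderWorkflow.class, orderWorkflowId);
order.updateState("pickedUp");
// ... and finally, to let the Order Workflow exit its loop:
order.updateState("orderComplete");
```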
A brief word on the side effects: Cadence workflow code must be deterministic, because a workflow may be replayed from its event history at any time, so non-deterministic operations such as generating random locations can’t be called directly from workflow code. Workflow.sideEffect() runs the function once, records the result in the workflow history, and returns the recorded value on every replay, keeping the workflow deterministic. We’ll revisit side effects, along with the other new Cadence features, in the next part.
4. Drone Gets Next Order for Delivery
A taxi rank models people (orders) waiting in line for the next available taxi (drone). (Source: Shutterstock)
Once the order is ready for pickup (potentially after a delay due to order preparation time), we’re ready for the major coordination between the Drone and Order workflows, using the “get the next job from a queue” Cadence+Kafka pattern (2, 3).
The behaviour we want from this interaction is: (a) drones become ready to be allocated an order to deliver, (b) orders become ready for delivery, and (c) exactly one order is allocated to exactly one drone. That is, we don’t want drones “fighting” over orders, drones trying to deliver more than one order at a time, or orders that miss out on being allocated a drone. (a) and (b) can occur in any order, and there may be 0 or more drones or orders ready at any time.
How does this pattern work in practice? It actually consists of two Cadence+Kafka sub-patterns.
The first pattern (2) is a simple one-way notification from Cadence to Kafka. The Order Workflow has an activity, readyForDelivery(), that wraps a Kafka producer. Because this is a remote call it can fail, which is why I used a Cadence activity even though it’s not long-running. Unlike the Cadence+Kafka microservices pattern we demonstrated in blog 2, which sends a notification and then waits for a reply from Kafka, this activity doesn’t wait for a reply. The producer sends the Order’s ID to the “Orders Ready” topic, and the Order Workflow then blocks, using Workflow.await(), until it receives a signal from a Drone to say the order has been picked up.
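The readyForDelivery() activity implementation isn’t shown above, so here’s a plausible sketch; the OrderActivities interface name, the producer.properties file, and reusing the orderjobsTopicName variable from the consumer code below are assumptions based on the surrounding code:

```java
public static class OrderActivitiesImpl implements OrderActivities {
    @Override
    public void readyForDelivery(String name) {
        Properties kafkaProps = new Properties();
        try (FileReader fileReader = new FileReader("producer.properties")) {
            kafkaProps.load(fileReader);
        } catch (IOException e) {
            e.printStackTrace();
        }
        // One-way notification: send the Order's ID to the Orders Ready topic.
        // If the send fails, the activity throws and Cadence can retry it.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps)) {
            producer.send(new ProducerRecord<>(orderjobsTopicName, name)).get();
        } catch (Exception e) {
            throw new RuntimeException("readyForDelivery failed to send to Kafka", e);
        }
    }
}
```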
But how does a Drone acquire a ready order? This is where the second pattern (3) comes in. The Drone Workflow has a Wait For Order activity, which wraps a Kafka consumer (3a) that runs in the Cadence activity thread, so it is transient, lasting only as long as the activity runs. It polls the Orders Ready topic until a single order is returned (3b), at which point the activity completes (3c).
Kafka consumers are used in a slightly atypical way for this use case. There is exactly one consumer per Drone Workflow in the Wait For Order state, and each consumer keeps polling the topic until a single record is returned, then terminates. To ensure only one order is returned per poll, we set max.poll.records to 1.
Each of these consumers shares a common consumer group, so orders are distributed among all the waiting Drones, but only one drone can get each order. We don’t use a Kafka key, so records are delivered to consumers in a round-robin fashion. There may be some overhead due to consumers joining and leaving the group regularly (mainly lag due to rebalancing), and as the number of drones increases, the number of partitions on the topic needs to increase to ensure there are sufficient partitions for the number of consumers: the rule is partitions >= consumers. You may be tempted to start with a very high partition count, but previous experiments reveal that too many partitions can reduce Kafka cluster throughput, and that there is an optimal number of partitions depending on the cluster size (<= 100 partitions is fine for normal operation). If you do happen to have lots more drones, just increase the size of your Kafka cluster and it will keep pace. Here’s the waitForOrder() activity implementation:
```java
public static class DroneActivitiesImpl implements DroneActivities {

    public String waitForOrder(String name) {
        // Kafka consumer that polls for a new Order that's been created and is
        // ready for pickup, to trigger the Drone delivery trip.
        // Each Drone can only have 1 order at a time, and each order can only be
        // delivered by 1 drone (or drone wars may result).
        Properties kafkaProps = new Properties();
        try (FileReader fileReader = new FileReader("consumer2.properties")) {
            kafkaProps.load(fileReader);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // Set max.poll.records to 1 so we only get 1 order at a time.
        // All consumers waiting for an order are in the same shared consumer group.
        // NOTE that this means we need partitions >= number of Drones - assumption
        // is this is < 100 for performance reasons.
        kafkaProps.put("group.id", "waitForOrder");
        kafkaProps.put("max.poll.records", "1");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps)) {
            consumer.subscribe(Collections.singleton(orderjobsTopicName));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.print("waitForOrder got an order! ");
                    System.out.println(String.format("topic = %s, partition = %s, offset = %d, key = %s, value = %s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                    // ensure that we don't get this order again
                    consumer.commitAsync();
                    return record.value().toString();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }
}
```
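For completeness, here’s a sketch of how the Drone Workflow might create the activity stub that invokes waitForOrder(); the timeout and retry values are illustrative assumptions only. Since the activity blocks until an order turns up, it needs a generous timeout, and a retry policy (one of the Cadence features mentioned at the start) covers transient Kafka failures:

```java
// Illustrative activity stub configuration in the Drone Workflow implementation.
private final DroneActivities activities = Workflow.newActivityStub(
        DroneActivities.class,
        new ActivityOptions.Builder()
                // the activity may block a long time waiting for an order
                .setScheduleToCloseTimeout(Duration.ofHours(1))
                .setRetryOptions(new RetryOptions.Builder()
                        .setInitialInterval(Duration.ofSeconds(1))
                        .setMaximumAttempts(3)
                        .build())
                .build());

// ... inside the workflow method:
String order = activities.waitForOrder(droneName);
```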
The code is available in our GitHub repository.
Well, that’s enough for this blog. In the next part, we’ll continue with a summary of the Cadence+Kafka integration patterns we’ve used, and cover some of the new Cadence features in more detail.