Event stream processing unplugged - part 1

Introduction

Welcome to event streaming unplugged, the first in a series of articles covering the topic of realtime event stream processing. This is a practical programming series with resources at unplugged part 1. What you will learn in these articles:
  • The basics of unbounded event processing
  • Functional reactive programming, introducing concepts like map, filter, groupBy and flatMap
  • Constructing directed acyclic execution graphs
  • Managing event streams of heterogeneous types
  • Imperative event programming, integrating application code
  • Input and output
  • Building, testing and deploying
  • Auditing, logging and debugging event execution flow
This article starts with a simple hello world and then moves on to a more complex example. The second example calculates the mark to market value for a set of traded currency pairs. Two independent event streams, instrument market price and executed trades, are processed. Batches of trades and reset signals are also supported. A more complex example serves as a meaningful comparison to an imperative solution.

The open source event stream processing library Fluxtion is used to implement the examples. I hope to receive feedback on the library and make improvements.

Unplugged from what?

There are many great products and projects available that offer realtime stream processing capabilities, but in general they come connected to an infrastructure service or some other unrelated concerns. This series is focused on processing logic so we are unplugged from:
  • Threads - No RxJava, Flow, co-routines
  • Messaging infrastructure - No Kafka Streams, Amazon Kinesis
  • Grid processing - No Hazelcast Jet, Akka Streams
We are solely concerned with the logic of processing event streams. Imagine if Java 8 streams were tightly coupled to a persistent store being present. Although useful in many situations, this would create confusion for developers and reduce the uptake of the library.

Concerns that are not of core importance are separated out. Event stream processing as a subject suffers from complexity overload; it is my strong opinion that less is more in this domain. Simplicity is the key to understanding and deriving value from event stream programming. It is the application's responsibility to read data, create events, manage threads, integrate a stream processor and route events to it.

If we follow this route we will guarantee independence from vendor solutions that bundle business logic processing with a required supporting service. Ultimately we can then move business logic to the best supporting infrastructure without being tied to some form of middleware. 

Real world trading experience

Over the last 15 years I have been connected with algorithmic trading on both the buy and sell side. I feel I can make these observations about event stream programming:
  • If you are involved with algorithmic trading you will probably come across this pattern
  • Almost all banks/funds have some home grown stream processing derivative they use
  • The learning curve is steep but worth it
  • Very useful once understood; the danger is that all problems become nails for the stream processing hammer
  • Hand coding listeners/events becomes untenable over time, creating a large technical debt and forcing the use of a stream processing library
  • There is usually an existing messaging and threading infrastructure that cannot be changed; processing logic engines must integrate easily
  • Single threaded applications with little shared state reading from lock free queues are generally faster
  • Applications that are driven purely from events are deterministic, easier to reason about, simpler to test and can replay from a historic data source
  • Determining event flow in the processing engine requires tool support; without it developer productivity plummets
The key message is that coding event processing logic by hand is error prone, expensive and becomes untenable as complexity rises. Most teams observe this problem and introduce an event processing framework to remove the technical debt.

Hello world event stream

To get started, here is a five minute hello world event stream primer before exploring a more complex example.

Listen to two unbounded streams of data, extracting a value from a stream when a new event is received. Apply a binary function to the two values, in this case addition. If the sum is greater than 100 then log the value to the console. The function is stateless but the streams move independently, so the last value received on each stream must be preserved to apply as an argument to the function. The function and dependent filter are exercised if either event stream updates.

Code example

01  public class HelloWorld {
02      public static void main(String[] args) {
03          //builds the EventProcessor
04          EventProcessor eventProcessor = Fluxtion.interpret(cfg -> {
05              var data1Stream = subscribe(Data1.class)
06                  .console("rcvd -> {}")
07                  .mapToDouble(Data1::value);
08
09              subscribe(Data2.class)
10                  .console("rcvd -> {}")
11                  .mapToDouble(Data2::value)
12                  .map(Double::sum, data1Stream)
13                  .filter(d -> d > 100)
14                  .console("OUT: sum {} > 100");
15          });
16          //init and send events
17          eventProcessor.init();
18          //no output < 100
19          eventProcessor.onEvent(new Data1(20.5));
20          //no output < 100
21          eventProcessor.onEvent(new Data2(63));
22          //output > 100 - log to console
23          eventProcessor.onEvent(new Data1(56.8));
24      }
25
26      public record Data1(double value) {
27      }
28
29      public record Data2(double value) {
30      }
31  }
Lines 4-15: Create an in-memory event processor with Fluxtion#interpret. The instance returned is an event processor that is the target for application events and contains all the processing logic. The next 10 lines are the construction logic for the execution graph.

Lines 5-11: Subscribe to two independent unbounded streams for data types Data1 and Data2. The subscribe command creates a node in the graph that is connected to incoming events of that type. Extract the double value for each stream with a map operation. A console operation peeks into the stream and logs the value when the double value is updated. A fluent style api chains the map operations as a child of the parent node.

Line 12: Apply the binary function Double#sum if either parent stream updates and store the result in this stream. The two streams are joined at the binary function, effectively creating a connected graph.

Line 13: Apply the filter every time the sum updates. If the filter returns true, the value of the sum is published to the child operation, in this case logging to console.

Events are sent to the event processor with a call to EventProcessor#onEvent. The processor takes care of all dispatch logic and state management, invoking functions with the expected values and in the correct order.

Execution output

1  rcvd -> Data1[value=20.5]
2  rcvd -> Data2[value=63.0]
3  rcvd -> Data1[value=56.8]
4  OUT: sum 119.8 > 100

Comparison to java 8 streams

On the surface the code looks very similar to Java 8 streams, intentionally so. The differences in behaviour from Java 8 streams are:
  • The data is unbounded. Java streams process finite sets of data and a terminal operation triggers the processing; with the event processor a new event initiates a process cycle.
  • The event processor is long lived and state is held in the graph, such as the last event value received. Streams are essentially stateless, transient objects that cannot be re-used.
  • Streams are single pipelines with a single execution path; an event processor is a graph of nodes with an arbitrary number of execution paths.
  • Streams only handle a single event type; the event processor supports an arbitrary number of input event types.
  • Event processors have a lifecycle, they can be started and stopped; streams are one shot in operation.
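The bounded versus unbounded distinction is easy to see in plain code. A minimal sketch, with hypothetical class and method names and no Fluxtion types: a Java 8 stream only produces a result when its terminal operation runs over an already complete data set, whereas the event processor above reacted to each event as it arrived.

```java
import java.util.List;

public class BoundedVsUnbounded {

    // Java 8 streams: the finite data set must exist up front and the
    // terminal operation (sum) triggers processing of the whole set at once.
    public static double sumOverThreshold(List<Double> values, double threshold) {
        return values.stream()
                .mapToDouble(Double::doubleValue)
                .filter(d -> d > threshold)
                .sum();
    }

    public static void main(String[] args) {
        // The stream is consumed once; it cannot accept a later Data1/Data2
        // event the way the long lived event processor can.
        System.out.println(sumOverThreshold(List.of(20.5, 63.0, 56.8), 50.0));
    }
}
```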

Processing graph

Under the covers the call to Fluxtion#interpret generates an event processing graph that will be used to process events. The graph is a topologically sorted set of nodes; the event processor ensures nodes are invoked in topological order and only if one of their parents has been invoked previously.
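The dispatch rule can be sketched in a few lines of plain Java. This is a hypothetical illustration of topologically ordered invocation, not Fluxtion's internal implementation:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TopologicalDispatch {

    // A node knows its parents; its action runs when any parent fired this cycle.
    public record Node(String name, List<String> parents, Runnable action) {}

    // nodes must already be topologically sorted: parents appear before children.
    // Returns the names of the nodes invoked for one event cycle.
    public static List<String> dispatch(List<Node> nodes, String eventSource) {
        Set<String> fired = new HashSet<>(List.of(eventSource));
        List<String> invoked = new ArrayList<>();
        for (Node node : nodes) {
            boolean trigger = node.parents().isEmpty()
                    ? fired.contains(node.name()) // root node hit by the incoming event
                    : node.parents().stream().anyMatch(fired::contains);
            if (trigger) {
                node.action().run();
                invoked.add(node.name());
                fired.add(node.name());
            }
        }
        return invoked;
    }
}
```

With nodes data1, data2, sum(data1, data2) and filter(sum), an event routed to data1 invokes data1, sum and filter in that order, while data2 is left untouched, mirroring the hello world graph.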

The hello world event processor graph as a diagram:





Fluxtion

The open source Fluxtion library has been created from my experience in various funds and banks and the demands of event stream processing. The library is confined to processing logic and the building of complex processing graphs. Advanced features such as ahead of time source code generation and zero gc are supported. I hope to get feedback from people following the tutorials and improve the end product.

The Fluxtion streaming api is intended to be similar to the Java streams api to reduce the learning curve for developers new to the event stream programming paradigm.

Realtime trade calculation

The hello world example is a gentle introduction that is easily implemented with a handwritten solution. We next address a problem of medium complexity to demonstrate the advantage of using a framework over a bespoke solution. See here for the code example.

Requirements

  • Publish the mark to market, positions and overall profit for a set of traded instruments to an application component.
  • Values are updated whenever there is a new trade, an instrument price update or a reset action.
  • Batches of trades should result in a single update.
  • The system supports a reset action returning all values to their initial state.
  • Only publish values that have changed.

Input events from the application

Trade
Instrument prices
Reset signal

Outputs published to listeners

Position: A vector of currency trade positions
Mark to market: A vector of currency positions converted to USD equivalent value
Profit: The sum of the mark to market vector

Calculations

  • One FX trade creates two position changes, one for each currency: positive for a buy, negative for a sell.
  • The currency positions from each trade contribute to a cumulative position for that currency.
  • The position for a currency is used to calculate the market value relative to another currency, in this case USD.
  • If no currency->USD rate is available the mark to market value is NaN for that currency position.
  • The sum of all currency mark to market values is the overall profit or loss of the trading position.
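The rules above can be checked with simple arithmetic. A standalone sketch that does not use the Fluxtion api; the class and method names are invented for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class MtmSketch {

    // A bought EURUSD trade of 250 EUR costing 130 USD creates two position
    // changes: positive for the bought currency, negative for the sold one.
    public static Map<String, Double> positionsAfterTrade() {
        Map<String, Double> positions = new HashMap<>();
        positions.merge("EUR", 250.0, Double::sum);
        positions.merge("USD", -130.0, Double::sum);
        return positions;
    }

    // Mark a single currency position to USD; NaN when no rate is available.
    public static double markToMarket(double position, Double usdRate) {
        return usdRate == null ? Double.NaN : position * usdRate;
    }
}
```

With an EUR->USD rate of 1.5 the EUR position of 250 marks to 375 USD; with no CHF rate available the CHF mark to market is NaN, which in turn makes the total profit NaN, exactly as seen in the execution output later.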

Solution

The interesting code for the solution is all in one class, TradingCalculator.java, with three main sections:
  • Building the graph - describes and constructs an instance of the event processor that supports the business logic.
  • Public service methods - a set of methods for use by the enclosing application, each method results in one or more events pushed to the graph.
  • Static helper functions - stateless functions that are invoked by various nodes in the event processor

Building the graph

All of the constructs will be covered in greater detail in future articles. The point of this article is to compare building a similar solution with an imperative model, so the code breakdown is brief.
01  private void buildProcessor(SEPConfig config) {
02      var resetTrigger = subscribeToSignal("reset");
03      var publishTrigger = subscribeToSignal("publish");
04
05      var assetPosition = subscribe(Trade.class)
06          .flatMap(Trade::tradeLegs)
07          .groupBy(TradeLeg::id, TradeLeg::amount, Aggregates.doubleSum())
08          .resetTrigger(resetTrigger);
09
10      var assetPriceMap = subscribe(PairPrice.class)
11          .map(TradingCalculator::toCrossRate)
12          .groupBy(Trade.AssetPrice::id, Trade.AssetPrice::price)
13          .resetTrigger(resetTrigger);
14
15      var posDrivenMtmStream = assetPosition.map(GroupByStreamed::keyValue)
16          .map(TradingCalculator::markToMarketPosition, assetPriceMap.map(GroupBy::map))
17          .updateTrigger(assetPosition);
18
19      var priceDrivenMtMStream = assetPriceMap.map(GroupByStreamed::keyValue)
20          .map(TradingCalculator::markToMarketPrice, assetPosition.map(GroupBy::map))
21          .updateTrigger(assetPriceMap);
22
23      //Mark to market to sink as a map
24      var mtm = posDrivenMtmStream.merge(priceDrivenMtMStream)
25          .groupBy(KeyValue::getKey, KeyValue::getValueAsDouble)
26          .resetTrigger(resetTrigger)
27          .map(GroupBy::map)
28          .updateTrigger(publishTrigger)
29          .filter(Predicates.hasMapChanged())
30          .sink("mtm");
31
32      //Positions to sink as a map
33      assetPosition.map(GroupBy::map)
34          .updateTrigger(publishTrigger)
35          .filter(Predicates.hasMapChanged())
36          .sink("positions");
37
38      //sum of mtm is profit
39      mtm.mapToDouble(TradingCalculator::totalProfit)
40          .filter(Predicates.hasDoubleChanged())
41          .sink("profit");
42  }

Lines 2-3: Because data is always live we sometimes need to override when events are propagated from a node. The subscriptions connect a node to a signal event, keyed by a string. These nodes are used later in the graph to trigger publishing and resetting behaviour.

Lines 5-8: Publishes a position map. Subscribes to Trade objects and uses a flatMap to create an iteration over the trade legs. A groupBy partitions by key with a cumulative sum function and stores the result in a map; this is the whole position calculation. GroupBy is a stateful node, holding the keys and values in a map. A GroupBy node can be reset, clearing the underlying map; the reset trigger is connected to the reset signal node above.
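Written imperatively, the flatMap, groupBy and resetTrigger steps on lines 5-8 amount to the following stateful accumulator. This is a hypothetical plain Java equivalent for comparison, not code from the example project:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PositionAccumulator {

    public record TradeLeg(String id, double amount) {}

    // the state held by the GroupBy node: currency -> cumulative position
    private final Map<String, Double> positions = new HashMap<>();

    // flatMap over the legs, then groupBy id with a cumulative double sum
    public void onTradeLegs(List<TradeLeg> legs) {
        for (TradeLeg leg : legs) {
            positions.merge(leg.id(), leg.amount(), Double::sum);
        }
    }

    // the resetTrigger equivalent: clear the underlying map
    public void reset() {
        positions.clear();
    }

    public Map<String, Double> positions() {
        return Map.copyOf(positions);
    }
}
```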

Lines 10-12: Publishes an asset price to base currency map. Subscribes to PairPrice and uses groupBy to partition into a map keyed by asset currency. A map function calls into the static user function TradingCalculator#toCrossRate to calculate the mark to market rate.

Lines 15-17: Creates a stream of mark to market updates for an asset, triggered when assetPosition has updated. TradingCalculator#markToMarketPosition is a stateless binary function and would trigger if either input changed, so we override the update trigger.

Lines 19-21: Creates a stream of mark to market updates for an asset, triggered when assetPriceMap has updated.

Lines 24-30: Publishes the mark to market map to a sink when there is a change in the map. The two mark to market streams are merged into a single update stream. Each update is merged into a map, and the whole map is published downstream (line 27) to an external sink. A filter gates the update to the sink, checking for a change against the previously published version. Triggers for resetting and overriding the update are provided. Batches of trades are handled; the update is published when the batch has finished processing.
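The change gate on line 29 can be pictured as a stateful predicate. The following is an assumption about the behaviour of Predicates#hasMapChanged, sketched from scratch rather than taken from the library:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

public class HasMapChanged {

    // Passes a map downstream only when it differs from the last map that
    // passed, suppressing duplicate publications to the sink.
    public static <K, V> Predicate<Map<K, V>> hasMapChanged() {
        AtomicReference<Map<K, V>> lastPublished = new AtomicReference<>();
        return map -> {
            if (map.equals(lastPublished.get())) {
                return false;
            }
            lastPublished.set(new HashMap<>(map)); // defensive copy of the published version
            return true;
        };
    }
}
```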

Lines 33-36: Publishes the position map to a sink. The update trigger is overridden and a changed filter is applied to the output.

Lines 39-41: Publishes total profit to a sink, triggered by a change to the upstream node, mtm. Applies the stateless function TradingCalculator#totalProfit to calculate the profit sum for the book.

A note on input and output:

  • Subscriptions feed data from the application into the event processor
  • Sinks push data from a node in the event processor to an external consumer

Mapping to events

Service methods are mapped into events for consumption by the event processor. The event processor supports helper methods for sending keyed signals and registering sinks for output; these utility methods are only wrappers around sending events to the event processor.
01  public TradingCalculator() {
02      streamProcessor = Fluxtion.interpret(this::buildProcessor);
03      streamProcessor.init();
04  }
05
06  public void processTrade(Trade trade) {
07      System.out.println("\nrcvd trade -> " + trade);
08      streamProcessor.onEvent(trade);
09      streamProcessor.publishSignal("publish");
10  }
11
12  public void priceUpdate(PairPrice price) {
13      System.out.println("\nrcvd price -> " + price);
14      streamProcessor.onEvent(price);
15      streamProcessor.publishSignal("publish");
16  }
17
18  public void reset() {
19      System.out.println("\nreset");
20      streamProcessor.publishSignal("reset");
21      streamProcessor.publishSignal("publish");
22  }
23
24  public void markToMarketConsumer(Consumer<Map<String, Double>> listener) {
25      streamProcessor.addSink("mtm", listener);
26  }
27
28  public void positionsConsumer(Consumer<Map<String, Double>> listener) {
29      streamProcessor.addSink("positions", listener);
30  }
31
32  public void profitConsumer(DoubleConsumer listener) {
33      streamProcessor.addSink("profit", listener);
34  }

Helper functions

A set of static helper methods is invoked by the graph, referenced in #map invocations during graph construction. These functions are specific to the needs of the application but are not special in any way; library functions like Double#sum can be used if suitable.
01  public static KeyValue<String, Double> markToMarketPrice(
02          KeyValue<String, Double> assetPrice, Map<String, Double> assetPositionMap) {
03      if (assetPrice == null || assetPositionMap.get(assetPrice.getKey()) == null) {
04          return null;
05      }
06      return new KeyValue<>(assetPrice.getKey(), assetPositionMap.get(assetPrice.getKey()) * assetPrice.getValue());
07  }
08
09  public static KeyValue<String, Double> markToMarketPosition(
10          KeyValue<String, Double> assetPosition, Map<String, Double> assetPriceMap) {
11      if (assetPosition == null) {
12          return null;
13      }
14      if (assetPosition.getKey().equals(baseCurrency)) {
15          return new KeyValue<>(assetPosition.getKey(), assetPosition.getValue());
16      }
17      if (assetPriceMap == null) {
18          return new KeyValue<>(assetPosition.getKey(), Double.NaN);
19      }
20      return new KeyValue<>(
21          assetPosition.getKey(),
22          assetPriceMap.getOrDefault(assetPosition.getKey(), Double.NaN) * assetPosition.getValue());
23  }
24
25  public static double totalProfit(Map<String, Double> m) {
26      return m.values().stream().mapToDouble(Double::doubleValue).sum();
27  }
28
29  public static AssetPrice toCrossRate(PairPrice pairPrice) {
30      if (pairPrice.id().startsWith(baseCurrency)) {
31          return (new AssetPrice(pairPrice.id().substring(3), 1.0 / pairPrice.price()));
32      }
33      return (new AssetPrice(pairPrice.id().substring(0, 3), pairPrice.price()));
34  }
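The cross-rate convention used by toCrossRate is worth checking in isolation. Here is a self-contained sketch of the same logic with a hard coded USD base currency; the record is a stand-in for the example's AssetPrice:

```java
public class CrossRateSketch {

    public record AssetPrice(String id, double price) {}

    static final String BASE_CURRENCY = "USD";

    // EURUSD 1.5 means 1 EUR = 1.5 USD, so the EUR rate is 1.5 directly.
    // USDCHF 1.2 means 1 USD = 1.2 CHF, so the CHF -> USD rate is 1 / 1.2.
    public static AssetPrice toCrossRate(String pairId, double price) {
        if (pairId.startsWith(BASE_CURRENCY)) {
            return new AssetPrice(pairId.substring(3), 1.0 / price);
        }
        return new AssetPrice(pairId.substring(0, 3), price);
    }
}
```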

Running the example

Below is the actual output for running the main program. It is worth noting in the output how filters only publish data if there has been a change to the upstream value. This can greatly reduce load on downstream systems.

Main program

Creates the trading calculator, registers sinks for displaying output to console and then exercises the service interface methods of the calculator.
01  public static void main(String[] args) {
02      TradingCalculator tradingCalculator = new TradingCalculator();
03      //add listeners for output
04      tradingCalculator.markToMarketConsumer(
05          m -> System.out.println("Asset mark to market\t:" + m));
06      tradingCalculator.positionsConsumer(
07          m -> System.out.println("Asset positions\t\t\t:" + m));
08      tradingCalculator.profitConsumer(
09          d -> System.out.println("Total trading profit\t:" + d));
10      //send trades and rates
11      tradingCalculator.processTrade(Trade.bought("EURUSD", 250d, 130d));
12      tradingCalculator.processTrade(Trade.bought("EURUSD", 250d, 130d));
13      tradingCalculator.processTrade(Trade.sold("EURCHF", 120d, 100d));
14      tradingCalculator.priceUpdate(new PairPrice("EURUSD", 1.5));
15      tradingCalculator.priceUpdate(new PairPrice("USDCHF", 1.2));
16      tradingCalculator.processTrade(Trade.bought("GBPJPY", 20d, 26000d));
17      tradingCalculator.priceUpdate(new PairPrice("EURUSD", 1.0));
18      //reset
19      tradingCalculator.reset();
20      //trade batch after reset
21      tradingCalculator.priceUpdate(new PairPrice("EURUSD", 1.5));
22      tradingCalculator.priceUpdate(new PairPrice("GBPUSD", 1.25));
23      tradingCalculator.priceUpdate(new PairPrice("USDJPY", 202));
24      tradingCalculator.priceUpdate(new PairPrice("USDCHF", 1.25));
25      tradingCalculator.processTrades(
26          Trade.bought("EURUSD", 20d, 11d),
27          Trade.bought("GBPJPY", 20d, 26000d),
28          Trade.sold("EURCHF", 120d, 100d)
29      );
30  }

Execution output

The output is self-explanatory and demonstrates the functionality for:
  • Position calculations
  • Mark to market calculations
  • Total profit calculation
  • Only publishing changes to sinks
  • Resetting to zero state
  • Handling sets of trades as a batch
01    rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=250.0], contra=TradeLeg[id=USD, amount=-130.0]]
02 Asset mark to market :{EUR=NaN, USD=-130.0}
03 Total trading profit :NaN
04 Asset positions :{EUR=250.0, USD=-130.0}
05
06 rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=250.0], contra=TradeLeg[id=USD, amount=-130.0]]
07 Asset mark to market :{EUR=NaN, USD=-260.0}
08 Asset positions :{EUR=500.0, USD=-260.0}
09
10 rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=-120.0], contra=TradeLeg[id=CHF, amount=100.0]]
11 Asset mark to market :{CHF=NaN, EUR=NaN, USD=-260.0}
12 Asset positions :{CHF=100.0, EUR=380.0, USD=-260.0}
13
14 rcvd price -> PairPrice[id=EURUSD, price=1.5]
15 Asset mark to market :{CHF=NaN, EUR=570.0, USD=-260.0}
16
17 rcvd price -> PairPrice[id=USDCHF, price=1.2]
18 Asset mark to market :{CHF=83.33333333333334, EUR=570.0, USD=-260.0}
19 Total trading profit :393.33333333333337
20
21 rcvd trade -> Trade[dealt=TradeLeg[id=GBP, amount=20.0], contra=TradeLeg[id=JPY, amount=-26000.0]]
22 Asset mark to market :{CHF=83.33333333333334, JPY=NaN, EUR=570.0, GBP=NaN, USD=-260.0}
23 Total trading profit :NaN
24 Asset positions :{CHF=100.0, JPY=-26000.0, EUR=380.0, GBP=20.0, USD=-260.0}
25
26 rcvd price -> PairPrice[id=EURUSD, price=1.0]
27 Asset mark to market :{CHF=83.33333333333334, JPY=NaN, EUR=380.0, GBP=NaN, USD=-260.0}
28
29 reset
30 Asset mark to market :{}
31 Total trading profit :0.0
32 Asset positions :{}
33
34 rcvd price -> PairPrice[id=EURUSD, price=1.5]
35
36 rcvd price -> PairPrice[id=GBPUSD, price=1.25]
37
38 rcvd price -> PairPrice[id=USDJPY, price=202.0]
39
40 rcvd price -> PairPrice[id=USDCHF, price=1.25]
41
42 Trade batch - start
43 rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=20.0], contra=TradeLeg[id=USD, amount=-11.0]]
44 rcvd trade -> Trade[dealt=TradeLeg[id=GBP, amount=20.0], contra=TradeLeg[id=JPY, amount=-26000.0]]
45 rcvd trade -> Trade[dealt=TradeLeg[id=EUR, amount=-120.0], contra=TradeLeg[id=CHF, amount=100.0]]
46 Trade batch - complete
47 Asset mark to market :{CHF=80.0, JPY=-128.7128712871287, EUR=-150.0, GBP=25.0, USD=-11.0}
48 Total trading profit :-184.7128712871287
49 Asset positions :{CHF=100.0, JPY=-26000.0, EUR=-100.0, GBP=20.0, USD=-11.0}

Comparing to an imperative solution

As with any imperative versus functional comparison the usual arguments hold true; where appropriate, functional solutions are advantageous. Functional solutions are not the best fit for every problem, but in this case I think the balance is in favour of the functional style.

State 

There are at least six items of state for the maps and filters. These would have to be coded and managed correctly in the imperative solution; all of this state is handled transparently in the functional approach.

Dispatch

With an imperative implementation the connection of the state to the calculations has to be manually written. Specific entry methods for each type of input need to be created. Connecting the output to the application requires listener registration and mechanical code that adds no value.

Triggering

All the triggering of calculations has to be manually written and tested, filters need to be invoked in the right place and order. Again the functional approach takes care of all this logic transparently.

Control signals

Resetting and publishing signals need to be co-ordinated with state management.

Testing

Helper functions are easily testable as external static functions. Sinks can be registered with the processor and a set of events fired in. This is a repeatable testing approach for any event processor. A custom set of tests has to be written for the imperative case. If there is a problem in the live application we can guarantee the situation can be reproduced in the functional case if the event stream is captured. The same state guarantees cannot be made about the imperative solution.
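A capturing sink makes that testing approach concrete. A minimal sketch, assuming only that sinks are plain java.util.function consumers as in the example above; no test framework or Fluxtion types are used:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SinkCaptureTest {

    // Records every value published to it, so assertions can be made after
    // a known set of events has been fired into the processor.
    public static class CapturingSink<T> implements Consumer<T> {
        public final List<T> captured = new ArrayList<>();

        @Override
        public void accept(T value) {
            captured.add(value);
        }
    }

    public static void main(String[] args) {
        CapturingSink<Double> profitSink = new CapturingSink<>();
        // In a real test this sink would be registered via profitConsumer(...)
        // and events fired through the service methods; here it is driven directly.
        profitSink.accept(375.0);
        profitSink.accept(393.3);
        System.out.println(profitSink.captured);
    }
}
```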

Composing complexity

Complexity and functions are composed in the construction phase. This is easy to alter and update as the pattern is well understood. The imperative solution is freeform; any approach could have been formulated and built, and this will require time to understand and fix bugs for anyone unfamiliar with the code base.

Conditional logic

There are no explicit conditional statements in the functional solution. In the imperative approach the code would be littered with if/else statements, each of which has to be understood and behave correctly. Poor conditional logic is a rich source of bugs.

Reasoning about the logic

Because a repeatable construction pattern is followed there is virtually no learning curve for a new developer to understand the code and fix bugs. Because the triggering and order of execution are predictable there is no need to understand a custom event dispatch solution, again making the learning curve easier.

Code size

The core imperative solution may be smaller in code, but the other requirements such as listener registration, filters, state management, declaring variables and exposing methods for testing will create code noise and probably make the solution larger than the functional definition.

Conclusion

I hope this article has created interest in event stream programming and demonstrated that its use can be quite easy if we uncouple non-core concerns. Comparing the imperative approach to the functional approach shows the value of the latter in this case. As complexity rises the functional case becomes compelling.

Please download the example and play with it, I welcome any feedback on the library. 
