Saturday, February 14, 2009

Retroactive Event Processing with Scala Actors

Martin Fowler talked about Event Sourcing. Greg Young mentioned about using Event Sourcing in algorithmic trading applications. Very recently Jonas Boner hacked up Martin's example using asynchronous Scala actors. That's heck of a precedence to conclude that there is enough meat in this lunch.

Let me summarize my thoughts on Event Sourcing using Scala actors, and how it can be used to perform retroactive monitoring and diagnostics in an environment that supports code hot-swapping. The thoughts are from the perspective of a real world application with real world requirements. And yes, this is also a trading application that processes security trades that pass through various state transitions in its lifecycle before being finally persisted in the database. In a trading application, tracking exceptions is one of the main components, and exception monitors play an important role in the way the trading desk would like to explore and visualize the processing pipeline. The whole idea is to prevent manual intervention. Hence trades falling off the straight-thru-processing pipeline because of exceptions need to be put back on rails, sometimes even digging into the past, patching up services real-time, and exploring the what-if s and what-would-have-happened scenarios. In short, there is enough reason to do retroactive mining of the application through replays and hotswaps.

Without further ado, let us hack up something similar that accumulates events as they occur in the lifecycle of a trade .. but first, some trivial abstractions (of course elided for demonstration purposes) to set up the stage of the domain model ..

sealed trait InstrumentType
case object EQ extends InstrumentType
case object FI extends InstrumentType

// instrument to be traded  
case class Instrument(isin: String, name: String, insType: InstrumentType)

// trade domain model: immutable
case class Trade(id: Int, ref: String, ins: Instrument, 
  qty: Int, unitPrice: Int, taxFee: Int, net: Int)


The domain objects have been simplified beyond imagination, but still enough to serve the purpose of this post. The only point of note is that all of them have been modeled as immutable abstractions.

As I mentioned above, a trade goes through many state transitions in its lifecycle. We model each transition as being triggerred through an event, which we would like to capture in an event queue.

// base class
sealed abstract case class Event {
  val recorded = new Date
}

// any event that occurs in the lifecycle of a trade  
sealed abstract case class TradeProcessingEvent(val trade: Trade) 
  extends Event

// enrich the trade with tax/fee and other values  
case class EnrichTrade(override val trade: Trade) 
  extends TradeProcessingEvent(trade)

// compute the net value of the trade using market practices and rules
case class ValueTrade(override val trade: Trade) 
  extends TradeProcessingEvent(trade)

// show all events    
case object Show


Once again, as per the best practices, the events are modeled as immutable objects. Also, we keep the events independent of the processing logic, and keep all processing in the layer that seems most natural for them - the domain service layer. And here, we model the service as an actor, that receives all events for the trade and does appropriate processing ..

// a dummy service mainly for demonstration using mock values
val tradingService_1 = actor {
  loop {
    react {
      case e@EnrichTrade(t@trade) => 
        // fills up the tax/fee part
        println("processing event " + e + " out trade: " + 
          Trade(t.id, t.ref, t.ins, t.qty, t.unitPrice, 100, t.net))

      case v@ValueTrade(t@trade) =>
        // fills up the net value of the trade
        println("processing event " + v + " out trade: " + 
          Trade(t.id, t.ref, t.ins, t.qty, t.unitPrice, t.taxFee, 1000))
    }
  }
}


And finally the EventProcessor that sources event processing for all trades and acts as the facade to the layer above. EventProcessor is also modeled as an actor, for obvious reasons of scalability, eventual consistency and all the benefits that asynchronous models offer.

class EventProcessor extends Actor {
  def act = loop(tradingService_1, Nil)
    
  def loop(ts: Actor, events: List[TradeProcessingEvent]) {
    react {
      case Replay =>
        events.reverse.foreach(ts ! _)
        loop(ts, events)
          
      case event: TradeProcessingEvent =>
        ts ! event
        loop(ts, event :: events)
          
      case Show =>
        events.reverse.foreach(println)
        loop(ts, events)
    }
  }
}

// event for replaying from the queue  
case object Replay extends Event


Note that the loop in the actor above takes the list of events as the state and maintains it in each recursive call. The list gets appended to when we receive new events - this is an effective idiom in functional programming of state management, which does not induce any mutual side-effect. The service is also part of the state, and is used to process every event that comes its way. This will have a very important role to play, as we move along ..

REPL ing it out


Ok, now with enough meat on our plates, let us hack the hell out. Let us fire up the scala REPL and see some of the above stuff in action ..


scala> import org.dg.event.EventSourcing._
import org.dg.event.EventSourcing._

scala> i1
res0: org.dg.event.EventSourcing.Instrument = Instrument(ISIN_1,IBM,EQ)

scala> t1
res1: org.dg.event.EventSourcing.Trade = Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)

scala> val e = new EventProcessor
e: org.dg.event.EventSourcing.EventProcessor = org.dg.event.EventSourcing$EventProcessor@6bb93c

scala> e.start
res2: scala.actors.Actor = org.dg.event.EventSourcing$EventProcessor@6bb93c



What we have done so far is some boilerplate stuff in defining an Instrument, a Trade and start up our EventProcessor. As the above session indicates, all are cool and healthy, as they should be .. ok .. next step .. let us fire some events and check the event queue ..


scala> e ! EnrichTrade(t1)

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)


scala> e ! ValueTrade(t1)

processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)


scala> e ! Show

EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0))
ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0))



Two events have been fired, a trade has been enriched and valuated and the result trades have been printed in the REPL. For simplicity, these independent events operating on the same trade are not chained and the impact of one does not affect the source trade of another. In real life, this may as well be necessary .. So the 2 events above process the trades correctly and as the Show message indicates, there are 2 events in the queue that are sequenced properly.

Can we Replay the events ?


Sure, we can. That is one of the major benefits of the entire architecture. In fact, the event queue is a live shot of the sequence of activities that has happened on the trades flowing through the pipeline. It is right up there to be manipulated by multiple consumers in various ways for application visualization and monitoring.

In case of exceptions, the trader desk may need to rerun the sequence, often selectively. Replay message does that precisely. The current implementation shows an unconditional replay, while in reality, we can have variations like ..


Replay(date: Date)        // replay all events filtered by the input date
Replay(trade: Trade)      // replay all events for the particular trade
Replay(ins: Instrument)   // replay all events for all trades on the input Instrument



Let us see what Replay gives us ..


scala> e ! Replay

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) result trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)



As expected, Replay replays the event list and simulates the processing as of some filter condition. This actually opens up a world of options in that the computations can be replayed, situations can be simulated and exceptions can be monitored, visualized and debugged more efficiently, which is one of the vital mandates of an STP trading system.

And now for fun extrema ..



Excerpted from the article of Martin Fowler, let us look at this seemingly innocuous line ..

So this discussion has made the assumption that the application processing the events stays the same. Clearly that's not going to be the case. Events handle changes to data, what about changes to code?

As Martin goes on to mention in the same article, code changes may occur due to addition of new features, defect fixes and temporal logic. There may be scenarios where we may need to process already occurred events with the newly introduced fix. Event sourcing provides a great way out for such practices - in our trading application, exception monitors often need to peek at the event queue, pick up the event at fault and re-process the same using the new version of the service introduced as a fix. Martin Fowler defines a Retroactive Event as one that can be used to (a)utomatically correct the consequences of a incorrect event that's already been processed.

Scala actors support hotswapping of code, can we take advantage of this feature and process retroactive events using Event Sourcing ? The real fun begins here, we need to add the capability of executing event processing with retroactive effect, i.e. each event in the event queue will be processed by the processor which existed at the time of its original processing. We can implement hotswapping in Scala through states being passed to actors in a recursive loop. And since we need to store the service that existed at that point in time, we need to pass that as state as well .. Like the following ..

class EventProcessor extends Actor {
  def act = loop(tradingService_1, Nil)
    
  def loop(ts: Actor, events: List[(Actor, TradeProcessingEvent)]) {
    react {
      case Replay =>
        events.reverse.foreach(=> x._1 ! x._2)
        loop(ts, events)
          
      case event: TradeProcessingEvent =>
        ts ! event
        loop(ts, (ts, event) :: events)
          
      case Show =>
        events.reverse.foreach(println)
        loop(ts, events)

      case HotSwap(s) =>
        loop(s, events)
    }
  }
}

// event for hotswapping
case class HotSwap(s: Actor) extends Event


Here we are passing the service as an additional state and storing it in the event queue as well. The storage can be made more efficient in the production code, but I guess, the idea is clear in the above implementation.

So let us implement another version of the service, do a hotswapping and watch the events being processed retroactively ..

// another version of the trading service
val tradingService_2 = actor {
  loop {
    react {
      case e@EnrichTrade(t@trade) =>
        // new mocked values, 200 for tax/fee
        println("processing event " + e + " result trade: " + 
          Trade(t.id, t.ref, t.ins, t.qty, t.unitPrice, 200, t.net))

      case v@ValueTrade(t@trade) =>
        // new mocked values, 2000 for net value
        println("processing event " + v + " result trade: " + 
          Trade(t.id, t.ref, t.ins, t.qty, t.unitPrice, t.taxFee, 2000))
    }
  }
}


The following session demonstrates how events can be processed retroactively when the code changes. The event queue is still consistent and has everything to replay as of the past. This is an extremely powerful idiom of Event Sourcing that can implement features for real time application analysis. Any problem, bug detected on your code base can be swapped out in favor of the earlier version and the event queue gives you the power of replaying every event with the version of code that you want.


scala> val e = new EventProcessor
e: org.dg.event.EventSourcing.EventProcessor = org.dg.event.EventSourcing$EventProcessor@88a970

scala> e.start
res2: scala.actors.Actor = org.dg.event.EventSourcing$EventProcessor@88a970

scala> e ! EnrichTrade(t1)

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)


scala> e ! ValueTrade(t1)

processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)


scala> e ! Show

(scala.actors.Actor$$anon$1@825cf3,EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@825cf3,ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))


scala> e ! HotSwap(tradingService_2)

scala> e ! EnrichTrade(t1)

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,200,0)

scala> e ! ValueTrade(t1)

processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,2000)

scala> e ! Show
(scala.actors.Actor$$anon$1@825cf3,EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@825cf3,ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@1f6f3dc,EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))
(scala.actors.Actor$$anon$1@1f6f3dc,ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)))


scala> e ! Replay
processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,100,0)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,1000)

processing event EnrichTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,200,0)
processing event ValueTrade(Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,0)) out trade: Trade(1,ref-1,Instrument(ISIN_1,IBM,EQ),100,20,0,2000)




In the above, the first two replays are with the earlier version of the service, after which we hotswapped tradingService_1 with tradingService_2. The last 2 replays reflect this change and runs with the newer version of the service.

Monday, February 09, 2009

Learning Haskell: Solving the Josephus Flavius game

Haskell supports lazy evaluation and recursive let. Both of these can be used very elegantly to implement recursive data structures. Consider this snippet ..

repeat :: a -> [a]
repeat x = let xs = x:xs
    in xs


In the above function, the result is named 'xs', which is again used inside the recursive let. Hence there is a circular loop, where the result in one stage of the evaluation gets fed back into the input. This is the principle of circular programming, and is best used in lazily evaluated languages. Languages that employ strict evaluation semantics will fail to evaluate the above.

Consider an alternative definition of repeat ..

repeat x = x : repeat x


Haskell implements the above function also as a thunk and evaluates *incrementally* and *on demand*. But the evaluation proceeds differently from the earlier version, in the sense that the latter version keeps on consuming memory and appending elements during evaluation. The earlier version is smart enough and traverses the circular structure only.

In Haskell, this technique is also referred to as "tying the knot", and is an effective substitute for mutable references. This technique has been used quite effectively in optimizing traversals on recursive structures - algorithms that usually need multiple traversals in a strict evaluation language can be tackled using a single traversal. But that is the subject of another post, some other day ..

Over the weekend I came across this puzzle, better known as the Josephus Flavius game ..

"Josephus Flavius was a famous Jewish historian of the first century at the time of the Second Temple destruction. During the Jewish-Roman war he got trapped in a cave with a group of 40 soldiers surrounded by romans. The legend has it that preferring suicide to capture, the Jews decided to form a circle and, proceeding around it, to kill every third remaining person until no one was left. Josephus, not keen to die, quickly found the safe spot in the circle and thus stayed alive."

I dug into the problem and tried to find a solution in Haskell. It ended up a cool application of circular programming ..

josephus init nth = 

    -- alive will have the sequence of victims in the last part
    let l = alive
        (v, s) = splitAt (length alive - length init) alive
    in (s, last v)

    where 

    -- circular: alive gets fed back
    alive = init ++ victim (length init) alive []

    -- find victims and aggregate sequence
    victim 0 line seq = seq
    victim n line seq = 
        let (f, s) = splitAt (nth-1) line
        in f ++ (victim (n-1) (tail s) (seq ++ [head s]))


The program returns the last survivor as well as the entire elimination order of the victims. For the Josephus problem, the solution is ..


*Main> josephus [1..40] 3
([3,6,9,12,15,18,21,24,27,30,33,36,39,2,7,11,16,20,25,29,34,38,4,10,17,23,31,37,5,14,26,35,8,22,40,19,1,32,13,28],28)




The first element of the tuple is the sequence in which the victims get eliminated, while the last element (28) is the last survivor. Felt good for a Haskell newbie ..