Composing Akka behaviors with monads
Do I want to read this?
You might be interested in this post if:
- You are interested in Scala and Akka
- You are curious about how to effectively implement and manage highly concurrent processes
- You want to see an interesting application of the monad type class
What do I need to know to get anything out of this?
If you are familiar enough with Scala to know how to use flatMap to transform a collection like List, and if you are familiar enough with Akka to set up a basic actor system, you should be able to make sense of most of what’s in here.
I’m a busy person, do I really need to read all of this?
Probably not. You can skip to the code here: https://github.com/johnhungerford/akka-util
Pay particular attention to the BehaviorPhase trait and its implementation HandlerBehaviorPhase. It allows you to define a “phase” of an Akka behavior that accomplishes some task and returns some value. These phases can be composed using flatMap or a for comprehension into a single complete actor behavior, thus separating the procedural logic of actor behavior from the component message handlers. DiningHakkersBehaviorPhase provides an example.
Skip to the section A new solution below for more information about how it does that and how it can be used.
Introduction
Over the past few years, Akka has become a standard solution for handling concurrency in high-complexity JVM applications. Its implementation of the actor model via a flexible and intuitive API allows developers to easily design concurrent and asynchronous processes without having to worry about the inevitable race conditions and deadlocks that arise using traditional methods.
Akka’s flexibility is owed to its unopinionated design, however, which can also be a major source of pitfalls for inexperienced users. There are endless ways to construct an actor system for any given use case, and one’s choice of design can have a lot of unforeseen consequences for performance and maintainability. In my experience, the biggest pitfall is actor complexity. It is relatively easy to avoid most common problems by following some well-known best practices, but it is hard to design an actor system that can be easily understood and maintained. As the number of actors and message types grows, it can quickly become prohibitively difficult to follow the system’s behavioral logic or to make simple changes to its behaviors without changing many different parts of the system. Refactoring actor systems like this can be a daunting task.
This problem is especially frustrating to encounter when using Scala, a language with a rich set of functional programming tools. Functional programming’s greatest strength is making it easy to compose complex applications from relatively simple, declarative functions. In a well-designed functional application, it is possible to reason locally about the application’s logic at every “level” of its conceptual structure. While the actor model is the de-facto concurrency model used by functional programming languages, however, it does not enforce local reasoning in and of itself. This is because concurrency ultimately and unavoidably deals with “effects” — the aspects of the system that functional programming most of all tries to abstract away. While the actor model can use pure functions to describe actors and their behaviors, the functions that actually send messages necessarily produce outcomes that are not reducible to those functions’ return values. As a consequence, an actor’s behavior and complexity are not bounded by its type signature in the same way as pure functions.
The question, then, is how do we bring the advantages of functional programming back into the actor model? Is there a way to construct actors so that we can reason about their behavior locally and organize their complexity better? This post will present one method of managing actor complexity by using a monad to describe behavior in distinct “phases”, which can then be easily and flexibly assembled into a complete description of an actor behavior using monadic composition in a for comprehension.
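To make the idea of composable phases concrete before any Akka code appears, here is a minimal sketch in plain Scala. It is not the BehaviorPhase trait from the repository: ToyPhase, await, and the message types are hypothetical names invented for this illustration, and a plain List stands in for an actor's mailbox. Each phase consumes messages until it can produce a value, and flatMap hands the remaining messages to the next phase.

```scala
object PhaseSketch {
  // Hypothetical toy type: a phase reads from an inbox and, if it can finish,
  // returns its result together with the messages it did not consume.
  final case class ToyPhase[M, A]( run : List[ M ] => Option[ ( A, List[ M ] ) ] ) {
    def flatMap[B]( f : A => ToyPhase[ M, B ] ) : ToyPhase[ M, B ] =
      ToyPhase { inbox => run( inbox ).flatMap { case ( a, rest ) => f( a ).run( rest ) } }

    def map[B]( f : A => B ) : ToyPhase[ M, B ] =
      ToyPhase { inbox => run( inbox ).map { case ( a, rest ) => ( f( a ), rest ) } }
  }

  // Skips messages the handler is not defined for, then finishes on the first one it handles
  def await[M, A]( handler : PartialFunction[ M, A ] ) : ToyPhase[ M, A ] = ToyPhase { inbox =>
    val remaining = inbox.dropWhile( m => !handler.isDefinedAt( m ) )
    remaining.headOption.map( m => ( handler( m ), remaining.tail ) )
  }

  // Hypothetical message types, loosely modeled on the hakker example
  sealed trait Msg
  case object Eat extends Msg
  final case class Taken( chopstick : String ) extends Msg
  final case class Busy( chopstick : String ) extends Msg

  // Three small phases composed into one procedure: wait for Eat, then for two chopstick answers
  val tryToEat : ToyPhase[ Msg, Boolean ] = for {
    _ <- await[ Msg, Unit ] { case Eat => () }
    first <- await[ Msg, Boolean ] { case Taken( _ ) => true; case Busy( _ ) => false }
    second <- await[ Msg, Boolean ] { case Taken( _ ) => true; case Busy( _ ) => false }
  } yield first && second
}
```

The for comprehension reads as a sequential procedure, while each await only knows about the messages relevant to its own step. That separation is the property the rest of this post tries to recover for real actor behaviors.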
An example
For this post, I will be using an example from the Akka documentation called Dining Hakkers. Dining Hakkers is based on an old concurrency problem called “Dining Philosophers,” which models a group of diners sitting around a circular table with just one chopstick between each of them. Each diner, or “hakker”, will alternate between thinking and eating. A hakker cannot eat, however, unless it can pick up both chopsticks. If one of the chopsticks has been picked up by a neighboring hakker, it must put the other chopstick down and continue to think for some period before trying to eat again.
This is a useful example because, although simple, it involves a kind of procedural logic: each person and each chopstick will go through different “phases” where their respective behaviors accomplish different tasks. Many elementary Akka examples involve contrived scenarios where each actor basically does one or two things. I have found that one of the biggest difficulties in following actor behavior comes from the fact that most actors in real life end up having some kind of sequential behavior. By walking through the process of modeling the dining hakkers using Akka actors, we can illustrate the problem of actor complexity and show how to solve it, or at least mitigate it.
Let us begin by setting up the skeleton of our actor system without implementing any of the actual behaviors. It is clear that we will need two kinds of actors: the hakker and the chopstick. Without knowing the details of the hakker’s implementation, we know that its behavior will depend on its having access to two chopsticks, one on the left and one on the right. We will accordingly include references to chopstick actors as parameters to the behavior’s constructor. We will also define a command Eat that will be used to start off the Hakker’s cycle of eating and thinking (we will need to define more, but this is the only one that we will pass in from the outside). Finally, we’ll add static methods for generating random durations and random decisions that will enable us to introduce noise to the actor system and make the diners’ interactions more interesting:
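import akka.actor.typed.{ ActorRef, Behavior }
import scala.concurrent.duration.FiniteDuration
import scala.util.Random

object Chopstick {
    sealed trait ChopstickMessage

    def apply( ) : Behavior[ ChopstickMessage ] = ???
}

object Hakker {
    import Chopstick.ChopstickMessage

    sealed trait Command
    case object Eat extends Command

    // Assumed definition: the source of randomness used by the helpers below
    private val r = new Random()

    def apply( name : String, left : ActorRef[ ChopstickMessage ], right : ActorRef[ ChopstickMessage ] ) : Behavior[ Command ] = ???

    def rndBool() : Boolean = r.nextBoolean()

    // Returns dur plus or minus a random offset of at most plusOrMinus
    // (Random.nextLong with a bound requires Scala 2.13 or later)
    def rndDuration( dur : FiniteDuration, plusOrMinus : FiniteDuration ) : FiniteDuration = {
        val pmVal = r.nextLong( plusOrMinus.length + 1 )
        val pOrM = FiniteDuration( pmVal, plusOrMinus.unit )
        val plus = r.nextBoolean()
        if ( plus ) dur + pOrM else dur - pOrM
    }
}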
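As a quick sanity check of the jitter helper, the following standalone sketch re-implements rndDuration outside of Akka and samples it. The object name, the fixed seed, and the sample size are assumptions made for this illustration only. Note that the offset is drawn in whole units of plusOrMinus, so 3.seconds yields offsets of 0 to 3 whole seconds.

```scala
import scala.concurrent.duration._
import scala.util.Random

object RndDurationSketch {
  // Seeded only so that repeated runs of this sketch behave identically
  private val r = new Random( 42L )

  // Same logic as the helper above (Random.nextLong with a bound needs Scala 2.13+)
  def rndDuration( dur : FiniteDuration, plusOrMinus : FiniteDuration ) : FiniteDuration = {
    val offset = FiniteDuration( r.nextLong( plusOrMinus.length + 1 ), plusOrMinus.unit )
    if ( r.nextBoolean() ) dur + offset else dur - offset
  }

  val samples : Seq[ FiniteDuration ] = Seq.fill( 1000 )( rndDuration( 5.seconds, 3.seconds ) )

  // Every sample should fall inside [dur - plusOrMinus, dur + plusOrMinus]
  val allWithinBounds : Boolean = samples.forall( d => d >= 2.seconds && d <= 8.seconds )

  // With a bound given in seconds, every sample is a whole number of seconds
  val allWholeSeconds : Boolean = samples.forall( d => d.toMillis % 1000 == 0 )
}
```

If finer-grained jitter is wanted, passing the bound in a smaller unit (for example 3000.millis) should spread the offsets over milliseconds instead.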
We can now define an actor system that will construct the “circle” of hakkers and their corresponding chopsticks. This will respond to some new basic message types to start and stop the system. To kick off the dinner, we will send an Eat message to each of the hakkers.
object DiningHakkers {
    import Chopstick._
    import Hakker._
    import DiningHakkers._

    def apply(): Behavior[ DiningHakkersMessage ] = Behaviors.setup { context =>
        var hakkerNames : Seq[ String ] = Nil
        var chopsticks : Seq[ ActorRef[ ChopstickMessage ] ] = Nil
        var hakkers : Seq[ ActorRef[ Command ] ] = Nil

        def newNames( names : Seq[ String ] ) : Unit = hakkerNames = names.distinct

        // One chopstick per hakker; hakker i sits between chopstick i and chopstick i + 1 (wrapping around)
        def initDiners() : Unit = {
            chopsticks = hakkerNames.zipWithIndex.map {
                case (_, i) => context.spawn( Chopstick(), "Chopstick" + i )
            }
            hakkers = hakkerNames.zipWithIndex.map {
                case (name, i) =>
                    context.spawn( Hakker( name, chopsticks( i ), chopsticks( ( i + 1 ) % chopsticks.length ) ), name )
            }
        }

        def stopDining() : Unit = {
            hakkers.foreach( h => context.stop( h ) )
            chopsticks.foreach( c => context.stop( c ) )
            hakkers = Nil
            chopsticks = Nil
        }

        Behaviors.receiveMessagePartial {
            case NewHakkers( names ) =>
                newNames( names )
                Behaviors.same
            case Start =>
                if ( hakkerNames != Nil && hakkers == Nil ) {
                    initDiners()
                    hakkers.foreach( _ ! Eat )
                } else if ( hakkers != Nil ) context.log.info( "Already started..." )
                else context.log.info( "No hakkers to dine" )
                Behaviors.same
            case Stop =>
                stopDining()
                Behaviors.same
            case Exit =>
                stopDining()
                Behaviors.stopped
        }
    }
}
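The circular seating comes entirely from the index arithmetic in initDiners. The following standalone sketch (plain Scala, with integer indices standing in for chopstick actor references, and the names CircleSketch and seating invented for the illustration) shows which chopsticks each hakker ends up with.

```scala
object CircleSketch {
  // Hakker i gets chopstick i on the left and chopstick (i + 1) % n on the right,
  // so the last hakker wraps around and shares chopstick 0 with the first.
  def seating( names : Seq[ String ] ) : Seq[ ( String, Int, Int ) ] =
    names.zipWithIndex.map { case ( name, i ) => ( name, i, ( i + 1 ) % names.length ) }

  val table : Seq[ ( String, Int, Int ) ] = seating( Seq( "dasher", "dancer", "prancer" ) )

  // Counts how many hakkers can reach each chopstick
  def users( seats : Seq[ ( String, Int, Int ) ] ) : Map[ Int, Int ] =
    seats.flatMap { case ( _, l, r ) => Seq( l, r ) }.groupBy( identity ).map { case ( k, v ) => ( k, v.length ) }
}
```

With three names, every chopstick is reachable by exactly two hakkers. With a single name, left and right are the same chopstick, so under the chopstick rules described below that lone hakker would presumably never manage to eat.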
Finally, we can create a main method with a simple command-line interface allowing a user to create, start, and stop the dining hakkers with simple commands:
import akka.actor.typed.ActorSystem
import scala.util.matching.Regex

object Main {
    def main( args : Array[ String ] ) : Unit = {
        val actorSystem = ActorSystem( DiningHakkers(), "DiningHakkers" )

        println( "\nEnter your command\n" )

        var repeat : Boolean = true
        do {
            val cmd = scala.io.StdIn.readLine().trim.toLowerCase
            repeat = cmd match {
                case StartCommand() => actorSystem ! Start; true
                case StartWithCommand( newHakkerNames ) =>
                    // Names are separated by whitespace
                    actorSystem ! NewHakkers( newHakkerNames.split( """\s+""" ).map( _.trim ) )
                    actorSystem ! Start
                    true
                case StopCommand() => actorSystem ! Stop; true
                case ExitCommand() => actorSystem ! Exit; false
                case _ => println( s"Unknown command: ${cmd}" ); true
            }
        } while ( repeat )

        actorSystem.terminate()
    }

    // "start: name1 name2 ..." captures the whitespace-separated list of names
    val StartWithCommand : Regex = """start:\s*([a-z0-9\s]*)""".r
    val StartCommand : Regex = "start".r
    val StopCommand : Regex = "stop".r
    val ExitCommand : Regex = "exit".r
}
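The command parsing relies on Scala's regex extractors, which only match when the whole input matches the pattern. The standalone sketch below exercises the two start patterns without an actor system. CommandSketch and parse are hypothetical names, and filtering out empty names is an addition of this sketch rather than something the Main object above does.

```scala
import scala.util.matching.Regex

object CommandSketch {
  val StartWithCommand : Regex = """start:\s*([a-z0-9\s]*)""".r
  val StartCommand : Regex = "start".r

  // Returns the hakker names carried by a start command, or None for anything else
  def parse( line : String ) : Option[ Seq[ String ] ] = line.trim.toLowerCase match {
    case StartCommand() => Some( Nil )
    case StartWithCommand( names ) => Some( names.split( """\s+""" ).toSeq.filter( _.nonEmpty ) )
    case _ => None
  }
}
```

Because the input is lowercased before matching, the names arrive at the actor system in lowercase, which is why the log output further down shows "dasher" rather than "Dasher".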
Now all that remains is to implement the Hakker and Chopstick behaviors.
A naive approach
Let us begin by taking a naive approach to Dining Hakkers, describing each chopstick and each diner as a single actor with a single behavior. The chopstick behavior is pretty easy to define: it has a mutable state (not a problem within an actor as long as we keep that state closed off to the outside world) to keep track of a) whether or not it has been picked up and b) by whom. These two things can both be described by a single Option of a reference to whatever actor (may have) picked it up.
We then define Take( ref : ActorRef[ ChopstickAnswer ] ) and Put( ref : ActorRef[ ChopstickAnswer ] ) messages, which can be sent by a diner (or anyone else) to take or put back the chopstick. We include references to the sending actors so that we can ensure that only the actor who picked up a chopstick can put it back down. We must also define Taken( ref ) and Busy( ref ) answers to respond to Take, each carrying a reference to the chopstick itself, which will indicate whether the chopstick was available.
object Chopstick {
    sealed trait ChopstickMessage
    final case class Take( ref: ActorRef[ ChopstickAnswer ] ) extends ChopstickMessage
    final case class Put( ref: ActorRef[ ChopstickAnswer ] ) extends ChopstickMessage

    sealed trait ChopstickAnswer
    final case class Taken(chopstick: ActorRef[ ChopstickMessage ] ) extends ChopstickAnswer
    final case class Busy(chopstick: ActorRef[ ChopstickMessage ] ) extends ChopstickAnswer

    def apply( ) : Behavior[ ChopstickMessage ] = {
        Behaviors.setup( ctx => new Chopstick( ctx ) )
    }
}

class Chopstick( ctx : ActorContext[ ChopstickMessage ] ) extends AbstractBehavior[ ChopstickMessage ]( ctx ) {

    // None while the chopstick is on the table, Some( holder ) once it has been taken
    private var takenBy : Option[ ActorRef[ ChopstickAnswer ] ] = None

    override def onMessage( msg : ChopstickMessage ) : Behavior[ ChopstickMessage ] = msg match {
        case Take( hakker ) =>
            if ( takenBy.isEmpty ) {
                takenBy = Some( hakker )
                hakker ! Taken( ctx.self )
            } else hakker ! Busy( ctx.self )
            Behaviors.same

        case Put( hakker ) =>
            // Only the current holder may put the chopstick back down
            if ( takenBy.contains( hakker ) ) {
                takenBy = None
                Behaviors.same
            } else Behaviors.unhandled
    }
}
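Stripped of the actor machinery, the chopstick is a small state machine over an Option. The following sketch models it as a pure function so the rules can be checked in isolation. The names ChopstickModel and step are invented for the illustration, and hakkers are identified by plain strings instead of actor references.

```scala
object ChopstickModel {
  sealed trait Request
  final case class Take( hakker : String ) extends Request
  final case class Put( hakker : String ) extends Request

  sealed trait Reply
  case object Taken extends Reply
  case object Busy extends Reply

  // takenBy is None when the chopstick is on the table, Some( holder ) otherwise.
  // Returns the next state and the reply (if any) that would be sent to the requester.
  def step( takenBy : Option[ String ], request : Request ) : ( Option[ String ], Option[ Reply ] ) =
    request match {
      case Take( hakker ) if takenBy.isEmpty => ( Some( hakker ), Some( Taken ) )
      case Take( _ ) => ( takenBy, Some( Busy ) )
      case Put( hakker ) if takenBy.contains( hakker ) => ( None, None )
      case Put( _ ) => ( takenBy, None ) // ignored: only the holder may put it down
    }
}
```

One detail worth noticing is that a Take from the hakker who already holds the chopstick is answered with Busy, exactly like a Take from anyone else. The chopstick has no notion of "you already have me".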
The hakker behavior is more complicated. We will first try building its behavior in the same way we did the chopsticks: we will track which chopstick has been picked up or put down in mutable state, and we will define messages corresponding to each action of the diner. Since we will not need responses from the hakker (its output will be logs), its messages will simply be commands. One of these commands, HandleChopstickAnswer, will be a wrapper for the responses from chopstick actors, which can be automatically translated using a message adapter.
object Hakker {
    sealed trait Command
    case object Think extends Command
    case object Eat extends Command
    final case class HandleChopstickAnswer( msg: ChopstickAnswer ) extends Command

    def apply( name : String, left : ActorRef[ ChopstickMessage ], right : ActorRef[ ChopstickMessage ] ) : Behavior[ Command ] = ???
}

class Hakker( ctx : ActorContext[ Command ], name : String, left : ActorRef[ ChopstickMessage ], right : ActorRef[ ChopstickMessage ] ) extends AbstractBehavior[ Command ]( ctx ) {
    import Hakker._
    import Chopstick._

    // MUTABLE STATE
    private var takeLeftChopstickSuccess : Option[ Boolean ] = None
    private var takeRightChopstickSuccess : Option[ Boolean ] = None

    private val adapter = ctx.messageAdapter( HandleChopstickAnswer )

    override def onMessage( msg : Command ) : Behavior[ Command ] = {
        ctx.log.info( s"$name -- left: $takeLeftChopstickSuccess; right: $takeRightChopstickSuccess; msg: $msg")
        msg match {
            case Eat =>
                left ! Take( adapter )
                right ! Take( adapter )
                ctx.log.info( "{} began trying to eat", name )
                Behaviors.same

            case Think =>
                left ! Put( adapter )
                right ! Put( adapter )
                takeLeftChopstickSuccess = None
                takeRightChopstickSuccess = None
                ctx.scheduleOnce( rndDuration( 5.seconds, 3.seconds ), ctx.self, Eat )
                ctx.log.info( "{} started thinking", name )
                Behaviors.same

            case HandleChopstickAnswer( Busy( `left` ) ) =>
                if ( takeRightChopstickSuccess.nonEmpty ) {
                    if ( takeRightChopstickSuccess.get ) right ! Put( adapter )
                    takeLeftChopstickSuccess = None
                    takeRightChopstickSuccess = None
                    ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Eat )
                    ctx.log.info( "{} could not pick up {} so he put down {} and started to think", name, left.path.name, right.path.name )
                } else if ( takeRightChopstickSuccess.isEmpty ) {
                    takeLeftChopstickSuccess = Some( false )
                    ctx.log.info( "{} could not pick up {}", name, left.path.name )
                }
                Behaviors.same

            case HandleChopstickAnswer( Busy( `right` ) ) =>
                if ( takeLeftChopstickSuccess.nonEmpty ) {
                    if ( takeLeftChopstickSuccess.get ) left ! Put( adapter )
                    takeLeftChopstickSuccess = None
                    takeRightChopstickSuccess = None
                    ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Eat )
                    ctx.log.info( "{} could not pick up {} so he put down {} and started to think", name, right.path.name, left.path.name )
                } else if ( takeLeftChopstickSuccess.isEmpty ) {
                    takeRightChopstickSuccess = Some( false )
                    ctx.log.info( "{} could not pick up {}", name, right.path.name )
                }
                Behaviors.same

            // If one of the chopsticks was able to be taken successfully
            case HandleChopstickAnswer( Taken( `left` ) ) =>
                if ( takeRightChopstickSuccess.nonEmpty ) {
                    if ( !takeRightChopstickSuccess.get ) {
                        left ! Put( adapter )
                        takeLeftChopstickSuccess = None
                        takeRightChopstickSuccess = None
                        ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Eat )
                        ctx.log.info( "{} picked up {} but because {} could not be picked up put it down and started to think", name, left.path.name, right.path.name )
                    } else {
                        ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Think )
                        ctx.log.info( "{} picked up {} and started to eat", name, left.path.name )
                    }
                } else {
                    takeLeftChopstickSuccess = Some( true )
                    ctx.log.info( "{} picked up {}", name, left.path.name )
                }
                Behaviors.same

            case HandleChopstickAnswer( Taken( `right` ) ) =>
                if ( takeLeftChopstickSuccess.nonEmpty ) {
                    if ( !takeLeftChopstickSuccess.get ) {
                        right ! Put( adapter )
                        takeLeftChopstickSuccess = None
                        takeRightChopstickSuccess = None
                        ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Eat )
                        ctx.log.info( "{} picked up {} but because {} could not be picked up put it down and started to think", name, right.path.name, left.path.name )
                    } else {
                        ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Think )
                        ctx.log.info( "{} picked up {} and started to eat", name, right.path.name )
                    }
                } else {
                    takeRightChopstickSuccess = Some( true )
                    ctx.log.info( "{} picked up {}", name, right.path.name )
                }
                Behaviors.same
        }
    }
}
In this setup, the diner’s behavior can be kicked off by sending it either an Eat or Think message. The Eat message will cause it to try to pick up the chopsticks by sending a Take message to each. Its subsequent behavior is determined by its handling of the chopstick response messages, which are adapted into a HandleChopstickAnswer message wrapper. Once it is determined that both have been picked up, it logs that it is now eating and schedules a self-message for the future to switch it from “eating” to “thinking,” which in turn schedules another Eat message to switch it back to eating. If one or both chopsticks cannot be picked up, any chopstick that has been picked up is put back down, and the hakker goes back to thinking by scheduling another Eat message to itself, which starts the think-eat cycle over again.
When we run the program, we should now see a cascade of logs documenting each update to the actors’ states, as the hakkers and chopsticks interact in chaotic ways:
Enter your command
start: Dasher Dancer Prancer Vixen Comet Cupid Donner Blitzen Rudolph
10:02:16.329 [DiningHakkers-akka.actor.default-dispatcher-8] INFO dininghakkers.DiningHakkers$$anonfun$$nestedInanonfun$apply$1$1 - UPDATING HAKKERS
10:02:16.335 [DiningHakkers-akka.actor.default-dispatcher-9] INFO dininghakkers.Hakker - dasher began trying to eat
10:02:16.335 [DiningHakkers-akka.actor.default-dispatcher-6] INFO dininghakkers.Hakker - dancer began trying to eat
10:02:16.335 [DiningHakkers-akka.actor.default-dispatcher-9] INFO dininghakkers.Hakker - dasher picked up Chopstick0
10:02:16.335 [DiningHakkers-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - prancer began trying to eat
10:02:16.335 [DiningHakkers-akka.actor.default-dispatcher-10] INFO dininghakkers.Hakker - vixen began trying to eat
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-9] INFO dininghakkers.Hakker - dasher could not pick up Chopstick1 so he put down Chopstick0 and started to think
10:02:16.335 [DiningHakkers-akka.actor.default-dispatcher-6] INFO dininghakkers.Hakker - dancer could not pick up Chopstick2
10:02:16.335 [DiningHakkers-akka.actor.default-dispatcher-15] INFO dininghakkers.Hakker - comet began trying to eat
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-7] INFO dininghakkers.Hakker - cupid began trying to eat
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-6] INFO dininghakkers.Hakker - dancer picked up Chopstick1 but because Chopstick2 could not be picked up put it down and started to think
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-7] INFO dininghakkers.Hakker - cupid picked up Chopstick6
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - prancer picked up Chopstick2
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-7] INFO dininghakkers.Hakker - cupid picked up Chopstick5 and started to eat
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - rudolph began trying to eat
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-10] INFO dininghakkers.Hakker - vixen picked up Chopstick3
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-16] INFO dininghakkers.Hakker - donner began trying to eat
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - blitzen began trying to eat
10:02:16.336 [DiningHakkers-akka.actor.default-dispatcher-15] INFO dininghakkers.Hakker - comet could not pick up Chopstick4
10:02:16.337 [DiningHakkers-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - prancer could not pick up Chopstick3 so he put down Chopstick2 and started to think
10:02:16.337 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - rudolph picked up Chopstick8
10:02:16.337 [DiningHakkers-akka.actor.default-dispatcher-10] INFO dininghakkers.Hakker - vixen picked up Chopstick4 and started to eat
10:02:16.337 [DiningHakkers-akka.actor.default-dispatcher-16] INFO dininghakkers.Hakker - donner could not pick up Chopstick6
10:02:16.337 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - blitzen picked up Chopstick7
10:02:16.337 [DiningHakkers-akka.actor.default-dispatcher-15] INFO dininghakkers.Hakker - comet could not pick up Chopstick5 so he put down Chopstick4 and started to think
10:02:16.337 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - rudolph picked up Chopstick0 and started to eat
10:02:16.338 [DiningHakkers-akka.actor.default-dispatcher-16] INFO dininghakkers.Hakker - donner could not pick up Chopstick7 so he put down Chopstick6 and started to think
10:02:16.338 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - blitzen could not pick up Chopstick8 so he put down Chopstick7 and started to think
10:02:21.351 [DiningHakkers-akka.actor.default-dispatcher-16] INFO dininghakkers.Hakker - prancer began trying to eat
10:02:21.351 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - blitzen began trying to eat
10:02:21.351 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - cupid started thinking
10:02:21.352 [DiningHakkers-akka.actor.default-dispatcher-16] INFO dininghakkers.Hakker - prancer picked up Chopstick2
10:02:21.352 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - blitzen could not pick up Chopstick8
10:02:21.353 [DiningHakkers-akka.actor.default-dispatcher-16] INFO dininghakkers.Hakker - prancer could not pick up Chopstick3 so he put down Chopstick2 and started to think
10:02:21.353 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - blitzen picked up Chopstick7 but because Chopstick8 could not be picked up put it down and started to think
10:02:24.351 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - comet began trying to eat
10:02:24.352 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - comet could not pick up Chopstick4
10:02:24.352 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - comet picked up Chopstick5 but because Chopstick4 could not be picked up put it down and started to think
10:02:25.352 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - dasher began trying to eat
10:02:25.353 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - dasher could not pick up Chopstick0
10:02:25.353 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - dasher picked up Chopstick1 but because Chopstick0 could not be picked up put it down and started to think
10:02:26.352 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - donner began trying to eat
10:02:26.352 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - donner picked up Chopstick6
10:02:26.352 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - donner picked up Chopstick7 and started to eat
10:02:28.353 [DiningHakkers-akka.actor.default-dispatcher-13] INFO dininghakkers.Hakker - rudolph started thinking
10:02:28.353 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - dancer began trying to eat
10:02:28.354 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - dancer picked up Chopstick1
10:02:28.354 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - dancer picked up Chopstick2 and started to eat
10:02:28.372 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - cupid began trying to eat
10:02:28.373 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - cupid could not pick up Chopstick6
10:02:28.373 [DiningHakkers-akka.actor.default-dispatcher-19] INFO dininghakkers.Hakker - cupid picked up Chopstick5 but because Chopstick6 could not be picked up put it down and started to think
This works just fine so long as we are careful about what messages we send to each diner actor to kick off its behavior. Hakkers require Eat or Think messages to kick off their behavior, but what happens if we send them Eat or Think messages when they have already started eating or thinking? We can test this with a simple actor system of a single hakker and two chopsticks:
scala> val hakker = ActorSystem( SingleDiningHakker( "hakker" ), "hakker" )
val hakker: akka.actor.typed.ActorSystem[org.hungerford.akka.dininghakkers.Hakker.Command] = akka://hakker
scala> hakker ! Hakker.Eat
10:19:17.548 [hakker-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - hakker began trying to eat
10:19:17.551 [hakker-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - hakker picked up chopstick-1
10:19:17.553 [hakker-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - hakker picked up chopstick-2 and started to eat
10:19:31.574 [hakker-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - hakker started thinking
10:19:37.590 [hakker-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - hakker began trying to eat
10:19:37.591 [hakker-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - hakker picked up chopstick-1
10:19:37.591 [hakker-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - hakker picked up chopstick-2 and started to eat
hakker ! Hakker.Eat
10:19:45.504 [hakker-akka.actor.default-dispatcher-3] INFO dininghakkers.Hakker - hakker began trying to eat
10:19:45.504 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-1
10:19:45.505 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-2 so he put down chopstick-1 and started to think
hakker ! Hakker.Eat
10:19:48.584 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker began trying to eat
10:19:48.584 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-2
10:19:48.584 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-1 so he put down chopstick-2 and started to think
10:19:49.611 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker started thinking
10:19:55.519 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker began trying to eat
10:19:55.519 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker picked up chopstick-1
10:19:55.520 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker picked up chopstick-2 and started to eat
10:19:56.600 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker began trying to eat
10:19:56.601 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-1
10:19:56.601 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-2 so he put down chopstick-1 and started to think
10:19:57.630 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker began trying to eat
10:19:57.631 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-1
10:19:57.631 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-2 so he put down chopstick-1 and started to think
10:20:01.620 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker began trying to eat
10:20:01.621 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-1
10:20:01.621 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-2 so he put down chopstick-1 and started to think
10:20:05.541 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker started thinking
10:20:06.651 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker began trying to eat
10:20:06.651 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker picked up chopstick-1
10:20:06.651 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker picked up chopstick-2 and started to eat
10:20:09.562 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker began trying to eat
10:20:09.562 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-1
10:20:09.562 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-2 so he put down chopstick-1 and started to think
10:20:09.640 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker began trying to eat
10:20:09.640 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-2
10:20:09.640 [hakker-akka.actor.default-dispatcher-5] INFO dininghakkers.Hakker - hakker could not pick up chopstick-1 so he put down chopstick-2 and started to think
After receiving the first Eat command, the hakker’s behavior is as expected: because it is the only one using its chopsticks, it simply cycles between eating and thinking. After we send a second Eat command, however, it starts complaining about being unable to eat. Having been told to Eat when already eating, the hakker tries to pick up chopsticks that it’s already holding! And since each Eat command schedules subsequent Think commands, which in turn schedule Eat commands, the incoherent behavior produced by an unexpected command reproduces itself indefinitely. Each additional unexpected command compounds this undesired behavior.
The basic problem is that our actor’s behavior does not respond to changes in its state: it always handles every message the same way. The correct behavior of an actor should not be contingent on all other actors in the system only sending expected messages to it. We therefore need to adapt the hakker’s message handling to the phase of its behavior (waiting, thinking, eating, etc.).
The obvious way to do this is to keep track of the phases in mutable state and to match on this state in each message case:
class Hakker ... {
    ...

    // Enum of various hakker states
    private trait HakkerState
    private case object Waiting extends HakkerState
    private case object TryingToEat extends HakkerState
    private case object Eating extends HakkerState
    private case object Thinking extends HakkerState

    // MUTABLE STATE
    private var currentState : HakkerState = Waiting
    private var takeLeftChopstickSuccess : Option[ Boolean ] = None
    private var takeRightChopstickSuccess : Option[ Boolean ] = None

    private val adapter = ctx.messageAdapter( HandleChopstickAnswer )

    override def onMessage( msg : Command ) : Behavior[ Command ] = {
        msg match {
            case Eat => currentState match {
                case Waiting =>
                    currentState = TryingToEat
                    left ! Take( adapter )
                    right ! Take( adapter )
                    ctx.log.info( "{} began trying to eat", name )
                    Behaviors.same
                case Thinking =>
                    currentState = TryingToEat
                    left ! Take( adapter )
                    right ! Take( adapter )
                    ctx.log.info( "{} began trying to eat", name )
                    Behaviors.same
                case _ => Behaviors.unhandled
            }

            case Think => currentState match {
                case Waiting =>
                    currentState = Thinking
                    ctx.scheduleOnce( rndDuration( 5.seconds, 3.seconds ), ctx.self, Eat )
                    ctx.log.info( "{} started thinking", name )
                    Behaviors.same
                case Eating =>
                    currentState = Thinking
                    left ! Put( adapter )
                    right ! Put( adapter )
                    takeRightChopstickSuccess = None
                    takeLeftChopstickSuccess = None
                    ctx.scheduleOnce( rndDuration( 5.seconds, 3.seconds ), ctx.self, Eat )
                    ctx.log.info( "{} started thinking", name )
                    Behaviors.same
                case _ => Behaviors.unhandled
            }

            case HandleChopstickAnswer( Busy( `left` ) ) =>
                if ( currentState != TryingToEat ) Behaviors.unhandled
                else {
                    if ( takeRightChopstickSuccess.nonEmpty ) {
                        if ( takeRightChopstickSuccess.get ) right ! Put( adapter )
                        currentState = Thinking
                        takeLeftChopstickSuccess = None
                        takeRightChopstickSuccess = None
                        ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Eat )
                        ctx.log.info( "{} could not pick up {} so he put down {} and started to think", name, left.path.name, right.path.name )
                    } else if ( takeRightChopstickSuccess.isEmpty ) {
                        takeLeftChopstickSuccess = Some( false )
                        ctx.log.info( "{} could not pick up {}", name, left.path.name )
                    }
                    Behaviors.same
                }

            case HandleChopstickAnswer( Busy( `right` ) ) =>
                if ( currentState != TryingToEat ) Behaviors.unhandled
                else {
                    if ( takeLeftChopstickSuccess.nonEmpty ) {
                        if ( takeLeftChopstickSuccess.get ) left ! Put( adapter )
                        currentState = Thinking
                        takeLeftChopstickSuccess = None
                        takeRightChopstickSuccess = None
                        ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Eat )
                        ctx.log.info( "{} could not pick up {} so he put down {} and started to think", name, right.path.name, left.path.name )
                    } else if ( takeLeftChopstickSuccess.isEmpty ) {
                        takeRightChopstickSuccess = Some( false )
                        ctx.log.info( "{} could not pick up {}", name, right.path.name )
                    }
                    Behaviors.same
                }

            case HandleChopstickAnswer( Taken( `left` ) ) =>
                if ( currentState != TryingToEat ) Behaviors.unhandled
                else {
                    if ( takeRightChopstickSuccess.nonEmpty ) {
                        if ( !takeRightChopstickSuccess.get ) {
                            left ! Put( adapter )
                            currentState = Thinking
                            takeLeftChopstickSuccess = None
                            takeRightChopstickSuccess = None
                            ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Eat )
                            ctx.log.info( "{} picked up {} but because {} could not be picked up put it down and started to think", name, left.path.name, right.path.name )
                        } else {
                            currentState = Eating
                            ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Think )
                            ctx.log.info( "{} picked up {} and started to eat", name, left.path.name )
                        }
                    } else {
                        takeLeftChopstickSuccess = Some( true )
                        ctx.log.info( "{} picked up {}", name, left.path.name )
                    }
                    Behaviors.same
                }

            case HandleChopstickAnswer( Taken( `right` ) ) =>
                if ( currentState != TryingToEat ) Behaviors.unhandled
                else {
                    if ( takeLeftChopstickSuccess.nonEmpty ) {
                        if ( !takeLeftChopstickSuccess.get ) {
                            right ! Put( adapter )
                            currentState = Thinking
                            takeLeftChopstickSuccess = None
                            takeRightChopstickSuccess = None
                            ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Eat )
                            ctx.log.info( "{} picked up {} but because {} could not be picked up put it down and started to think", name, right.path.name, left.path.name )
                        } else {
                            currentState = Eating
                            ctx.scheduleOnce( rndDuration( 10.seconds, 5.seconds ), ctx.self, Think )
                            ctx.log.info( "{} picked up {} and started to eat", name, right.path.name )
                        }
                    } else {
                        takeRightChopstickSuccess = Some( true )
                        ctx.log.info( "{} picked up {}", name, right.path.name )
                    }
                    Behaviors.same
                }
        }
    }
}
The complexity of this new setup now makes it harder to understand the behavior, however. It would now be much more difficult for someone unfamiliar with the expected behavior to work it out just by looking through the message handler. This can be improved by breaking up the handling of messages by phase. Rather than matching on the message, we can first match on currentState
, and then define a separate message handler for each phase:
class Hakker( ctx : ActorContext[ Command ], name : String, left : ActorRef[ ChopstickMessage ], right : ActorRef[ ChopstickMessage ] )
  extends AbstractBehavior[ Command ]( ctx ) {

    // Enum of various hakker states
    private trait HakkerState
    private case object Waiting extends HakkerState
    private case object TryingToEat extends HakkerState
    private case object Eating extends HakkerState
    private case object Thinking extends HakkerState

    // MUTABLE STATE
    private var currentState : HakkerState = Waiting
    private var takeLeftChopstickSuccess : Option[ Boolean ] = None
    private var takeRightChopstickSuccess : Option[ Boolean ] = None

    private val adapter = ctx.messageAdapter( HandleChopstickAnswer )

    override def onMessage( msg : Command ) : Behavior[ Command ] = {
        // Match on the phase first, then on the message
        currentState match {
            case Waiting => msg match {
                case Eat =>
                    currentState = Thinking
                    ctx.scheduleOnce( 5.seconds, ctx.self, Eat )
                    ctx.log.info( "{} started thinking", name )
                    Behaviors.same
                case _ => Behaviors.unhandled
            }
            case Thinking => msg match {
                case Eat =>
                    currentState = TryingToEat
                    left ! Take( adapter )
                    right ! Take( adapter )
                    ctx.log.info( "{} began trying to eat", name )
                    Behaviors.same
                case _ => Behaviors.unhandled
            }
            case TryingToEat => msg match ...
            ...etc...
        }
    }
}
We have now arrived at a reliable solution for the use case that is not too difficult to follow. This would be an acceptable solution if we knew we would never have to change any aspect of the hakker’s behavior. But what if we wanted to make even a minor change? Say we wanted the diner to stop and chat with a neighbor while eating, or to pick up a chopstick and put it back down while thinking. We would have to add more HakkerState subtypes and trace our way through all the different phases, making sure every state change happens in the right place.
It should throw up a red flag when we find ourselves having to hunt through many different parts of code to make a simple change when those parts are logically separable. Let’s see if we can find some way to enforce a separation of concerns in our actors’ behaviors.
Breaking up behaviors into components
To begin with, we need to find a way to separate the various behavior phases from each other in some way that allows us to deal with them on their own. Akka offers two obvious ways to do this: spawning subordinate actors, and what Akka calls the “finite state machine.”
Breaking up behavior by spawning child actors
The most obvious way to handle this problem is by using subordinate actors. Rather than trying to handle everything in a single actor, the Hakker behavior will focus only on cycling through the phases, and for each one it will spawn a new actor to handle messages for that phase alone.
To accomplish this, we will define a new subtype of Command
called PhaseMessages
to be used by each child actor to signal to the parent that its phase is complete. The message handler will handle this first, updating its phase’s state and spawning new children as appropriate. All other messages will be piped to the appropriate child as determined by currentState
.
class Hakker( ctx : ActorContext[ Command ], name : String, left : ActorRef[ ChopstickMessage ], right : ActorRef[ ChopstickMessage ] ) extends AbstractBehavior[ Command ]( ctx ) { import Hakker._ import Chopstick._ // Enum of various hakker states private trait HakkerState private case object Waiting extends HakkerState private case object TryingToEat extends HakkerState private case object Eating extends HakkerState private case object Thinking extends HakkerState // MUTABLE STATE private var currentState : HakkerState = Waiting private var takeLeftChopstickSuccess : Option[ Boolean ] = None private var takeRightChopstickSuccess : Option[ Boolean ] = None private val adapter = ctx.messageAdapter( HandleChopstickAnswer ) trait PhaseMessages extends Command case object FinishedWaiting extends PhaseMessages case object FinishedEating extends PhaseMessages case object FinishedThinking extends PhaseMessages case object GotChopsticks extends PhaseMessages case object FailedToGetChopsticks extends PhaseMessages private lazy val waitHandler = ctx.spawn( waitingBehavior( ctx.self ), "waitHandler" ) private var eatHandler : Option[ ActorRef[ Command ] ] = None private var thinkHandler : Option[ ActorRef[ Command ] ] = None private var tryToEatHandler : Option[ ActorRef[ Command ] ] = None override def onMessage( msg : Command ) : Behavior[ Command ] = msg match { case FinishedWaiting => thinkHandler = Some( ctx.spawn( thinkingBehavior( ctx.self, 5.seconds, 3.seconds ), s"thinkHandler-${UUID.randomUUID()}" ) ) ctx.log.info( "{} finished waiting and started thinking", name ) currentState = Thinking Behaviors.same case FinishedEating => eatHandler = None thinkHandler = Some( ctx.spawn( thinkingBehavior( ctx.self, 5.seconds, 3.seconds ), s"thinkHandler-${UUID.randomUUID()}" ) ) ctx.log.info( "{} finished eating and started thinking", name ) currentState = Thinking Behaviors.same case FinishedThinking => thinkHandler = None tryToEatHandler = Some( ctx.spawn( 
tryingToEatBehavior( ctx.self ), s"tryToEatHandler-${UUID.randomUUID()}" ) ) ctx.log.info( "{} finished thinking and started trying to eat", name ) currentState = TryingToEat Behaviors.same case GotChopsticks => tryToEatHandler = None eatHandler = Some( ctx.spawn( eatingBehavior( ctx.self, 5.seconds, 3.seconds ), s"eatHandler-${UUID.randomUUID()}" ) ) ctx.log.info( "{} got both chopsticks and started eating", name ) currentState = Eating Behaviors.same case FailedToGetChopsticks => tryToEatHandler = None thinkHandler = Some( ctx.spawn( thinkingBehavior( ctx.self, 10.seconds, 5.seconds ), s"thinkHandler-${UUID.randomUUID()}" ) ) ctx.log.info( "{} couldn't get both chopsticks and started thinking", name ) currentState = Thinking Behaviors.same case otherMsg => currentState match { case Waiting => waitHandler ! otherMsg Behaviors.same case Eating => eatHandler.get ! otherMsg Behaviors.same case Thinking => thinkHandler.get ! otherMsg Behaviors.same case TryingToEat => tryToEatHandler.get ! otherMsg Behaviors.same } } private def waitingBehavior( parent : ActorRef[ Command ] ) : Behavior[ Command ] = ??? private def eatingBehavior( parent : ActorRef[ Command ], duration : FiniteDuration, plusOrMinus : FiniteDuration ) : Behavior[ Command ] = ??? private def thinkingBehavior( parent : ActorRef[ Command ], duration : FiniteDuration, plusOrMinus : FiniteDuration ) : Behavior[ Command ] = ??? private def tryingToEatBehavior( parent : ActorRef[ Command ] ) : Behavior[ Command ] = ??? }
We can then define the individual phases by creating new actors that respond to messages and send initializing messages. These will look pretty similar to their respective sections from the naive implementation, but now none of them contain any reference to any of the other handlers or phases:
private def waitingBehavior( parent : ActorRef[ Command ] ) : Behavior[ Command ] = Behaviors.receiveMessage { case Eat => parent ! FinishedWaiting Behaviors.stopped case Think => parent ! FinishedWaiting Behaviors.stopped case _ => Behaviors.same } private def eatingBehavior( parent : ActorRef[ Command ], duration : FiniteDuration, plusOrMinus : FiniteDuration ) : Behavior[ Command ] = Behaviors.setup( ctx => { ctx.scheduleOnce( rndDuration( duration, plusOrMinus ), ctx.self, Think ) Behaviors.receiveMessage { case Think => parent ! FinishedEating Behaviors.stopped case _ => Behaviors.same } } ) private def thinkingBehavior( parent : ActorRef[ Command ], duration : FiniteDuration, plusOrMinus : FiniteDuration ) : Behavior[ Command ] = Behaviors.setup( ctx => { ctx.scheduleOnce( rndDuration( duration, plusOrMinus ), ctx.self, Eat ) Behaviors.receiveMessage { case Eat => parent ! FinishedThinking Behaviors.stopped case _ => Behaviors.same } } ) private def tryingToEatBehavior( parent : ActorRef[ Command ] ) : Behavior[ Command ] = Behaviors.setup( ctx => { left ! Take( adapter ) right ! Take( adapter ) Behaviors.receiveMessage { case HandleChopstickAnswer( Busy( `left` ) ) => if ( takeRightChopstickSuccess.nonEmpty ) { if ( takeRightChopstickSuccess.get ) right ! Put( adapter ) ctx.log.info( "{} could not pick up {} so he put down {}", name, left.path.name, right.path.name ) takeLeftChopstickSuccess = None takeRightChopstickSuccess = None parent ! FailedToGetChopsticks Behaviors.stopped } else { takeLeftChopstickSuccess = Some( false ) ctx.log.info( "{} could not pick up {}", name, left.path.name ) Behaviors.same } case HandleChopstickAnswer( Busy( `right` ) ) => ... (same as above) ... // If one of the chopsticks was able to be taken successfully case HandleChopstickAnswer( Taken( `left` ) ) => if ( takeRightChopstickSuccess.nonEmpty ) { if ( !takeRightChopstickSuccess.get ) { left ! 
Put( adapter ) takeLeftChopstickSuccess = None takeRightChopstickSuccess = None parent ! FailedToGetChopsticks ctx.log.info( "{} picked up {} but because {} could not be picked up put it down", name, left.path.name, right.path.name ) Behaviors.stopped } else { parent ! GotChopsticks ctx.log.info( "{} picked up {}", name, left.path.name ) Behaviors.stopped } } else { takeLeftChopstickSuccess = Some( true ) ctx.log.info( "{} picked up {}", name, left.path.name ) Behaviors.same } case HandleChopstickAnswer( Taken( `right` ) ) => ... (same as above) ... case _ => Behaviors.same } } )
Now that we have put the initial behavior of each phase together with the message handler of that phase, we no longer have to make changes in multiple places when we want to change that initial behavior. By parameterizing the initial behavior, we can easily update how we use it by changing what parameters the parent supplies when spawning the child actor.
Despite this convenience, however, not only do we still need state to keep track of which phase the hakker is in, but we now also need new message types to signal when each phase is complete. As the phases of an actor like this grow in complexity, the number of data objects connected with the system will proliferate. Moreover, although it is still fairly readable, the parent message handler is far more complex than it needs to be, given that it is essentially just “calling” each phase one after the other.
Breaking up behavior with the finite state machine (FSM)
Another way to break up the behavior is by defining separate behaviors corresponding to each phase and transitioning from one behavior to another in the message handlers. This is possible because each message handler is just a function that takes a message and returns a behavior. It is typical to return either Behaviors.same
or Behaviors.unhandled
, both of which continue the same behavior, but you can in fact return any behavior of the same message type. By returning a new behavior, this becomes the actor’s new behavior that will respond to any subsequent messages.
Akka refers to this pattern as the “finite state machine,” because each behavior is in effect a “state” of the actor. You can even use this pattern to simulate mutable state with immutable data by making the state type a parameter of the behavior:
trait CounterMessage
case object Increment extends CounterMessage
case object Decrement extends CounterMessage
case object Peek extends CounterMessage

def counter( value : Int ) : Behavior[ CounterMessage ] = Behaviors.receiveMessage {
    case Increment => counter( value + 1 )
    case Decrement => counter( value - 1 )
    case Peek =>
        println( value )
        Behaviors.same
}

scala> val counterSystem = ActorSystem( counter( 0 ), "counter" )
val counterSystem: akka.actor.typed.ActorSystem[CounterMessage] = akka://counter
scala> counterSystem ! Increment
scala> counterSystem ! Increment
scala> counterSystem ! Increment
scala> counterSystem ! Peek
3
scala> counterSystem ! Decrement
scala> counterSystem ! Peek
2
Calling the same behavior function with a new “value” parameter means that the actor will use this new value when responding to subsequent messages, as though it had a new state.
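To see why this works without involving Akka at all, here is a minimal sketch that models the counter as a pure state transition. The names CounterModel, step, and run are made up for illustration and are not part of Akka or of the project; the fold merely stands in for Akka delivering messages one at a time, and Peek is left out because it does not change the value.

```scala
object CounterModel {
  sealed trait CounterMessage
  case object Increment extends CounterMessage
  case object Decrement extends CounterMessage

  // One step of the "behavior": given the current value and a message,
  // compute the value the next behavior would be parameterized with.
  def step(value: Int, msg: CounterMessage): Int = msg match {
    case Increment => value + 1
    case Decrement => value - 1
  }

  // Processing a mailbox is then just a left fold over the messages.
  def run(initial: Int, mailbox: List[CounterMessage]): Int =
    mailbox.foldLeft(initial)(step)

  def main(args: Array[String]): Unit = {
    // Three increments followed by one decrement
    println(run(0, List(Increment, Increment, Increment, Decrement))) // prints 2
  }
}
```

No variable is ever reassigned here: the “state” only exists as the argument passed to the next step, which is what returning counter( value + 1 ) does in the actor version.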
We can use this pattern to redesign our chopstick actor so that it is no longer necessary to maintain mutable state:
object Chopstick {
    def apply( ) : Behavior[ ChopstickMessage ] = available()

    private def takenBy( hakker : ActorRef[ ChopstickAnswer ] ) : Behavior[ ChopstickMessage ] = {
        Behaviors.receive {
            case (ctx, Take( otherHakker )) =>
                otherHakker ! Busy( ctx.self )
                Behaviors.same
            case (_, Put( `hakker` )) =>
                available()
            case _ =>
                // here and below it's left to be explicit about partial definition,
                // but can be omitted when Behaviors.receiveMessagePartial is in use
                Behaviors.unhandled
        }
    }

    private def available( ) : Behavior[ ChopstickMessage ] = {
        Behaviors.receivePartial {
            case (ctx, Take( hakker )) =>
                hakker ! Taken( ctx.self )
                takenBy( hakker )
        }
    }
}
Now the chopstick simply moves from the available
behavior to the takenBy
behavior, where the actor who took the chopstick is “stored” as an ActorRef
parameter.
For our purposes, however, the real utility of this pattern is that it breaks up complicated behavior into component parts without having to add the complexity of an additional governing actor. (In fact, “Dining Hakkers” is provided by Akka as an example of the finite state machine. The code in this section is pretty much all from that project.)
Most of the new behavior definitions are pretty straightforward. The waiting, thinking, and eating phases can each be defined with just a few lines and are mostly the same as they were in our previous implementations. Initialization can be handled by Behaviors.withTimers, which schedules the message needed to transition out of a phase and then completes by returning that phase’s behavior.
val waiting : Behavior[ Command ] = Behaviors.receiveMessagePartial {
    case Eat =>
        ctx.log.info( "{} starts to think", name )
        startThinking( ctx, 5.seconds, 3.seconds )
}

private def startEating( ctx : ActorContext[ Command ], duration : FiniteDuration, plusOrMinus : FiniteDuration ) : Behavior[ Command ] = {
    val dur = rndDuration( duration, plusOrMinus )
    Behaviors.withTimers[ Command ] { timers =>
        timers.startSingleTimer( Think, Think, dur )
        eating
    }
}

private lazy val eating : Behavior[ Command ] = {
    Behaviors.receiveMessagePartial {
        case Think =>
            ctx.log.info( "{} puts down his chopsticks and starts to think", name )
            left ! Put( adapter )
            right ! Put( adapter )
            startThinking( ctx, 5.seconds, 3.seconds )
    }
}

private def startThinking( ctx : ActorContext[ Command ], duration : FiniteDuration, plusOrMinus : FiniteDuration ) : Behavior[ Command ] = {
    val dur = rndDuration( duration, plusOrMinus )
    Behaviors.withTimers[ Command ] { timers =>
        timers.startSingleTimer( Eat, Eat, dur )
        thinking
    }
}

private val thinking : Behavior[ Command ] = {
    Behaviors.receiveMessagePartial {
        case Eat =>
            left ! Take( adapter )
            right ! Take( adapter )
            waitForFirstChopstick
    }
}
The “try to eat” phase is a new case. While we could just copy the message handler from our original actor over to a separate behavior, we can greatly simplify the entire process by splitting the behavior into two phases:
private lazy val waitForFirstChopstick : Behavior[ Command ] = {
    Behaviors.receiveMessagePartial {
        case HandleChopstickAnswer( Taken( `left` ) ) =>
            waitForOtherChopstick( chopstickToWaitFor = right, takenChopstick = left )
        case HandleChopstickAnswer( Taken( `right` ) ) =>
            waitForOtherChopstick( chopstickToWaitFor = left, takenChopstick = right )
        case HandleChopstickAnswer( Busy( _ ) ) =>
            firstChopstickDenied
    }
}

private def waitForOtherChopstick( chopstickToWaitFor : ActorRef[ ChopstickMessage ], takenChopstick : ActorRef[ ChopstickMessage ] ) : Behavior[ Command ] = {
    Behaviors.receiveMessagePartial {
        case HandleChopstickAnswer( Taken( `chopstickToWaitFor` ) ) =>
            ctx.log.info( "{} has picked up {} and {} and starts to eat", name, left.path.name, right.path.name )
            startEating( ctx, 5.seconds, 3.seconds )
        case HandleChopstickAnswer( Busy( `chopstickToWaitFor` ) ) =>
            // Give back the chopstick we already hold before thinking again
            takenChopstick ! Put( adapter )
            startThinking( ctx, 10.seconds, 5.seconds )
    }
}

private lazy val firstChopstickDenied : Behavior[ Command ] = {
    Behaviors.receiveMessagePartial {
        case HandleChopstickAnswer( Taken( chopstick ) ) =>
            chopstick ! Put( adapter )
            startThinking( ctx, 10.seconds, 5.seconds )
        case HandleChopstickAnswer( Busy( _ ) ) =>
            startThinking( ctx, 10.seconds, 5.seconds )
    }
}
By separating the “try to eat” phase into a behavior that handles the first chopstick response (whether it happens to be from the chopstick on the right or the left) and then switches to a separate behavior that handles the second chopstick’s response, we don’t have to go through all the different possible combinations of responses from the left and right chopsticks.
Besides allowing us to avoid complicated state-handling, this solution provides a more graceful separation of the individual phase behaviors than spawning child actors did. However, we now lose the ability to define the phase-transition logic separately from the individual behavior phases. Each phase must handle its own transitions to other phases; it therefore does not really provide the logical separation we are looking for.
A new solution
At this point, it would be helpful to lay out what exactly we require. We want to be able to:
1) Separate the behavior phases of the hakker actor, keeping any initial or final behavior that logically belongs to a particular phase, like self-messaging or logging, together with that phase’s message handler.
2) Define the transitions between the phases separately from their individual behaviors, in a way that is easy to read and easy to refactor.
#2 is really what is missing at this point. We have figured out ways to isolate the phases of behavior of our actors, but so far we have only been able to control their transitions either within their own message handlers, breaking the separation of concerns we are seeking, or through another actor. There is no reason, however, why another actor should be controlling what is really just procedural logic. We should not need to define and handle endless new message types just to accomplish basic flow control.
What we really want, then, is to describe each of our behavior phases as statements, or functions, that can be called one after the other in an imperative fashion. We would like to be able to compose the behavior of the hakker more or less as follows:
wait()
while( true ) {
    val gotChopsticks = getChopsticks()
    if ( gotChopsticks ) eat()
    else think()
}
The above snippet provides a clearer and simpler description of the hakker’s behavior than any of the actor designs we have come up with so far. Moreover, it would make extending or updating the behavior far simpler: we could add or remove phases just by adding or removing a line, without having to create or remove message or state types simply for flow control and without having to spawn unnecessary actors.
Is this possible, though? The above snippet describes each of the behavior phases as a blocking function with or without a return value. Behaviors are not really blocking functions, however, but an abstraction used in a complicated process orchestrated behind the scenes by Akka. If we cannot make behaviors into blocking functions, perhaps we can at least find a way to describe them in an imperative style similar in form to a blocking function.
For anyone familiar with functional programming and the problem of dealing with effects, a light bulb should go off at this point: if we need to describe the composition of pure functions in an imperative style, we need a monad. For anyone unfamiliar with functional programming or monads, don’t worry. We’re not going down the rabbit hole of esoteric functional magic. Basically, we’re just going to show how using flatMap
in a non-obvious way can accomplish something really useful. It’s worth keeping in mind, however, that this is just one case of a pattern used all over the place in functional programming called the “monad.” If you find this example interesting or helpful and want to learn more about monads, a good place to do so is here: http://learnyouahaskell.com/chapters.
With that said, here is how we will proceed: rather than working directly with Akka’s Behavior
type, we are going to define a new type very similar to the Akka behavior. This type will describe a single phase of a behavior. Like Akka’s Behavior
, it will involve defining a message handler. Unlike Behavior
, however, the message handler will not return a new behavior but a custom return type.
This return type will have to capture two cases: after each message, the phase will either continue or end. In keeping with our description of phases as a blocking function, it will end by returning some value (which can be Unit
). We can describe these cases as follows:
sealed trait BehaviorDecision[ +T ]
case object Stay extends BehaviorDecision[ Nothing ]
case class Return[ T ]( value : T ) extends BehaviorDecision[ T ]
NB: the version of this code on GitHub includes another type parameter, StateType; this is to support a version of the class we’re designing that allows you to update state using a third StayWith( newState ) behavior decision type. In our use case this won’t be necessary, so I have omitted it to keep things simpler.
The message handler for the “waiting” phase will now look something like this:
val waitForDinner = ... {
    case Eat => Return()
    case Think => Return()
    case _ => Stay
}
When the actor receives an Eat
or a Think
message, the phase will end with an empty return value (i.e., a return value of Unit
). Any other messages will do nothing, and the actor will remain in the same phase. This allows us to interpret the behavior without reference to what comes after: as far as waitForDinner
is concerned, all it needs to know is that an Eat
or Think
message causes it to end, and all other messages are ignored.
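As a rough, Akka-free sketch of how such a handler behaves, the snippet below defines the decision type and a waitForDinner handler as a plain function, then counts how many messages the phase consumes before it returns. The Chat message and the consumedUntilReturn helper are hypothetical and exist only for this illustration; Return( () ) spells out the Unit value explicitly.

```scala
object DecisionDemo {
  sealed trait BehaviorDecision[+T]
  case object Stay extends BehaviorDecision[Nothing]
  final case class Return[T](value: T) extends BehaviorDecision[T]

  sealed trait Command
  case object Eat extends Command
  case object Think extends Command
  case object Chat extends Command // hypothetical message the phase ignores

  // The phase only knows when it ends, not what comes next.
  val waitForDinner: Command => BehaviorDecision[Unit] = {
    case Eat   => Return(())
    case Think => Return(())
    case _     => Stay
  }

  // Number of messages consumed up to and including the one that ends
  // the phase, or None if the phase never returns.
  def consumedUntilReturn(msgs: List[Command]): Option[Int] = {
    val idx = msgs.indexWhere { m =>
      waitForDinner(m) match {
        case Return(_) => true
        case Stay      => false
      }
    }
    if (idx >= 0) Some(idx + 1) else None
  }

  def main(args: Array[String]): Unit = {
    println(consumedUntilReturn(List(Chat, Chat, Eat, Think))) // prints Some(3)
    println(consumedUntilReturn(List(Chat)))                   // prints None
  }
}
```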
How, then, do we design this abstraction such that we can convert it, somehow, into an actual behavior? It’s clear that each behavior phase will have to be “completed” to produce an actual Akka behavior. The Stay
return type can in all cases be replaced with Behaviors.same
, but Return( x )
will have to be replaced using some function that converts it to an actual behavior. Since the Return case class is just a wrapper for x
, we really just need a function to convert x
to a new behavior.
In general terms, then, our new type, which we will call BehaviorPhase, can be defined by some method, which we will call behavior, that takes a function converting a given “return type” to an Akka Behavior and returns a “completed” Behavior:
trait BehaviorPhase[ MessageType, +T ] {
    def behavior( complete : T => Behavior[ MessageType ] )( implicit ctx : ActorContext[ MessageType ] ) : Behavior[ MessageType ]

    def behavior( implicit ctx : ActorContext[ MessageType ] ) : Behavior[ MessageType ] = behavior( _ => Behaviors.same )
}
We have also provided an implemented overload of behavior
to provide a default way of completing the behavior, which simply ignores the return value and completes the phase with Behaviors.same
.
Note that each BehaviorPhase
must have two type parameters: the message type of the Behavior
it will produce and its return type (T
). Note also that its behavior
methods include an implicit ActorContext
parameter. This will be necessary to construct behavior phases that can do things like spawn actors, access references to themselves, use Akka’s logging, and use various other Akka capabilities.
We can partially implement this using a message handler as follows:
trait HandlerBehaviorPhase[ MessageType, +T ] extends BehaviorPhase[ MessageType, T ] {
    val partialHandler : ActorContext[ MessageType ] => PartialFunction[ MessageType, BehaviorDecision[ T ] ]

    override final def behavior( returnBehavior : T => Behavior[ MessageType ] )
                               ( implicit ctx : ActorContext[ MessageType ] ) : Behavior[ MessageType ] = {
        Behaviors.receiveMessage { msg : MessageType =>
            // Any message the handler does not match keeps the phase going
            val partialHandlerFn : PartialFunction[ MessageType, BehaviorDecision[ T ] ] = {
                partialHandler( ctx ) orElse ( { case _ => Stay } )
            }
            partialHandlerFn( msg ) match {
                case Stay => Behaviors.same
                case Return( value ) => returnBehavior( value )
            }
        }
    }
}
Having defined an abstract value partialHandler
that will contain the behavior phase’s message handler, we can now implement behavior
relative to the handler. This is simply a matter of creating a behavior using Behaviors.receiveMessage
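, calling our own handler on the message, and then translating the resulting BehaviorDecision into an Akka Behavior: Stay becomes Behaviors.same, and the value wrapped by Return is passed to the completion function (the returnBehavior parameter here, called complete in the BehaviorPhase trait).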
Note that we have made the handler a “curried” partial function of ActorContext
(that is, it is a function of ActorContext
that returns another function which is the message handler). While the type signature looks sort of crazy, it just means that we will define our handlers like this:
override val partialHandler = ctx => {
    case MsgType =>
        ctx.log.info( ... )
        ...
}
This way of constructing the message handler gives us access to the actor context when we define our message responses.
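To make the shape of that signature a little more concrete, here is a small sketch that does not depend on Akka. FakeContext is a hypothetical stand-in for ActorContext that only records log lines, messages are plain strings, and the decision type is simplified; the point is just to show a function of the context returning a partial handler, with unmatched messages falling through to Stay via orElse.

```scala
object CurriedHandlerDemo {
  sealed trait Decision
  case object Stay extends Decision
  final case class Return(value: String) extends Decision

  // Hypothetical stand-in for ActorContext: it only records log lines.
  final class FakeContext {
    val logged = scala.collection.mutable.ListBuffer.empty[String]
    def log(line: String): Unit = { logged += line; () }
  }

  // A function of the context that returns the (partial) message handler.
  val partialHandler: FakeContext => PartialFunction[String, Decision] =
    ctx => {
      case "eat" =>
        ctx.log("received eat")
        Return("done")
    }

  // Messages the handler does not match fall through to Stay.
  def handle(ctx: FakeContext, msg: String): Decision = {
    val fallback: PartialFunction[String, Decision] = { case _ => Stay }
    (partialHandler(ctx) orElse fallback)(msg)
  }

  def main(args: Array[String]): Unit = {
    val ctx = new FakeContext
    println(handle(ctx, "eat"))  // prints Return(done)
    println(handle(ctx, "nap"))  // prints Stay
    println(ctx.logged.toList)   // prints List(received eat)
  }
}
```

Because the handler only has to list the messages it cares about, each phase definition stays short, and the fallback decides in one place what happens to everything else.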
We can now define a factory method to generate a BehaviorPhase
by passing a handler:
object BehaviorPhase {
    def fromHandler[ MessageType, T ]( handler : ActorContext[ MessageType ] => PartialFunction[ MessageType, BehaviorDecision[ T ] ] ) : BehaviorPhase[ MessageType, T ] = {
        new HandlerBehaviorPhase[ MessageType, T ] {
            // Implements the abstract partialHandler value of HandlerBehaviorPhase
            override val partialHandler : ActorContext[ MessageType ] => PartialFunction[ MessageType, BehaviorDecision[ T ] ] = handler
        }
    }
}
And finally, we can now define the behavior phases of our hakker actor:
private lazy val waitForDinner : BehaviorPhase[ Command, Unit ] =
    BehaviorPhase.fromHandler[ Command, Unit ]( _ => {
        case Eat => Return()
    } )

private def thinkFor( duration : FiniteDuration, plusOrMinus : FiniteDuration ) : BehaviorPhase[ Command, Unit ] = {
    BehaviorPhase.fromHandler[ Command, Unit ]( ctx => {
        case Eat => Return()
        case _ => Stay
    } )
}

private lazy val waitForFirstChopstick = {
    BehaviorPhase.fromHandler[ Command, Either[ ActorRef[ ChopstickMessage ], ActorRef[ ChopstickMessage ] ] ]( ctx => {
        case HandleChopstickAnswer( Taken( cs ) ) if cs == left || cs == right =>
            ctx.log.info( "{} picked up {}", name, cs.path.name )
            Return( Right( cs ) )
        case HandleChopstickAnswer( Busy( cs ) ) if cs == left || cs == right =>
            ctx.log.info( "{} was unable to pick up {}", name, cs.path.name )
            Return( Left( cs ) )
        case _ => Stay
    } )
}

private def waitForSecondChopstick( first : ActorRef[ ChopstickMessage ], firstSucceeded : Boolean ) : BehaviorPhase[ Command, Boolean ] = {
    BehaviorPhase.fromHandler[ Command, Boolean ]( ctx => {
        case HandleChopstickAnswer( Taken( cs ) ) if ( cs == left || cs == right ) && cs != first =>
            if ( firstSucceeded ) {
                ctx.log.info( "{} also picked up {}", name, cs.path.name )
                Return( true )
            } else {
                val adapter = ctx.messageAdapter( HandleChopstickAnswer )
                cs ! Put( adapter )
                ctx.log.info( "{} picked up {} but put it back down because he was unable to pick up {}", name, cs.path.name, first.path.name )
                Return( false )
            }
        case HandleChopstickAnswer( Busy( cs ) ) if ( cs == left || cs == right ) && cs != first =>
            if ( firstSucceeded ) {
                first ! Put( ctx.messageAdapter( HandleChopstickAnswer ) )
                ctx.log.info( "{} was unable to pick up {} so he put down {}", name, cs.path.name, first.path.name )
            } else ctx.log.info( "{} was also unable to pick up {}", name, cs.path.name )
            Return( false )
        case _ => Stay
    } )
}

private def eatFor( duration : FiniteDuration, plusOrMinus : FiniteDuration ) : BehaviorPhase[ Command, Unit ] = {
    BehaviorPhase.fromHandler[ Command, Unit ]( ctx => {
        case Think =>
            val adapter = ctx.messageAdapter( HandleChopstickAnswer )
            ctx.log.info( "{} stops eating and puts down his chopsticks", name )
            left ! Put( adapter )
            right ! Put( adapter )
            Return()
        case _ => Stay
    } )
}
So far our BehaviorPhases look a lot like the behaviors we defined using the FSM pattern. The main difference is that now we leave out any reference to other behaviors. For most of our BehaviorPhases, we simply return Unit when the phase is complete. The first behavior of the “try to eat” phase (waitForFirstChopstick), however, returns an Either with an ActorRef[ ChopstickMessage ] on both the left and the right. The Left result, in keeping with convention, represents a “failure”, and the Right result corresponds to success. In both cases we need to know which chopstick the response was from. The second behavior of the “try to eat” phase (waitForSecondChopstick) accepts the actor ref of the first chopstick to respond, as well as a boolean to indicate whether that first chopstick was a failure or a success, and returns a boolean to indicate whether the hakker ended up with both chopsticks.
Linking phases with flatMap
It is clear enough how these phases are supposed to fit together: we will have to run waitForFirstChopstick and match on the result. If it is a Left( ref ) (failure) we will call waitForSecondChopstick( ref, false ), and if it is a Right( ref ) (success) we will call waitForSecondChopstick( ref, true ). Note that we have decoupled the return type of the former from the parameters of the latter: if we want to refactor either of these, we will only have to change how we call the second function after we get the return value of the first.
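The dispatch on the first result is ordinary pattern matching on an Either. As a tiny sketch with stand-ins (Chopstick is a hypothetical replacement for ActorRef[ ChopstickMessage ], and waitForSecond just returns a description instead of a real phase):

```scala
object EitherDispatchDemo {
  // Hypothetical stand-in for ActorRef[ChopstickMessage]
  final case class Chopstick(name: String)

  // Stand-in for the second phase: it only describes how it was called.
  def waitForSecond(first: Chopstick, firstSucceeded: Boolean): String =
    s"first answer came from ${first.name}, succeeded = $firstSucceeded"

  // Left means the first chopstick was busy, Right means it was taken.
  def next(firstResult: Either[Chopstick, Chopstick]): String =
    firstResult match {
      case Left(ref)  => waitForSecond(ref, firstSucceeded = false)
      case Right(ref) => waitForSecond(ref, firstSucceeded = true)
    }

  def main(args: Array[String]): Unit = {
    println(next(Right(Chopstick("left"))))
    println(next(Left(Chopstick("right"))))
  }
}
```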
But how on earth do we link these phases together? This is where flatMap comes in. flatMap is a method on a wrapper type that applies a function to the wrapped value, where that function itself returns another instance of the same wrapper type, and then “flattens” the nested result into a single instance. For instance, flatMap on a list maps each element of the list to another list, and then flattens the list of lists into a single list:
scala> val list = List( 1, 2, 3, 4 )
val list: List[Int] = List(1, 2, 3, 4)
scala> val newList = list.flatMap( i => List( i, i * 10, i * 100, i * 1000 ) )
val newList: List[Int] = List(1, 10, 100, 1000, 2, 20, 200, 2000, 3, 30, 300, 3000, 4, 40, 400, 4000)
map and flatMap need not be confined to collections, however. They can be used for most “contextual” types, or types that “wrap” some value or some process that produces a value, like Option or Future. flatMap called on a Future, for instance, allows you to generate a second Future from the successfully completed return value of the first and then “flattens” them into a single future that combines the two processes. Similarly, we can define a flatMap method on BehaviorPhase that maps the return value of one BehaviorPhase to another BehaviorPhase and “flattens” them into a new BehaviorPhase whose behavior is to transition from the original to the mapped BehaviorPhase when the original returns:
trait BehaviorPhase[ MessageType, +T ] {

    def behavior( complete : T => Behavior[ MessageType ] ) : Behavior[ MessageType ] = ...

    def flatMap[ U ]( fn : T => BehaviorPhase[ MessageType, U ] ) : BehaviorPhase[ MessageType, U ] = {
        // Alias for this phase's behavior, since the name is shadowed inside the anonymous class below
        def originalBehavior( completeFn : T => Behavior[ MessageType ] )
                            ( implicit ctx : ActorContext[ MessageType ] ) : Behavior[ MessageType ] = behavior( completeFn )

        new BehaviorPhase[ MessageType, U ] {
            override def behavior( retBehav : U => Behavior[ MessageType ] )
                                 ( implicit ctx : ActorContext[ MessageType ] ) : Behavior[ MessageType ] = {
                // When the original phase returns t, continue with the phase produced by fn( t )
                originalBehavior( t => fn( t ).behavior( retBehav ) )
            }
        }
    }

    def map[ U ]( fn : T => U ) : BehaviorPhase[ MessageType, U ] = {
        def originalBehavior( completeFn : T => Behavior[ MessageType ] )
                            ( implicit ctx : ActorContext[ MessageType ] ) : Behavior[ MessageType ] = behavior( completeFn )

        new BehaviorPhase[ MessageType, U ] {
            override def behavior( retBehav : U => Behavior[ MessageType ] )
                                 ( implicit ctx : ActorContext[ MessageType ] ) : Behavior[ MessageType ] = {
                // When the original phase returns t, complete with the transformed value fn( t )
                originalBehavior( t => retBehav( fn( t ) ) )
            }
        }
    }
}
Basically, our version of flatMap generates a new BehaviorPhase instance whose behavior is now “completed” by the behavior of the BehaviorPhase generated by the function passed to it.
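To make the “completion” idea concrete without an actor system, here is a minimal sketch that replaces Akka's Behavior with a plain function from a message to the next behavior. Everything in it (Beh, Phase, waitFor, ignoreAll, demo) is a hypothetical stand-in invented for illustration, not part of Akka or of the library discussed in this post; only the shape of flatMap and map mirrors the code above.

```scala
import scala.collection.mutable.ListBuffer

object PhaseSketch {
  // Stand-in for Akka's Behavior: given the next message, produce the next behavior.
  final case class Beh[M](onMessage: M => Beh[M])

  // A behavior that ignores every message from now on.
  def ignoreAll[M]: Beh[M] = Beh[M](_ => ignoreAll[M])

  // Stand-in for BehaviorPhase: build a behavior that ends by calling `complete`.
  trait Phase[M, +T] { self =>
    def behavior(complete: T => Beh[M]): Beh[M]

    // Run this phase, then continue with the phase produced from its result.
    def flatMap[U](fn: T => Phase[M, U]): Phase[M, U] = new Phase[M, U] {
      def behavior(complete: U => Beh[M]): Beh[M] =
        self.behavior(t => fn(t).behavior(complete))
    }

    // Run this phase and transform its result.
    def map[U](fn: T => U): Phase[M, U] = new Phase[M, U] {
      def behavior(complete: U => Beh[M]): Beh[M] =
        self.behavior(t => complete(fn(t)))
    }
  }

  // A phase that stays put until a message matches `pf`, then returns pf's result.
  def waitFor[M, T](pf: PartialFunction[M, T]): Phase[M, T] = new Phase[M, T] {
    def behavior(complete: T => Beh[M]): Beh[M] = {
      lazy val waiting: Beh[M] =
        Beh[M](m => if (pf.isDefinedAt(m)) complete(pf(m)) else waiting)
      waiting
    }
  }

  // Chain two phases and feed messages one at a time, as a mailbox would.
  def demo(): List[String] = {
    val log = ListBuffer.empty[String]
    val waitForEat = waitFor[String, Unit] { case "eat" => () }
    val waitForNumber = waitFor[String, Int] {
      case s if s.startsWith("n=") => s.drop(2).toInt
    }
    val both = waitForEat.flatMap(_ => waitForNumber)
    val start = both.behavior { n => log += s"got $n"; ignoreAll[String] }
    List("noise", "eat", "n=7", "n=8").foldLeft(start)((beh, msg) => beh.onMessage(msg))
    log.toList // List("got 7"): "noise" is ignored, "n=8" arrives after completion
  }
}
```

The second phase only starts listening once the first has returned, which is exactly the transition that flatMap is meant to express.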
Using BehaviorPhase.flatMap, we can now chain phases together like so:
val waitThenThink = waitForDinner.flatMap( _ => thinkFor( 5.seconds, 3.seconds ) )
This composed BehaviorPhase now describes a behavior that waits until it receives an Eat message and then transitions to thinking for a few seconds before it “returns.”
The more complicated case of the “try to eat” phase can be expressed this way:
val tryToEat = waitForFirstChopstick flatMap {
    case Left( firstChopstick ) => waitForSecondChopstick( firstChopstick, firstSucceeded = false )
    case Right( firstChopstick ) => waitForSecondChopstick( firstChopstick, firstSucceeded = true )
}
Not too shabby! We are now in a position to define our entire hakker behavior. But this is not exactly the imperative style we were shooting for. Trying to compose lots of these phases gets pretty ugly:
val combinedHakkerBehavior = {
    waitForDinner.flatMap( _ => {
        thinkFor( 5.seconds, 3.seconds ).flatMap( _ => {
            tryToEat.flatMap( (gotChopsticks : Boolean) => {
                if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
                else thinkFor( 10.seconds, 5.seconds )
            } )
        } )
    } )
}
Can we do better? It turns out we can. You may have noticed that in addition to flatMap, we also defined a map method, which simply transforms the return value of the phase. Having both map and flatMap allows us to use Scala’s for comprehension, which is just syntactic sugar for chaining nested calls to flatMap and transforming the final result with map:
for {
    i <- list1
    j <- list2
    k <- list3
} yield i + j + k

// equivalent to:
list1.flatMap( i => {
    list2.flatMap( j => {
        list3.map( k => i + j + k )
    } )
} )
If we apply this to our BehaviorPhases, we end up with something along the lines of:
val combinedBehavior = for {
    res1 <- behavior1
    _ <- behavior2( res1 ) // ignore the result of behavior2
} yield ()
For anyone used to using for in the traditional way, as a way of iterating over ranges or collections, this usage may be bewildering. In the functional programming world, such iteration is simply a special case of a more general pattern of nested flatMaps. In our case, rather than combining and flattening collections in order to iterate over all combinations of their elements, flatMap combines and flattens BehaviorPhases in order to generate a sequence of phases.
This more general use of the for notation is a convenient way for functional programming languages to compose functions in a way that mimics “imperative” programming. The expression to the right of the <- operator is like an imperative call to a blocking, impure function, and the parameter to the left of the <- operator is like a variable storing the result of that function. (If the result should be ignored, you can simply put a _ instead of a named parameter.)
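The same “imperative-looking” reading works for any type with map and flatMap. As a small illustration using only the standard library's Option (the names ForSugar, parse, checkedDivide and checkedDivideDesugared are made up for this sketch), each <- line reads like an assignment, and the _ <- line runs a check whose result is discarded but whose failure still stops the chain:

```scala
import scala.util.Try

object ForSugar {
  // Parse an integer, returning None on malformed input.
  def parse(s: String): Option[Int] = Try(s.trim.toInt).toOption

  // Reads top to bottom like imperative code.
  def checkedDivide(a: String, b: String): Option[Int] =
    for {
      x <- parse(a)
      y <- parse(b)
      _ <- if (y != 0) Some(()) else None // result ignored; None aborts the rest
    } yield x / y

  // What the compiler generates from the comprehension above, written by hand.
  def checkedDivideDesugared(a: String, b: String): Option[Int] =
    parse(a).flatMap { x =>
      parse(b).flatMap { y =>
        (if (y != 0) Some(()) else None).map(_ => x / y)
      }
    }
}
```

Both definitions give Some(5) for ("10", "2") and None for ("10", "0") or ("x", "2").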
Using a for comprehension in this way, we can now set down the entire sequence of behaviors of our hakker actor:
val combinedHakkerBehavior = for {
    _ <- waitForDinner
    _ <- thinkFor( 5.seconds, 3.seconds )
    gotChopsticks <- tryToEat
    _ <- {
        if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
        else thinkFor( 10.seconds, 5.seconds )
    }
} yield ()
Logic and loops: more functional magic
So far so good, but the astute reader might notice that there are a few serious problems with how we have built our behavior. In the first place, we have omitted all of the initial actions of each phase, such as reaching out for chopsticks to kick off the “try to eat” phase, or scheduling self-messages to limit the duration of the “think” and “eat” phases. Because we are defining our behavior only by message handling, we have no means of performing such actions except when a message has actually been received.
Actions without message-handling
To solve this problem, we will need a kind of BehaviorPhase that does not handle messages, but just “does” something and then returns. This can be implemented fairly easily as follows:
case class Do[ MessageType, +T ]( action : ActorContext[ MessageType ] => T ) extends BehaviorPhase[ MessageType, T ] {
    override def behavior( returnBehavior : T => Behavior[ MessageType ] )
                         ( implicit ctx : ActorContext[ MessageType ] ) : Behavior[ MessageType ] = {
        val returnVal = action( ctx )
        returnBehavior( returnVal )
    }
}
This new Do class will not wait for messages, but will simply call a function we pass to it and return the value. We can link this execution to actual message handling behaviors by calling it first in a for comprehension:
val initializedPhase = for {
    _ <- Do[ SomeMessageType, Unit ]( ctx => {
        ctx.self ! SomeMessage
        ctx.log.info( "initialized!" )
    } )
    _ <- messageHandlingPhase
} yield ()
We can now redefine Hakker’s behavior phases so that they Do what they need to before they start handling messages:
private lazy val waitForDinner : BehaviorPhase[ Command, Unit ] =
    BehaviorPhase.fromHandler[ Command, Unit ]( _ => {
        case Eat => Return()
    } )

private def thinkFor( duration : FiniteDuration, plusOrMinus : FiniteDuration ) : BehaviorPhase[ Command, Unit ] = {
    for {
        _ <- Do[ Command, Unit ]( ctx => {
            val dur = rndDuration( duration, plusOrMinus )
            ctx.log.info( "{} starts to think for {}", name, dur.toString )
            ctx.scheduleOnce( dur, ctx.self, Eat )
        } )
        _ <- BehaviorPhase.fromHandler[ Command, Unit ]( ctx => {
            case Eat => Return()
            case _ => Stay
        } )
    } yield ()
}

private lazy val getChopsticks = for {
    _ <- Do[ Command, Unit ]( ctx => {
        val adapter = ctx.messageAdapter( HandleChopstickAnswer )
        ctx.log.info( "{} reaches for his chopsticks", name )
        left ! Take( adapter )
        right ! Take( adapter )
    } )
    firstResult <- waitForFirstChopstick
    secondResult <- firstResult match {
        case Left( cs ) => waitForSecondChopstick( cs, firstSucceeded = false )
        case Right( cs ) => waitForSecondChopstick( cs, firstSucceeded = true )
    }
} yield secondResult

private lazy val waitForFirstChopstick = ...

private def waitForSecondChopstick( first : ActorRef[ ChopstickMessage ], firstSucceeded : Boolean ) : BehaviorPhase[ Command, Boolean ] = ...

private def eatFor( duration : FiniteDuration, plusOrMinus : FiniteDuration ) : BehaviorPhase[ Command, Unit ] = {
    for {
        _ <- Do[ Command, Unit ]( ctx => {
            val dur = rndDuration( duration, plusOrMinus )
            ctx.log.info( "{} starts to eat for {}", name, dur.toString )
            ctx.scheduleOnce( dur, ctx.self, Think )
        } )
        _ <- BehaviorPhase.fromHandler[ Command, Unit ]( ctx => {
            case Think =>
                val adapter = ctx.messageAdapter( HandleChopstickAnswer )
                ctx.log.info( "{} stops eating and puts down his chopsticks", name )
                left ! Put( adapter )
                right ! Put( adapter )
                Return()
            case _ => Stay
        } )
    } yield ()
}
Looping
This takes care of one problem. Another glaring problem, however, is that even when we have all the correct initial behaviors in place for each phase, our composition of all of these phases does not repeat! This becomes clear when we try to convert our BehaviorPhase into an actual Behavior using the behavior method. The default version of that method will call Behaviors.same after the last phase completes. This means we will end up with a behavior that goes through all of the phases we have defined and then remains forever in the final behavior, which will either be eat or think.
A simple way to make the behavior loop is to complete it with a reference to itself. We can do so in one of two ways:
// Method #1
val combinedHakkerBehavior = for {
    _ <- waitForDinner
    _ <- thinkFor( 5.seconds, 3.seconds )
    gotChopsticks <- tryToEat
    _ <- {
        if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
        else thinkFor( 10.seconds, 5.seconds )
    }
} yield ()

// A recursive value needs an explicit type annotation
lazy val hakkerBehavior : Behavior[ Command ] = combinedHakkerBehavior.behavior( _ => hakkerBehavior )

// Method #2
lazy val combinedHakkerBehavior : BehaviorPhase[ Command, Unit ] = for {
    _ <- waitForDinner
    _ <- thinkFor( 5.seconds, 3.seconds )
    gotChopsticks <- tryToEat
    _ <- {
        if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
        else thinkFor( 10.seconds, 5.seconds )
    }
    _ <- combinedHakkerBehavior
} yield ()

val hakkerBehavior = combinedHakkerBehavior.behavior
In the first case we “complete” the behavior method with the result of the behavior method itself, generating a recursive loop. In the second case, we insert combinedHakkerBehavior (the result of our phase composition) as the last phase within that composition. Since combinedHakkerBehavior is also a BehaviorPhase, it can be composed with itself recursively via flatMap. The resulting composition, though it has a return type (Unit), will never actually complete. Any phases added to the composition after combinedHakkerBehavior would never be reached. (Note that to reference a val recursively, you must make it lazy and give it an explicit type, since the compiler cannot infer the type of a recursive value. Without lazy, Scala may try to use the value in its own definition before it has been initialized, which can result in a null pointer exception.)
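The role of lazy here can be seen in isolation with ordinary values. The following sketch (LazyValSketch and its members are names made up for this example) shows a value that refers to itself and a value that refers to one defined further down; in both cases lazy defers evaluation until first use:

```scala
object LazyValSketch {
  // Self-reference: `lazy` plus an explicit type annotation,
  // because the compiler cannot infer the type of a recursive value.
  lazy val countdown: Int => List[Int] =
    n => if (n <= 0) Nil else n :: countdown(n - 1)

  // Forward reference: `first` uses `second`, which is defined on a later line.
  // With plain vals, `second` would still be null while `first` is initialized.
  lazy val first: List[Int] = 0 :: second
  lazy val second: List[Int] = List(1, 2)
}
```

Here LazyValSketch.countdown( 3 ) evaluates to List(3, 2, 1) and LazyValSketch.first to List(0, 1, 2).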
Both methods will produce the same result. But what is that result? Now the dining hakker will wait, think, try to eat, either eat or think again, and then … wait again. The hakker will leave the dinner altogether and will not return without an externally generated Eat message. It turns out we don’t want the hakker to loop its entire behavior, but only a portion of it.
One way to deal with this problem is to define the inner cycle separately from the behavior as a whole:
// A recursive value needs an explicit type annotation
lazy val innerLoop : BehaviorPhase[ Command, Unit ] = for {
    _ <- thinkFor( 5.seconds, 3.seconds )
    gotChopsticks <- tryToEat
    _ <- {
        if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
        else thinkFor( 10.seconds, 5.seconds )
    }
    _ <- innerLoop
} yield ()

val combinedHakkerBehavior = for {
    _ <- waitForDinner
    _ <- innerLoop
} yield ()

val hakkerBehavior = combinedHakkerBehavior.behavior
While this works, it is a little disappointing. It would be nice to be able to express the entire behavior in a single expression, without having to break it apart if we don’t want to. Is there a way to loop without predefining the phase we are looping?
It turns out there is. We can add a loop method to BehaviorPhase to handle the looping for us, which will allow us to generate a loop inside a for comprehension without having to predefine it:
trait BehaviorPhase[ MessageType, +T ] {
    ...
    def loop : BehaviorPhase[ MessageType, Unit ] = flatMap( _ => loop )
    ...
}
By adding the recursive looping functionality to BehaviorPhase itself, we can easily set apart a section of our for comprehension to loop endlessly as follows:
val combinedHakkerBehavior = for {
    _ <- waitForDinner
    _ <- ( for {
        _ <- thinkFor( 5.seconds, 3.seconds )
        gotChopsticks <- tryToEat
        _ <- {
            if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
            else thinkFor( 10.seconds, 5.seconds )
        }
    } yield () ).loop
} yield ()
If you have experience dealing with recursive functions you might be wondering if this method of looping is stack-safe. Because the recursive call in the loop method is made from within a call to flatMap, it is not in tail position and cannot benefit from tail call elimination. Nor have we set up a trampoline to convert it to a tail-recursive form. Will our actor eventually produce a stack overflow error after repeating its behavior for a while? It turns out that the way Akka manages behavior transitions already handles this problem. Akka does not keep a call stack of the behaviors, but simply replaces them as they are called. In other words, its behavior construction is already trampolined. Since our flatMap method uses this form of transitioning to link our phases together, any recursion using flatMap will be stack-safe.
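For readers who have not met trampolining before, here is the general technique shown with the Scala standard library's scala.util.control.TailCalls. This is only an illustration of the idea, not of Akka's internals: instead of calling the next step directly (and growing the stack), each step returns a description of the next step, and a simple loop runs them one at a time. TrampolineSketch is a name made up for this example.

```scala
import scala.util.control.TailCalls.{TailRec, done, tailcall}

object TrampolineSketch {
  // Mutually recursive functions that would overflow the stack for large n
  // if written as direct calls. Each step returns the next step instead.
  def isEven(n: Int): TailRec[Boolean] =
    if (n == 0) done(true) else tailcall(isOdd(n - 1))

  def isOdd(n: Int): TailRec[Boolean] =
    if (n == 0) done(false) else tailcall(isEven(n - 1))
}
```

Calling TrampolineSketch.isEven( 1000000 ).result runs a million steps in constant stack space. In the same spirit, a phase that “returns” hands the next behavior back to the actor rather than calling it.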
Optional phases
Our new composition has one remaining problem that needs to be fixed. Since the loop begins with a “think” phase, so long as tryToEat succeeds, the hakker will cycle between “eat” and “think.” If tryToEat fails, however, the loop completes with another “think” phase. This means that thinkFor will be called twice in a row whenever tryToEat fails. What we’d like to do is replace the if ... else clause with a single if statement so that if gotChopsticks is false it just skips to the next phase. This is impossible, however, because each line of the for comprehension must return a BehaviorPhase. To get around this, then, we’ll need a BehaviorPhase that does not handle messages but simply skips to the next phase. This could be accomplished using Do, if we just pass an empty function: Do[ Command, Unit ]( _ => () ). But this makes it look like we’re trying to “do” something, which is misleading, so let’s make a special case called Skip which is expressly for the purpose of skipping to the next phase without any action:
case class Skip[ MessageType, +T ]( returnVal : T ) extends BehaviorPhase[ MessageType, T ] {
    override def behavior( returnBehavior : T => Behavior[ MessageType ] )
                         ( implicit ctx : ActorContext[ MessageType ] ) : Behavior[ MessageType ] = returnBehavior( returnVal )

    override def map[ U ]( fn : T => U ) : BehaviorPhase[ MessageType, U ] = Skip( fn( returnVal ) )

    override def flatMap[ U ]( fn : T => BehaviorPhase[ MessageType, U ] ) : BehaviorPhase[ MessageType, U ] = {
        fn( returnVal )
    }
}
We can now fix our loop by replacing the last call to thinkFor with a Skip:
val combinedHakkerBehavior = for {
    _ <- waitForDinner
    _ <- ( for {
        _ <- thinkFor( 5.seconds, 3.seconds )
        gotChopsticks <- tryToEat
        _ <- {
            if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
            else Skip[ Command, Unit ]()
        }
    } yield () ).loop
} yield ()
By allowing you to set its return value, Skip can be used in if ... else and match blocks without conflicting with the return type of the other BehaviorPhases returned in the block.
We can now use Skip to add optional phases. For instance, say that after waiting for dinner we want the hakker either to enter the loop right away or to think for a while first. We can use the rndBool() method we defined at the beginning to make this decision randomly. We only need to add another line before the loop that calls thinkFor or Skip based on a rndBool() result:
val combinedHakkerBehavior = for {
    _ <- waitForDinner
    _ <- {
        val shouldThinkFirst = rndBool()
        if ( shouldThinkFirst ) thinkFor( 4.seconds, plusOrMinus = 3.seconds )
        else Skip[ Command, Unit ]()
    }
    _ <- ( for {
        _ <- thinkFor( 5.seconds, 3.seconds )
        gotChopsticks <- tryToEat
        _ <- {
            if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
            else Skip[ Command, Unit ]()
        }
    } yield () ).loop
} yield ()
More loops and do-while
Returning to our looping functionality, now that we have started implementing it in BehaviorPhase, we can come up with ways to exercise even more finely grained control over the flow of our behavior. For instance, we can overload loop with an integer parameter to repeat a behavior phase a finite number of times:
trait BehaviorPhase[ MessageType, +T ] {
    ...
    def loop( times : Int ) : BehaviorPhase[ MessageType, T ] = {
        if ( times < 1 ) throw new IllegalArgumentException( "Cannot loop less than 1 time" )
        else if ( times == 1 ) this
        else flatMap( _ => loop( times - 1 ) )
    }
    ...
}
Now, with minimal changes to the code and without having to touch any of the message handlers we can change the hakker from forever cycling between thinking and eating to cycling only a given number of times until it gets tired and stops:
val combinedHakkerBehavior = for {
    _ <- waitForDinner
    _ <- {
        val shouldThinkFirst = rndBool()
        if ( shouldThinkFirst ) thinkFor( 4.seconds, plusOrMinus = 3.seconds )
        else Skip[ Command, Unit ]()
    }
    _ <- ( for {
        _ <- thinkFor( 5.seconds, 3.seconds )
        gotChopsticks <- tryToEat
        _ <- {
            if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
            else Skip[ Command, Unit ]()
        }
    } yield () ).loop( 20 )
    _ <- Do[ Command, Unit ]( ctx => ctx.log.info( "{} got tired and left the dinner", name ) )
} yield ()
What if we want the hakker to continue the cycle only under certain conditions? This would be equivalent to a do...while pattern in imperative programming. Why not add a doWhile method to BehaviorPhase?
trait BehaviorPhase[ MessageType, +T ] {
    ...
    def doWhile( condition : T => Boolean ) : BehaviorPhase[ MessageType, T ] = {
        this.flatMap( t => {
            if ( condition( t ) ) doWhile( condition )
            else Skip[ MessageType, T ]( t )
        } )
    }
    ...
}
Our doWhile method uses the return value of the phase to determine whether or not to loop. We could use this to create a behavior where the hakker tries to eat, and upon failing, takes a break to think, and then tries again, repeating as many times as it takes to succeed before leaving the dinner. All we have to do is yield the boolean result of tryToEat, which we have been naming gotChopsticks, and use that as the condition for doWhile:
val combinedHakkerBehavior = for {
    _ <- waitForDinner
    _ <- {
        val shouldThinkFirst = rndBool()
        if ( shouldThinkFirst ) thinkFor( 4.seconds, plusOrMinus = 3.seconds )
        else Skip[ Command, Unit ]()
    }
    _ <- ( for {
        _ <- thinkFor( 5.seconds, 3.seconds )
        gotChopsticks <- tryToEat
        _ <- {
            if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
            else Skip[ Command, Unit ]()
        }
    } yield gotChopsticks ).doWhile( res => !res )
    _ <- Do[ Command, Unit ]( ctx => ctx.log.info( "{} finally ate and went home", name ) )
} yield ()
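The control flow of loop( times ) and doWhile can be checked in isolation with a stripped-down stand-in that has no messages and no actor context. In this sketch a phase simply runs and passes its result to a callback; Phase, skip, perform and demo are hypothetical names for illustration, and unlike the Akka-backed version this toy model is not stack-safe for long loops.

```scala
object LoopSketch {
  // Stand-in for BehaviorPhase: run the phase, then call `complete` with its result.
  trait Phase[+T] { self =>
    def run(complete: T => Unit): Unit

    def flatMap[U](fn: T => Phase[U]): Phase[U] = new Phase[U] {
      def run(complete: U => Unit): Unit = self.run(t => fn(t).run(complete))
    }

    // Repeat this phase a fixed number of times, returning the last result.
    def loop(times: Int): Phase[T] =
      if (times < 1) throw new IllegalArgumentException("Cannot loop less than 1 time")
      else if (times == 1) this
      else flatMap(_ => loop(times - 1))

    // Repeat this phase for as long as `condition` holds for its result.
    def doWhile(condition: T => Boolean): Phase[T] =
      flatMap(t => if (condition(t)) doWhile(condition) else Phase.skip(t))
  }

  object Phase {
    // Counterpart of Skip: return a value without doing anything.
    def skip[T](value: T): Phase[T] = new Phase[T] {
      def run(complete: T => Unit): Unit = complete(value)
    }

    // Counterpart of Do: perform an action and return its result.
    def perform[T](action: () => T): Phase[T] = new Phase[T] {
      def run(complete: T => Unit): Unit = complete(action())
    }
  }

  // Returns (ticks after loop(5), attempts made by doWhile, final doWhile result).
  def demo(): (Int, Int, Boolean) = {
    var ticks = 0
    Phase.perform(() => ticks += 1).loop(5).run(_ => ())

    var attempts = 0
    var finalResult = false
    // Pretend that trying to eat only succeeds on the third attempt.
    val tryToEat = Phase.perform { () => attempts += 1; attempts >= 3 }
    tryToEat.doWhile(succeeded => !succeeded).run(r => finalResult = r)

    (ticks, attempts, finalResult) // (5, 3, true)
  }
}
```

The doWhile loop stops as soon as the phase returns true, and that final value is what the looped phase itself returns.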
Keeping state across loops: fold
But what about this more subtle alteration of the hakker’s behavior: let’s have the hakker cycle endlessly between eating and thinking, just as it did before, but when it fails to eat three times in a row, have it complain before continuing? This would require keeping track of phase results across multiple iterations of a loop. Is this possible without introducing mutable state to our Hakker class?
Yes it is! We can make a new method just like doWhile, but instead of simply flatMapping to the same method again, it will first recalculate the state we want to track and then pass the new value to the next iteration of the method. What we are talking about is, in functional programming terms, a fold operation. We can define versions of fold that operate over a regular loop or a doWhile loop:
trait BehaviorPhase[ MessageType, +T ] {
    ...
    def fold[ B ]( init : B )( fn : ((B, T)) => B )( times : Int ) : BehaviorPhase[ MessageType, B ] = {
        if ( times < 0 ) throw new IllegalArgumentException( "Cannot loopFold less than 0 times" )
        else if ( times == 0 ) Skip( init )
        else if ( times == 1 ) map( t => fn( (init, t) ) )
        else map( t => fn( (init, t) ) ).flatMap( b => fold( b )( fn )( times - 1 ) )
    }

    def foldWhile[ B ]( init : B )( foldFn : (B, T) => B )( condition : (T, B) => Boolean ) : BehaviorPhase[ MessageType, B ] = {
        this.flatMap( t => {
            val b = foldFn( init, t )
            if ( condition( t, b ) ) foldWhile( b )( foldFn )( condition )
            else Skip[ MessageType, B ]( b )
        } )
    }
    ...
}
Each of these methods takes an initial fold value along with a function that generates a new folded value from both the prior folded value and the return value of the behavior phase. In the case of the regular fold method, you then just provide the number of times to loop, and the resulting behavior will return the folded state after iterating over the behavior that many times. In the case of foldWhile, you provide a condition function that will determine whether to keep looping based on either the phase’s return value, the folded value, or both. Here too the new return value is the folded value. (It wouldn’t make sense to fold over an infinite loop, since if the loop didn’t end we would never be able to use the folded value.)
We can use foldWhile to break out of the loop after three failures by doing the following:
behaviorLoop.foldWhile( 0 )( ( i, res ) => if ( res ) 0 else i + 1 )( ( _, i : Int ) => i < 3 )
The folded loop will start out with an initial folded value of 0. If the result of tryToEat is false, it will increment the folded value by 1; if the result is true (success) it will reset it back to 0. The “while” condition simply checks if the folded value is less than 3 (it ignores the BehaviorPhase’s return value). Thus as soon as the folded value reaches three, which will only happen if there are three failures in a row, it will break out of the loop.
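To check this reasoning by hand, we can replay a fixed sequence of tryToEat outcomes through the same fold function and condition, without any actors involved. This is only a trace of the arithmetic; FoldWhileTrace and trace are names made up for this sketch.

```scala
import scala.annotation.tailrec

object FoldWhileTrace {
  // Same fold function and condition as in the foldWhile call above.
  val foldFn: (Int, Boolean) => Int = (i, res) => if (res) 0 else i + 1
  val condition: (Boolean, Int) => Boolean = (_, i) => i < 3

  // Replays the outcomes in order and returns (iterations run, final folded value).
  // Stops early as soon as the condition fails, just as the loop would.
  def trace(outcomes: List[Boolean]): (Int, Int) = {
    @tailrec
    def go(remaining: List[Boolean], folded: Int, iterations: Int): (Int, Int) =
      remaining match {
        case Nil => (iterations, folded)
        case res :: rest =>
          val next = foldFn(folded, res)
          if (condition(res, next)) go(rest, next, iterations + 1)
          else (iterations + 1, next)
      }
    go(outcomes, 0, 0)
  }
}
```

For the outcomes fail, fail, succeed, fail, fail, fail, succeed, the folded value goes 1, 2, 0, 1, 2, 3, so trace returns (6, 3): the loop ends on the sixth iteration and the final success is never reached.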
We can now add a Do phase right after the foldWhile loop to log a complaint, and then wrap the whole thing in another loop. By extracting the folded value numTimes from the foldWhile loop, we can use this value in the hakker’s complaint. This way, when we want to change the threshold for complaining (say, from three failures in a row to ten) we only need to change it in foldWhile’s condition parameter and not also in the Do statement.
val combinedHakkerBehavior = for {
    _ <- waitForDinner
    _ <- {
        val shouldThinkFirst = rndBool()
        if ( shouldThinkFirst ) thinkFor( 4.seconds, plusOrMinus = 3.seconds )
        else Skip[ Command, Unit ]()
    }
    _ <- ( for {
        numTimes <- ( for {
            _ <- thinkFor( 5.seconds, 3.seconds )
            gotChopsticks <- tryToEat
            _ <- {
                if ( gotChopsticks ) eatFor( 5.seconds, 3.seconds )
                else Skip[ Command, Unit ]()
            }
        } yield gotChopsticks )
          .foldWhile( 0 )( ( i, res ) => if ( res ) 0 else i + 1 )( ( _, i : Int ) => i < 3 )
        _ <- Do[ Command, Unit ]( ctx => {
            ctx.log.info( "{} curses its fate after failing to eat {} times in a row!", name, numTimes )
        } )
    } yield () ).loop
} yield ()
Conclusion
With Do, Skip, loop, doWhile, fold, and foldWhile, we can now compose actor behaviors using pretty much any of the procedural logic we would use in imperative programming. It should be clear, moreover, how useful this kind of composition can be when dealing with actors that have procedural behaviors of the sort we have considered. By separating their procedural logic from their message-handling, we can understand their procedural logic more clearly by simply examining the for comprehension that composes them, and we can make all kinds of changes to their procedural logic without touching or even thinking about their message handlers.
Each of the above modifications to the procedural logic — changing from a permanent loop to a finite loop, to a conditional loop, to a conditional loop that aggregates state, and adding a new conditional phase — would have been tiresome refactoring efforts if we had to delve into the component behaviors. Each refactor would have required tracing the logic among all of the component behaviors and making sure the new links were in the right places. This process would not only be harder and more time-consuming, but also far more likely to introduce bugs.
Treating each of the behavior phases as a function that returns a value, moreover, provides useful constraints on our design that make it easier to reason both about each phase in isolation and about the various phases’ combined behavior. Having to return a value forces us to construct each phase in relation to a single type of outcome, which is made explicit in the type signature of the phase. When we think about how to handle a given message type, we are forced to think about how that message contributes to the required outcome. That outcome’s type, moreover, constrains what the actor can subsequently “become” once a phase is complete, as opposed to regular Akka behaviors, which can return any other behavior as long as it has the same message type.
In short, by encapsulating Akka behaviors in BehaviorPhase monads, we make it easier to reason locally about them, rather than always having to keep track of how each behavior interacts with every other one.
You can find the code for this post on GitHub here: https://github.com/johnhungerford/akka-util. It includes most of the versions of DiningHakkers that are discussed above along with a command line program that allows you to choose which implementation to run. It also includes a more elaborate version of BehaviorPhase that adds a stateful implementation as well as a few more convenience methods.