home / 2018.10.11 21:30 / scala / akka / cluster / spring

Distributed Akka Deployment

This posting will explore the configuration necessary to run an Akka system distributed across multiple machines.

Example Application

We’ll need a program that simulates a master actor starting one or more worker actors and periodically sending them work to do. We’ll also need a way to simulate the worker actors doing an expensive job (one that takes some amount of time). Once we have all this, we’ll look into how the system behaves, how we create more worker actors locally to improve the performance of the app, and finally how we can run this app on multiple processes/machines and how parallelization works in those conditions.

As with previous example, I am using Gradle and Spring Boot to build the project (but will look into SBT and a lighter/scala-focused build environment in a future investigation). The build.gradle file is listed below:

buildscript {
    ext {
        springBootVersion = '2.0.5.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

apply plugin: 'java'
apply plugin: 'scala'
apply plugin: 'eclipse'
apply plugin: 'org.springframework.boot'
apply plugin: 'io.spring.dependency-management'

group = 'com.cacoveanu.akka'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = 1.8

sourceSets.main.scala.srcDir "src/main/java"
sourceSets.main.java.srcDirs = []
sourceSets.test.scala.srcDir "src/test/java"
sourceSets.test.java.srcDirs = []

configurations {
    scalaCompilerPlugins { transitive = false }
}

repositories {
    mavenCentral()
}

dependencies {
    implementation('org.scala-lang:scala-library:2.12.5')
    implementation('org.springframework.boot:spring-boot-starter-web')
    compile('com.typesafe.akka:akka-actor_2.12:2.5.11')
    testImplementation('org.springframework.boot:spring-boot-starter-test')
}

When we start the Spring Application, we also create the actor system and the master actor through the Spring configuration:

package com.cacoveanu.akka.distributed

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.context.annotation.Bean
import akka.actor.{ActorRef, ActorSystem, Props}
import com.cacoveanu.akka.distributed.actors.{MasterActor, WorkerActor}

@SpringBootApplication
class Configuration {

  @Bean
  def createActorsSystem(): ActorSystem = {
    ActorSystem("distributedSystem")
  }

  @Bean(Array("masterActor"))
  def createMasterActor(actorSystem: ActorSystem): ActorRef = {
    actorSystem.actorOf(Props.create(classOf[MasterActor]), "masterActor")
  }

}

object DistributedApplication {

  def main(args: Array[String]): Unit = {
    SpringApplication.run(classOf[Configuration])
  }

}

In our master actor we use timers to introduce a pause between processing batches. As soon as the processing is done with one batch, we schedule the start of the next batch. Batches of data that need to be processed are created randomly. At first, we will only create one worker actor that our master communicates with. We also monitor how much time it takes for each batch to get processed. As we parallelize this process we can expect the processing time to decrease.

package com.cacoveanu.akka.distributed.actors

import akka.actor.{Actor, Timers}
import com.cacoveanu.akka.distributed.actors.WorkerActor.{Result, WorkOn}
import org.slf4j.LoggerFactory

import scala.concurrent.duration._
import scala.util.Random

object MasterActor {
  case object Start
  case object ScheduleRestart
}

class MasterActor extends Actor with Timers {
  import MasterActor._

  private val logger = LoggerFactory.getLogger(classOf[MasterActor])

  timers.startSingleTimer("start", Start, 500 millis)

  private var startTime = 0L
  private var runningJobs = 0

  private val worker = context.actorOf(WorkerActor.props(500))

  private def getDataToProcess(): Iterable[String] =
    //Seq.fill(1 + Random.nextInt(20))(System.nanoTime).map(l => l.toString)
    Seq.fill(20)(System.nanoTime).map(l => l.toString)

  override def receive: Receive = {
    case Start =>
      val data = getDataToProcess()
      runningJobs = data.size
      logger.info(s"starting $runningJobs jobs")
      if (runningJobs > 0) {
        startTime = System.currentTimeMillis()
        data.foreach(m => worker ! WorkOn(m))
      }
      else self ! ScheduleRestart

    case Result(message) =>
      logger.info(s"done work, result: $message")
      runningJobs = runningJobs - 1
      if (runningJobs == 0) self ! ScheduleRestart

    case ScheduleRestart =>
      logger.info(s"processing of the batch took ${System.currentTimeMillis() - startTime} milliseconds")
      logger.info("schedulling master actor restart")
      timers.startSingleTimer("start again", Start, 500 millis)
  }
}

In the worker actor we simulate long-lasting operations by puttin the thread to sleep. Once the work is done, we send the result message back to the master. The pause of the worker is configurable.

package com.cacoveanu.akka.distributed.actors

import akka.actor.{Actor, Props, Timers}

object WorkerActor {
  def props(pause: Long): Props = Props(new WorkerActor(pause))

  case class WorkOn(message: String)
  case class Result(message: String)
}

class WorkerActor(val pause: Long) extends Actor with Timers {
  import WorkerActor._

  private def process(message: String): String = {
    Thread.sleep(pause)
    message + " done"
  }

  override def receive: Receive = {
    case WorkOn(message) => sender ! Result(process(message))
  }
}

Running the above program, we’ll see that we get similar run times outputted to the logs for each batch of 20 jobs:

starting 20 jobs
...
processing of the batch took 10024 milliseconds
schedulling master actor restart
starting 20 jobs
...
processing of the batch took 10056 milliseconds
schedulling master actor restart
starting 20 jobs
...
processing of the batch took 10063 milliseconds

Local Parallelization

In Akka, parallelization of operations can be achieved simply by increasing the number of actors handling that work. In our case we will create a pool of 4 worker actors, instead of a single worker actors, with the following change in the master actor:

private val worker = context.actorOf(
  RoundRobinPool(4).props(
    WorkerActor.props(500)
  )
)

If we run this parallelized version of our program we will get exactly what we expect, a runtime consistently 4 times lower than the one we were registering with just one worker actor:

starting 20 jobs
...
processing of the batch took 2500 milliseconds
schedulling master actor restart
starting 20 jobs
...
processing of the batch took 2500 milliseconds
schedulling master actor restart
starting 20 jobs
...
processing of the batch took 2500 milliseconds
schedulling master actor restart
starting 20 jobs
...
processing of the batch took 2500 milliseconds

Multiple Process Parallelization

The first thing we will try to do is to start another actor system on the same machine but in a different process, listening on a different port. This means a new project (at this point), so we’ll have the master project and the worker project. On the master project, we need to configure the Akka system to start in distributed mode:

@Bean
def createActorsSystem(): ActorSystem = {
  val customConfiguration = ConfigFactory.parseString(
    """
      |akka {
      |  actor {
      |    provider = remote
      |  }
      |  remote {
      |    enabled-transports = ["akka.remote.netty.tcp"]
      |    netty.tcp {
      |      hostname = "127.0.0.1"
      |      port = "2553"
      |    }
      | }
      |}
    """.stripMargin)
  ActorSystem("distributedSystem", ConfigFactory.load(customConfiguration))
}

In the master project, we just need to create the master actor, but we also need to configure it to access the worker actors remotely, so this is how we initialize the reference to the worker actor inside the master actor:

private val worker = context.actorSelection("akka.tcp://distributedSystemWorkers@127.0.0.1:2552/user/workerActor")

Next, we need to create the worker project, very similar to the master one but with the code that initializes and runs the worker actor. An important thing to pay attention to is that the case classes you use to communicate between the two projects are in the same packages. The initialization of the worker system and the worker actor:

@SpringBootApplication
class Configuration {

  @Bean
  def createActorsSystem(): ActorSystem = {
    val customConfiguration = ConfigFactory.parseString(
      """
        |akka {
        |  actor {
        |    provider = remote
        |  }
        |  remote {
        |    enabled-transports = ["akka.remote.netty.tcp"]
        |    netty.tcp {
        |      hostname = "127.0.0.1"
        |      port = 2552
        |    }
        | }
        |}
      """.stripMargin)
    ActorSystem("distributedSystemWorkers", ConfigFactory.load(customConfiguration))
  }

  @Bean(Array("workerActor"))
  def createWorkerActor(actorSystem: ActorSystem): ActorRef = {
    actorSystem.actorOf(
      RoundRobinPool(4).props(
        WorkerActor.props(500)
      ), "workerActor")
  }
}

If we now start the two projects (start the worker first) we should see the following output on the master:

processing of the batch took 2628 milliseconds
schedulling master actor restart
starting 20 jobs
processing of the batch took 2512 milliseconds
schedulling master actor restart
starting 20 jobs
processing of the batch took 2500 milliseconds
schedulling master actor restart
starting 20 jobs
processing of the batch took 2500 milliseconds
schedulling master actor restart
starting 20 jobs
processing of the batch took 2530 milliseconds
schedulling master actor restart
starting 20 jobs

We can see that the results are roughly the same, very little overhead registered on some of the jobs.

Of course this setup is not very usefull, we need to know what the address and port of every remote system is and get access to actors on that system. What would be a lot more usefull is if we could make a setup where everything is a lot more transparent, where we can have a system that we can grow by adding new nodes to it, and the work done by akka actors gets distributed over these nodes without extra configuration. And this exactly what can be achieved with an Akka cluster configuration.

Akka Cluster

The first change we need to make is to configure our app to be able to run on the same machine but on different ports, because we’ll test this on a single machine. This means we need to configure the spring server port and the Akka system port. This can be done easily with Spring by using program arguments starting with --. So we add the following as program arguments to our instances of DistributedApplication (we’ll use 3 instances to test):

Then we need to update our Spring startup class to read those arguments:

object DistributedApplication {

  def main(args: Array[String]): Unit = {
    SpringApplication.run(classOf[Configuration], args:_*)
  }

}

The server.port and logging.file properties are supported by Spring and we don’t need to do anything else to make them work, but we need to inject the akka.system.port property in our configuration class, so we can use it when we configure the Akka system:

@SpringBootApplication
class Configuration {

  @Value("${akka.system.port}")
  @BeanProperty
  val akkaPort: String = ""

}

This part is done, next we configure the akka system to run in clustered mode. We use the following custom configuration when we initialize the Akka system (and to which we inject the akka.system.port property):

@SpringBootApplication
class Configuration {

  @Value("${akka.system.port}")
  @BeanProperty
  val akkaPort: String = ""

  @Bean
  def createActorsSystem(): ActorSystem = {
    val customConfiguration = ConfigFactory.parseString(
      s"""
        |akka {
        |  actor {
        |    provider = cluster
        |  }
        |  remote {
        |    netty.tcp {
        |      hostname = "127.0.0.1"
        |      port = $akkaPort
        |    }
        |    artery {
        |      enabled = on
        |      canonical.hostname = "127.0.0.1"
        |      canonical.port = $akkaPort
        |    }
        |  }
        |
        |  cluster {
        |    seed-nodes = [
        |      "akka://distributedSystem@127.0.0.1:2551",
        |      "akka://distributedSystem@127.0.0.1:2552"]
        |
        |    auto-down-unreachable-after = 10s
        |  }
        |
        |}
      """.stripMargin)
    ActorSystem("distributedSystem", ConfigFactory.load(customConfiguration))
  }
}

The configuration tells our distributedSystem that actors are provided by the cluster, that the system should start on the IP 127.0.0.1 and the port configured by the akka.system.port property, and metions what the seed nodes of the cluster are.

Another thing we’ll do for this example application is make sure we start one master actor, only on one of the nodes:

@Bean(Array("masterActor"))
def createMasterActor(actorSystem: ActorSystem): ActorRef = {
  if (akkaPort == "2551") actorSystem.actorOf(Props.create(classOf[MasterActor]), "masterActor")
  else null
}

Then we also create a worker on each node we start up:

@Bean(Array("workerActor"))
def createWorkerActor(actorSystem: ActorSystem): ActorRef = {
  actorSystem.actorOf(WorkerActor.props(500), "workerActor")
}

Now, our master needs a way to send work to all workers in a cluster, however many workers there are in the cluster at one time. For this, we need to create a router in our master than can detect when new nodes are added to the cluster and select the worker actors on those nodes by path (changes in MasterActor.scala):

private val worker = context.actorOf(
  ClusterRouterGroup(
    RoundRobinGroup(List("/user/workerActor")),
    ClusterRouterGroupSettings(
      totalInstances = 100,
      routeesPaths = List("/user/workerActor"),
      allowLocalRoutees = true,
      useRoles = Set("dc-default")
    )
  ).props()
)

Ok, now we are ready to start our application, one instance at a time. We start the first instance, the one that creates our master actor and one worker actor, and we follow the outputted logs. We’ll get the following performance:

starting 20 jobs
worker actor received message 3871350820359
worker actor received message 3871350833951
worker actor received message 3871350835839
worker actor received message 3871350836972
worker actor received message 3871350837727
worker actor received message 3871350838482
worker actor received message 3871350839237
worker actor received message 3871350839992
worker actor received message 3871350840747
worker actor received message 3871350841125
worker actor received message 3871350841880
worker actor received message 3871350842635
worker actor received message 3871350843390
worker actor received message 3871350848299
worker actor received message 3871350849054
worker actor received message 3871350849809
worker actor received message 3871350850564
worker actor received message 3871350851319
worker actor received message 3871350851697
worker actor received message 3871350852829
processing of the batch took 10022 milliseconds
schedulling master actor restart

So yes, the master actor starts 20 tasks, the worker actor on this node takes all the tasks and processes them, each task takes 500 milliseconds so in total it takes 10 seconds to do the work.

But now let’s start the second instance of our application and watch what the logs print on our master instance:

starting 20 jobs
worker actor received message 3901444757951
worker actor received message 3901444760971
worker actor received message 3901444761349
worker actor received message 3901444771921
worker actor received message 3901444772298
worker actor received message 3901444772676
worker actor received message 3901444773431
worker actor received message 3901444773808
worker actor received message 3901444774186
worker actor received message 3901444774564
processing of the batch took 5023 milliseconds
schedulling master actor restart

After the master started 20 jobs, the worker actor on the main node only received 10 of them, but the work was completed and it took half the time. This means that the second node has successfully joined the cluster and the second worker actor was discovered by the router in the master, and work was routed to it. If we look at the logs of the second node we’ll see the worker actor received message printouts that confirm the work is being done by this node also.

Predictably, if we also start the third node, the work will be done even faster:

starting 20 jobs
worker actor received message 3921544041517
worker actor received message 3921544042649
worker actor received message 3921544043027
worker actor received message 3921544043782
worker actor received message 3921544044160
worker actor received message 3921544044915
worker actor received message 3921544045670
processing of the batch took 3514 milliseconds
schedulling master actor restart

This achieves our goal of building a powerful service that we can parallelize very easily and spread the workload over multiple nodes (on a machine, on an actual cluster). With Akka, this switch from a single node to multiple nodes requires only minimal changes in your application. In conclusion, besides the fact that Akka, with the actor model it implements, makes writing multi-threadded applications more manageable, adopting it also gives you the option of deploying your processing application on multiple machines when the need requires it, and makes this multi-node configuration also very easy to adopt.