home / 2019.02.23 8:30 / apache spark / configuration

Spark Job File Configuration

In this short post, I will go over a small solution for configuring a Spark job from a file. The file is read at the beginning of the Spark job and its contents are used to set various variables of the job. This means the file has to be accessible both when running the job locally and when deploying it to a Spark cluster. When deploying jobs, we can also deploy external resources that should be accessible to those jobs by using the --files argument. So we only need to worry about providing an easily configurable mechanism to access those files both locally and in a cluster.


The first step is the configuration of our SBT project. We will use just two dependencies: one on the Spark libraries, which we need to load the file, and another on a json library, because our configuration file will be a json file.

    val sparkSql = "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"
    val json = "io.spray" %% "spray-json" % "1.3.5"

    lazy val tools = (project in file("spark/Tools"))
      .settings(
        name := "Tools",
        organization := "com.cacoveanu.spark.tools",
        libraryDependencies ++= Seq(
          sparkSql,
          json
        )
      )

File Access

We'll implement a short utility object that can load the file, and call it SparkUtil. We need a way to tell whether we are working in a local environment or on a cluster, and we'll use a local flag for that. This flag is set to false by default, because we only run locally when testing our code.

    import org.apache.spark.SparkFiles
    import spray.json._

    object SparkUtil {

      // Locally the given path is used as is; on a cluster, SparkFiles.get
      // resolves the file name to its actual path on the node.
      private def getOperatingPath(path: String, local: Boolean): String =
        if (local) path else SparkFiles.get(path)

      def loadConfigJsonFile(path: String, local: Boolean = false): JsValue = {
        val operatingPath = getOperatingPath(path, local)
        val source = scala.io.Source.fromFile(operatingPath)
        // Read the whole file, making sure the source is closed afterwards.
        val lines: String = try source.mkString finally source.close()
        lines.parseJson
      }
    }

Reading the file and parsing the json in it is simple enough with Scala and a json library. The main problem this object solves is getting the correct path to the file. When running locally, the local path we provide to the loadConfigJsonFile method is the correct path. Not so in a cluster, where the JAR and dependencies deployed with our Spark job are all copied to temporary folders on worker nodes. If we are not in a local execution context, we can use the SparkFiles.get method to obtain the real path on the node running this job.
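The path selection can be exercised without a cluster if the resolver is passed in as a function. The following is only a sketch under that assumption: PathResolution, operatingPath and fakeResolver are made-up names for illustration, and the fake directory is not what Spark would actually produce. In a real job the resolver would be SparkFiles.get.

```scala
object PathResolution {

  // `resolve` stands in for SparkFiles.get, which needs a running Spark application.
  def operatingPath(path: String, local: Boolean)(resolve: String => String): String =
    if (local) path else resolve(path)

  // Hypothetical resolver that mimics a node-local temporary folder (made-up path).
  val fakeResolver: String => String =
    name => s"/tmp/spark-fake/userFiles/$name"
}
```

With local = true the path comes back unchanged; with local = false the resolver decides where the file actually lives.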

Json Configuration File Utility

Next I've implemented a utility class that wraps a json file and accesses properties based on a property path. You can see example methods for loading String, Int, Double, array of String and array of Double properties.

    import spray.json.{JsArray, JsObject, JsValue}
    import spray.json.DefaultJsonProtocol._

    class JsonConfiguration(val path: String)(implicit local: Boolean = false) {

      val config: JsValue = SparkUtil.loadConfigJsonFile(path, local)

      // Walks the json tree one key at a time; the result is None as soon as
      // a key is missing or the current value is not a json object.
      def getProperty(path: String*): Option[JsValue] =
        path.foldLeft(Option(config)) {
          case (Some(map: JsObject), key) => map.fields.get(key)
          case _ => None
        }

      // The default is used only when the property is missing; a property of
      // the wrong type still fails in convertTo.
      def getOrElse(path: Seq[String], default: String): String =
        getProperty(path: _*).map(_.convertTo[String]).getOrElse(default)

      def getOrElse(path: Seq[String], default: Int): Int =
        getProperty(path: _*).map(_.convertTo[Int]).getOrElse(default)

      def getOrElse(path: Seq[String], default: Double): Double =
        getProperty(path: _*).map(_.convertTo[Double]).getOrElse(default)

      def getOrStringArray(path: Seq[String], default: Seq[String]): Seq[String] =
        getProperty(path: _*) match {
          case Some(value: JsArray) => value.convertTo[Seq[String]]
          case _ => default
        }

      def getOrDoubleArray(path: Seq[String], default: Seq[Double]): Seq[Double] =
        getProperty(path: _*) match {
          case Some(value: JsArray) => value.convertTo[Seq[Double]]
          case _ => default
        }
    }
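To make the property path idea concrete, here is a small standalone sketch that parses a json string directly instead of going through SparkUtil, so it can run without Spark. The sample document, the PropertyPathExample object and the lookup helper are assumptions made for illustration; lookup only mirrors what getProperty does.

```scala
import spray.json._
import spray.json.DefaultJsonProtocol._

object PropertyPathExample {

  // Hypothetical contents of a configuration.json file.
  val sample: JsValue =
    """{
      |  "parameters": {
      |    "algorithmA": { "threshold": 0.25, "labels": ["a", "b"] }
      |  }
      |}""".stripMargin.parseJson

  // Follow the path one key at a time; None at the first missing key
  // or when the current value is not a json object.
  def lookup(root: JsValue, path: String*): Option[JsValue] =
    path.foldLeft(Option(root)) {
      case (Some(obj: JsObject), key) => obj.fields.get(key)
      case _ => None
    }

  // Present in the sample, so the default is not used.
  val threshold: Double =
    lookup(sample, "parameters", "algorithmA", "threshold")
      .map(_.convertTo[Double]).getOrElse(0.1)

  // "algorithmB" does not exist, so the default is returned.
  val missing: Double =
    lookup(sample, "parameters", "algorithmB", "threshold")
      .map(_.convertTo[Double]).getOrElse(0.1)
}
```

Note that a default only covers a missing property. If the property exists but has a different type, convertTo will throw instead of falling back.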

Configuring the Spark Job

Now we need to write a Spark job that uses a configuration file. The first thing we need to worry about, as we start our job, is how to determine whether this is a local job or we are in a cluster. In our program, the difference is made by the presence of an argument named local with a boolean value, passed in the form local=true. If no argument with that name is present, we just assume that we are on a cluster.
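The argument handling can be sketched on its own with plain Scala. This is one possible shape, not the only one: ArgParsing, parseArgs and isLocal are names invented for this example, and the choice to treat any value other than "true" as false (instead of failing on an unparseable value) is an assumption.

```scala
object ArgParsing {

  // Turn "key=value" arguments into a map; malformed arguments are ignored.
  def parseArgs(args: Array[String]): Map[String, String] =
    args
      .map(_.split("=", 2))
      .collect { case Array(key, value) if key.nonEmpty && value.nonEmpty => key -> value }
      .toMap

  // Local only when the flag is explicitly "true"; a missing flag means cluster.
  def isLocal(argMap: Map[String, String]): Boolean =
    argMap.get("local").exists(_.equalsIgnoreCase("true"))
}
```

Splitting with a limit of 2 keeps values that themselves contain an equals sign intact, and matching on a two-element array avoids an index error for arguments that have no equals sign at all.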

Next we need to initialize the Spark session. We need to do this before we load the file, because SparkFiles.get will only give us a correct response if the session is already initialized. This means that certain variables, like the local flag or the variables necessary for initializing the Spark session, can't be read from the configuration file and need to be provided as command line arguments.

Once we have our Spark session, we can initialize the JsonConfiguration and read values from it. And we can start loading Spark streams and running our algorithms.

    import org.apache.spark.sql.SparkSession

    object OurSparkJob {

      def main(args: Array[String]): Unit = {
        // Parse "key=value" arguments; arguments missing either part are ignored.
        val argmap: Map[String, String] = args
          .map(a => a.split("=", 2))
          .filter(a => a.length == 2 && a(0).nonEmpty && a(1).nonEmpty)
          .map(a => a(0) -> a(1))
          .toMap

        // No "local" argument means we assume we are on a cluster.
        implicit val local: Boolean = argmap.getOrElse("local", "false").toBoolean

        // The session must exist before the configuration file is loaded.
        val spark = (
          if (local) SparkSession.builder().master("local[*]")
          else SparkSession.builder()
        )
          .appName("ourSparkJob")
          .getOrCreate()

        val jsonConfig = new JsonConfiguration("configuration.json")(local)

        val thresholdForAlgorithmA = jsonConfig.getOrElse(
          Seq("parameters", "algorithmA", "threshold"),
          default = 0.1d
        )

        val data = spark.readStream // and so on ...
      }
    }

Deploying on a Cluster

So our job is packaged as ourSparkJob.jar, its main class is com.cacoveanu.spar.OurSparkJob, and our configuration file is configuration.json. To deploy this job on a cluster, we run the following command:

spark-submit --class com.cacoveanu.spar.OurSparkJob --master spark://master-url:7077 --deploy-mode cluster --files configuration.json ourSparkJob.jar