In this short post, I will go over a small solution for using a file to configure a Spark job. The file is read at the beginning of the Spark job and its contents are used to configure various variables of the job. This means the solution must make the file accessible both when running a job locally and when deploying the job to a Spark cluster. When deploying jobs, we can also deploy external resources that should be accessible to those jobs by using the --files argument of spark-submit. So we only need to worry about providing an easily configurable mechanism to access those files, both locally and in a cluster.
The first step is the configuration of our SBT project. We will use just two dependencies: the Spark libraries, which we need to load the file, and a json library, because our configuration file will be a json file.
val sparkSql = "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"
val json = "io.spray" %% "spray-json" % "1.3.5"

lazy val tools = (project in file("spark/Tools"))
  .settings(
    name := "Tools",
    organization := "com.cacoveanu.spark.tools",
    libraryDependencies ++= Seq(
      sparkSql,
      json
    )
  )
We'll implement a short utility class that can load the file; call it SparkUtil. We need a way to differentiate whether we are working in a local environment or on a cluster, and we'll use a local flag to define that. This flag will be set to false by default, because we only run locally when testing our code.
import org.apache.spark.SparkFiles
import spray.json._

object SparkUtil {

  // locally we can use the provided path as-is; on a cluster the file deployed
  // with --files must be resolved through SparkFiles.get
  private def getOperatingPath(path: String, local: Boolean) =
    if (local) path else SparkFiles.get(path)

  def loadConfigJsonFile(path: String, local: Boolean = false) = {
    val operatingPath = getOperatingPath(path, local)
    val source = scala.io.Source.fromFile(operatingPath)
    val lines: String = try source.mkString finally source.close()
    lines.parseJson
  }
}
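As a quick sanity check, this utility can be tried out locally before wiring it into a job; the path below is just a placeholder for wherever you keep a test configuration file.

val testConfig = SparkUtil.loadConfigJsonFile("src/test/resources/configuration.json", local = true)
println(testConfig.prettyPrint)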
Reading the file and parsing the json in it is simple enough with Scala and a json library. The main problem this object solves is getting the correct path to the file. When running locally, the local path we provide to the loadConfigJsonFile method is the correct path. Not so in a cluster, where the JAR and dependencies deployed with our Spark job are all copied to temporary folders on worker nodes. If we are not in a local execution context, we can use the SparkFiles.get method to obtain the real path on the node running this job.
Next I've implemented a utility class that can wrap over a json file and access properties based on a property path. You can see example methods for loading String, Int, Double, array of String and array of Double properties.
import spray.json.{JsArray, JsObject, JsValue}
import spray.json.DefaultJsonProtocol._

class JsonConfiguration(val path: String)(implicit local: Boolean = false) {

  val config: JsValue = SparkUtil.loadConfigJsonFile(path, local)

  // walk the json tree following the property path; returns None if any step is missing
  def getProperty(path: String*): Option[Any] = {
    var current: Option[Any] = Some(config)
    path.foreach(e =>
      current = current match {
        case Some(map: JsObject) => map.getFields(e).headOption
        case _ => None
      }
    )
    current
  }

  def getOrElse(path: Seq[String], default: String): String = {
    getProperty(path:_*) match {
      case Some(value: JsValue) => value.convertTo[String]
      case _ => default
    }
  }

  def getOrElse(path: Seq[String], default: Int): Int = {
    getProperty(path:_*) match {
      case Some(value: JsValue) => value.convertTo[Int]
      case _ => default
    }
  }

  def getOrElse(path: Seq[String], default: Double): Double = {
    getProperty(path:_*) match {
      case Some(value: JsValue) => value.convertTo[Double]
      case _ => default
    }
  }

  def getOrStringArray(path: Seq[String], default: Seq[String]): Seq[String] = {
    getProperty(path:_*) match {
      case Some(value: JsArray) => value.convertTo[Seq[String]]
      case _ => default
    }
  }

  def getOrDoubleArray(path: Seq[String], default: Seq[Double]): Seq[Double] = {
    getProperty(path:_*) match {
      case Some(value: JsArray) => value.convertTo[Seq[Double]]
      case _ => default
    }
  }
}
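To give an idea of how this is meant to be used, here is a hypothetical lookup against a configuration file that contains an input section with a path and a list of topics; the key names and defaults are just examples.

implicit val local: Boolean = true
val config = new JsonConfiguration("configuration.json")
val inputPath = config.getOrElse(Seq("input", "path"), default = "/tmp/data")
val topics = config.getOrStringArray(Seq("input", "topics"), default = Seq("events"))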
Now we need to write a Spark job that uses a configuration file. The first thing to worry about, as the job starts, is determining whether this is a local run or we are on a cluster. In our program, the difference is made by the presence of an argument named local with a boolean value. If no argument with that name is present, we just assume that we are on a cluster.
Next we need to initialize the Spark session. We need to do this before we load the file, because SparkFiles.get will only give us a correct response if the session is already initialized. This means that certain variables, like the local flag or the variables necessary for initializing the Spark session, can't be read from the configuration file and need to be provided as command line arguments.
Once we have our Spark session, we can initialize the JsonConfiguration and read values from it. And we can start loading Spark streams and running our algorithms.
import org.apache.spark.sql.SparkSession

object OurSparkJob {

  def main(args: Array[String]): Unit = {
    // parse "key=value" program arguments into a map
    val argmap: Map[String, String] = args
      .map(a => a.split("="))
      .filter(a => a.length == 2 && a(0).nonEmpty && a(1).nonEmpty)
      .map(a => a(0) -> a(1))
      .toMap

    // assume we are on a cluster unless a local=true argument is provided
    implicit val local: Boolean = argmap.getOrElse("local", "false").toBoolean

    val spark = (
        if (local) SparkSession.builder().master("local[*]")
        else SparkSession.builder()
      )
      .appName("ourSparkJob")
      .getOrCreate()

    val jsonConfig = new JsonConfiguration("configuration.json")(local)
    val thresholdForAlgorithmA = jsonConfig.getOrElse(
      Seq("parameters", "algorithmA", "threshold"),
      default = 0.1d
    )

    val data = spark.readStream
    // and so on ...
  }
}
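For reference, a configuration.json that would satisfy the lookup above could look like this (the value is made up):

{
  "parameters": {
    "algorithmA": {
      "threshold": 0.25
    }
  }
}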
So our job is in ourSparkJob.jar, the main class is com.cacoveanu.spar.OurSparkJob, and our configuration file is configuration.json. To deploy this job on a cluster, we run the following command:
spark-submit --class com.cacoveanu.spar.OurSparkJob --master spark://master-url:7077 --deploy-mode cluster --files configuration.json ourSparkJob.jar
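For a local test run, the configuration file can stay in the working directory and we pass the local flag as a program argument; a command along these lines should work (paths are placeholders):

spark-submit --class com.cacoveanu.spar.OurSparkJob --master "local[*]" ourSparkJob.jar local=true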