2021.07.08 10:00 / spark / test-driven / unit test / mock api
This very short article is an addition to the previous Spark test-driven development article, where I showed you how to use in-memory databases as output locations for your Spark job tests. This time, we go over a method to mock an external API that our job is supposed to get data from. The API provides two interfaces: one to get the index of the available data, and another to download the data files.
import org.apache.spark.sql.{Dataset, SparkSession}

object ApiSparkJob {

  def main(args: Array[String]): Unit = {
    // parse "key=value" arguments into a map (helper not shown here)
    val argmap = getArgumentsMap(args)
    val local = argmap.getOrElse("local", "false").toBoolean
    // read input parameters
    val indexUrl = argmap.getOrElse("index_url", "")
    val outputLocation = argmap.getOrElse("output_location", "")
    implicit val spark: SparkSession = (
        if (local) SparkSession.builder().master("local[*]")
        else SparkSession.builder()
      )
      // spark configuration
      .getOrCreate()
    import spark.implicits._

    val index: Dataset[DataEntry] = spark.createDataset(downloadIndex(indexUrl))
    val data: Dataset[DataFile] = index.map(entry => downloadFile(entry.dataUrl))
    // transform your data; here we just pass the downloaded files through
    val output = data
    output.write.mode("append").parquet(outputLocation)
  }

  def downloadIndex(indexUrl: String): List[DataEntry] = {
    // make rest call, download index json, parse it to objects
  }

  def downloadFile(fileUrl: String): DataFile = {
    // make rest call to API to download the data file
  }
}
case class DataEntry(
  id: String,
  timestamp: java.sql.Timestamp, // java.sql.Timestamp so Spark can derive an encoder
  dataUrl: String
)

case class DataFile(
  url: String,
  data: Array[Byte]
)
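The two download helpers are left as stubs in the listing above. As an illustration only, here is a minimal sketch of how they could be filled in. It assumes json4s for parsing the index JSON (one option among many; Spark already pulls json4s in as a transitive dependency) and plain java.net / scala.io for the HTTP calls, with no timeouts or retries, so treat it as a starting point rather than the actual implementation. The ApiClientSketch wrapper exists only to keep the example self-contained; in the real job these bodies would go into the ApiSparkJob stubs.
import java.net.URL
import java.sql.Timestamp
import java.time.Instant
import scala.io.Source
import org.json4s._
import org.json4s.jackson.JsonMethods

object ApiClientSketch {

  // download the index JSON and turn each entry into a DataEntry
  def downloadIndex(indexUrl: String): List[DataEntry] = {
    implicit val formats: Formats = DefaultFormats
    val source = Source.fromURL(indexUrl, "UTF-8")
    val json = try source.mkString finally source.close()
    JsonMethods.parse(json).children.map { entry =>
      DataEntry(
        id = (entry \ "id").extract[String],
        timestamp = Timestamp.from(Instant.parse((entry \ "timestamp").extract[String])),
        dataUrl = (entry \ "dataUrl").extract[String]
      )
    }
  }

  // download a single data file and keep its bytes in memory
  def downloadFile(fileUrl: String): DataFile = {
    val stream = new URL(fileUrl).openStream()
    val bytes = try {
      Iterator.continually(stream.read()).takeWhile(_ != -1).map(_.toByte).toArray
    } finally stream.close()
    DataFile(fileUrl, bytes)
  }
}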
So we have a very simple version of a Spark job loading data from an API: it downloads the index, then downloads each file listed in the index and holds it in memory as a byte array. We can then perform transformations on those files, parse them, clean them up. Finally, the files are saved to HDFS as parquet files. Now we need to test this.
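To make the "transform your data" step a bit more concrete, here is a small hypothetical sketch that decodes each downloaded file to text and counts its lines before writing. ParsedFile and the line count are assumptions made for this example only, not part of the original job; the snippet is meant to replace the pass-through output inside main, where import spark.implicits._ is already in scope, and ParsedFile would be declared at the top level next to DataEntry and DataFile so Spark can derive an encoder for it.
// hypothetical record produced by the transformation step, declared at top level
case class ParsedFile(url: String, text: String, lineCount: Long)

// inside main, replacing the pass-through `val output = data`
val output: Dataset[ParsedFile] = data.map { file =>
  val text = new String(file.data, java.nio.charset.StandardCharsets.UTF_8)
  ParsedFile(file.url, text, text.linesIterator.size.toLong)
}
output.write.mode("append").parquet(outputLocation)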
In our test, we will need to mock the API interfaces. We can do this by putting up a web server at the beginning of the test that serves an index file and several data files. The test begins with a cleanup step, then we start the mock server, run the job under test, stop the server and verify the output. The web server uses another class to handle requests, the MockApiHandler.
import java.io.File
import java.net.InetSocketAddress
import java.util.concurrent.{Executors, ThreadPoolExecutor}

import com.sun.net.httpserver.{HttpHandler, HttpServer}
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.scalatest.{FlatSpec, Matchers}

class ApiSparkJobTest extends FlatSpec with Matchers {

  val INPUT = "src/test/resources/input/api"
  val OUTPUT = "src/test/resources/output/api"

  def cleanup() = {
    FileUtils.deleteDirectory(new File(OUTPUT))
  }

  "running api spark job" should "download and save data" in {
    cleanup()
    val port = 8080
    val serverRoot = s"http://localhost:$port"
    val server = startMockServer(port, new MockApiHandler(INPUT, serverRoot))
    println("started server...")
    val indexUrl = s"$serverRoot/index"
    ApiSparkJob.main(Array(
      "local=true",
      "index_url=" + indexUrl,
      "output_location=" + OUTPUT
    ))
    server.stop(0)
    implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    val output = spark.read.parquet(OUTPUT)
    output.count() should be (4)
  }

  def startMockServer(port: Int, handler: HttpHandler) = {
    val executorPool = Executors.newFixedThreadPool(10).asInstanceOf[ThreadPoolExecutor]
    val server = HttpServer.create(new InetSocketAddress("localhost", port), 0)
    server.createContext("/", handler)
    server.setExecutor(executorPool)
    server.start()
    server
  }
}
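Assuming the project is built with sbt, this test can then be run on its own with something like sbt "testOnly *ApiSparkJobTest".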
The MockApiHandler class uses two regex matchers to decide whether a call made to the server is trying to access the index or a data file. The dataUrlMatcher also extracts the ID of the requested data file by parsing the URL. These matching operations are handled very elegantly with Scala Regex and pattern matching. If the URL matches something that is expected, we try to return a file from disk in the response.
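As a quick standalone illustration (separate from the handler below), this is how a Scala Regex with a capture group behaves in a pattern match: the whole string has to match, and the captured group is bound to the pattern variable.
val dataUrlMatcher = "^/data/([0-9]+)".r

"/data/42" match {
  case dataUrlMatcher(id) => println(s"data file id: $id") // prints: data file id: 42
  case _ => println("no match")
}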
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import com.sun.net.httpserver.{HttpExchange, HttpHandler}

class MockApiHandler(dataLocation: String, serverRoot: String) extends HttpHandler {

  val indexUrlMatcher = "^/index.*".r
  val dataUrlMatcher = "^/data/([0-9]+)".r

  override def handle(httpExchange: HttpExchange): Unit = {
    val method = httpExchange.getRequestMethod
    val url = httpExchange.getRequestURI.toString
    println(s"request method $method, url $url")
    val (content, contentType, fileName) = (method, url) match {
      case ("GET", indexUrlMatcher(_*)) =>
        val fileName = "index.json"
        (readTemplate(fileName), "application/json", fileName)
      case ("GET", dataUrlMatcher(id)) =>
        val fileName = id + ".txt"
        (readFileBytes(fileName), "application/octet-stream", fileName)
      case _ =>
        (null, null, null)
    }
    if (content != null) {
      httpExchange.getResponseHeaders.add("Content-Type", contentType)
      if (contentType == "application/octet-stream") {
        httpExchange.getResponseHeaders.add("Content-Disposition", s"""attachment; filename="$fileName"""")
      }
      httpExchange.sendResponseHeaders(200, content.length)
      val outputStream = httpExchange.getResponseBody()
      outputStream.write(content)
      outputStream.flush()
      outputStream.close()
    } else {
      httpExchange.sendResponseHeaders(404, 0)
      httpExchange.close()
    }
  }

  // reads the index template and replaces the ROOT placeholder with the mock server root
  private def readTemplate(name: String): Array[Byte] = {
    val bytes = readFileBytes(name)
    val template = new String(bytes, StandardCharsets.UTF_8)
    val contents = template.replaceAll("ROOT", serverRoot)
    contents.getBytes(StandardCharsets.UTF_8)
  }

  // reads a file from the test data location, or returns null if it does not exist
  private def readFileBytes(fileName: String): Array[Byte] = {
    val path = dataLocation + "/" + fileName
    if (new File(path).exists()) {
      Files.readAllBytes(Paths.get(path))
    } else {
      null
    }
  }
}
We have two types of files: the data files, which we return as they are, and the index file, which is a template into which we must insert the mock server root URL (the handler replaces the ROOT placeholder in index.json with the serverRoot value, http://localhost:8080 in this test). With data files, the response must also indicate, via the Content-Disposition header, that this is an attachment that is supposed to be downloaded. If the file does not exist, we return a 404 response.
For this test setup to work, we will need some test data in the src/test/resources/input/api input folder. We have the following files there:
- index.json
- 1.txt
- 2.txt
- 3.txt
- 4.txt
And the index.json file contains the following:
[
  {
    "id": "1",
    "timestamp": "2021-07-08T08:01:00.000Z",
    "dataUrl": "ROOT/data/1"
  },
  {
    "id": "2",
    "timestamp": "2021-07-08T08:02:00.000Z",
    "dataUrl": "ROOT/data/2"
  },
  {
    "id": "3",
    "timestamp": "2021-07-08T08:03:00.000Z",
    "dataUrl": "ROOT/data/3"
  },
  {
    "id": "4",
    "timestamp": "2021-07-08T08:04:00.000Z",
    "dataUrl": "ROOT/data/4"
  }
]
This simple approach can help you mock and test Spark integrations with external APIs. All you need is to capture some of the API's responses and make your HttpHandler return those responses when the code under test asks for them.