home / 2021.07.08 10:00 / spark / test-driven / unit test / mock api

Spark Test-driven Development with API Integration

This very short article is an addition to the previous Spark test-driven development article, where I showed you how to use in-memory databases as output locations for your Spark job tests. This time, we go over a method to mock an external API that our job is supposed to get data from. The API provides two interfaces: one to get the index of the available data, and another to download the data files.

object ApiSparkJob {

  def main(args: Array[String]): Unit = {
    val argmap = getArgumentsMap(args)
    val local = argmap.getOrElse("local", "false").toBoolean
    // read input parameters
    val indexUrl = argmap.getOrElse("index_url", "")
    val outputLocation = argmap.getOrElse("output_location", "")

    implicit val spark: SparkSession = (
        if (local) SparkSession.builder().master("local[*]")
        else SparkSession.builder()
      )
      // spark configuration
      .getOrCreate()
    import spark.implicits._

    val index: Dataset[DataEntry] = spark.createDataset(downloadIndex(indexUrl))
    val data: Dataset[DataFile] = index.map(entry => downloadFile(entry.dataUrl))
    // transform your data
    data.write.mode("append").parquet(outputLocation)
  }

  def downloadIndex(indexUrl: String): List[DataEntry] = {
    // make rest call, download index json, parse it to objects
    ???
  }

  def downloadFile(fileUrl: String): DataFile = {
    // make rest call to API to download the data file
    ???
  }
}

case class DataEntry(
  id: String,
  timestamp: Date, // java.sql.Date (or Timestamp); Spark encoders do not support java.util.Date
  dataUrl: String
)

case class DataFile(
  url: String,
  data: Array[Byte]
)
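The getArgumentsMap helper comes from the previous article. If you don't have it at hand, a minimal sketch that parses "key=value" arguments (the ArgUtils wrapper name is mine) might look like this:

```scala
object ArgUtils {
  // split each "key=value" argument into a pair; arguments without '=' are dropped
  def getArgumentsMap(args: Array[String]): Map[String, String] =
    args.flatMap { arg =>
      arg.split("=", 2) match {
        case Array(key, value) => Some(key -> value)
        case _                 => None
      }
    }.toMap
}
```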

You can see a very simple version of the Spark job loading data from an API. It downloads the index, then downloads each file listed in the index and holds it in memory as a byte array. We can then perform transformations on those files, parse them, clean them up. Finally, the files get saved to HDFS as Parquet files. Now we need to test this.
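The body of downloadFile is left as a comment in the job above. A minimal sketch of the raw byte download, assuming a plain java.net.URL connection with no authentication or retries (the HttpDownload name is mine), could be:

```scala
import java.io.{ByteArrayOutputStream, InputStream}
import java.net.URL

object HttpDownload {
  // read the full response body from a URL into a byte array
  def downloadBytes(url: String): Array[Byte] = {
    val in: InputStream = new URL(url).openStream()
    try {
      val out = new ByteArrayOutputStream()
      val buffer = new Array[Byte](4096)
      var read = in.read(buffer)
      while (read != -1) {
        out.write(buffer, 0, read)
        read = in.read(buffer)
      }
      out.toByteArray
    } finally in.close()
  }
}
```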

In our test, we will need to mock the API interfaces. We can do this by starting a web server at the beginning of the test that can serve an index file and several data files. The test begins with a cleanup step; then we start the mock server, run the job, stop the server, and verify the output. The web server uses another class to handle requests, the MockApiHandler.

class ApiSparkJobTest extends FlatSpec with Matchers {

  val INPUT = "src/test/resources/input/api"
  val OUTPUT = "src/test/resources/output/api"

  def cleanup(): Unit = {
    FileUtils.deleteDirectory(new File(OUTPUT))
  }

  "running api spark job" should "download and save data" in {
    cleanup()

    val port = 8080
    val serverRoot = s"http://localhost:$port"
    val server = startMockServer(port, new MockApiHandler(INPUT, serverRoot))
    println("started server...")

    val indexUrl = s"$serverRoot/index"
    ApiSparkJob.main(Array(
      "local=true",
      "index_url=" + indexUrl,
      "output_location=" + OUTPUT
    ))

    server.stop(0)

    implicit val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()
    val output = spark.read.parquet(OUTPUT)
    output.count() should be (4)
  }

  def startMockServer(port: Int, handler: HttpHandler): HttpServer = {
    val executorPool = Executors.newFixedThreadPool(10).asInstanceOf[ThreadPoolExecutor]
    val server = HttpServer.create(new InetSocketAddress("localhost", port), 0)
    server.createContext("/", handler)
    server.setExecutor(executorPool)
    server.start()
    server
  }
}

The MockApiHandler class uses two regex matchers to decide whether a call made to the server is trying to access the index or a data file. The dataUrlMatcher also extracts the ID of the data file being accessed by parsing the URL. These matching operations are handled very elegantly with Scala Regex and pattern matching. If the URL matches something that is expected, we try to return a file from disk in the response.
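To see the routing logic in isolation, here is a small sketch of the same regex pattern matching, with the dataUrlMatcher capture group bound directly in the case clause (the RouteMatcher object and classify method are mine, for illustration only):

```scala
object RouteMatcher {
  val indexUrlMatcher = "^/index.*".r
  val dataUrlMatcher = "^/data/([0-9]+)".r

  // classify a request URL the way the handler does, extracting the id via pattern matching
  def classify(url: String): String = url match {
    case indexUrlMatcher(_*) => "index"
    case dataUrlMatcher(id)  => s"data:$id"
    case _                   => "not-found"
  }
}
```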

class MockApiHandler(dataLocation: String, serverRoot: String) extends HttpHandler {

  val indexUrlMatcher = "^/index.*".r
  val dataUrlMatcher = "^/data/([0-9]+)".r

  override def handle(httpExchange: HttpExchange): Unit = {
    val method = httpExchange.getRequestMethod
    val url = httpExchange.getRequestURI.toString
    println(s"request method $method, url $url")

    val (content, contentType, fileName) = (method, url) match {
      case ("GET", indexUrlMatcher(_*)) =>
        val fileName = "index.json"
        (readTemplate(fileName), "application/json", fileName)
      case ("GET", dataUrlMatcher(id)) =>
        val fileName = id + ".txt"
        (readFileBytes(fileName), "application/octet-stream", fileName)
      case _ =>
        (null, null, null)
    }

    if (content != null) {
      httpExchange.getResponseHeaders.add("Content-Type", contentType)
      if (contentType == "application/octet-stream") {
        httpExchange.getResponseHeaders.add("Content-Disposition", s"""attachment; filename="$fileName"""")
      }
      httpExchange.sendResponseHeaders(200, content.length)
      val outputStream = httpExchange.getResponseBody
      outputStream.write(content)
      outputStream.flush()
      outputStream.close()
    } else {
      httpExchange.sendResponseHeaders(404, 0)
      httpExchange.close()
    }
  }

  private def readTemplate(name: String): Array[Byte] = {
    val bytes = readFileBytes(name)
    val template = new String(bytes, StandardCharsets.UTF_8)
    val contents = template.replaceAll("ROOT", serverRoot)
    contents.getBytes(StandardCharsets.UTF_8)
  }

  private def readFileBytes(fileName: String): Array[Byte] = {
    val path = dataLocation + "/" + fileName
    if (new File(path).exists()) {
      Files.readAllBytes(Paths.get(path))
    } else {
      null
    }
  }
}

We have two types of files: the data files, which we return as they are, and the index file, which is a template into which we must insert the mock server root URL. For data files, the response must also indicate that this is an attachment that is supposed to be downloaded. If the file does not exist, we return a 404 response.
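The template substitution is just a string replacement of the ROOT placeholder, the same operation readTemplate performs. A tiny standalone sketch (the IndexTemplate name is mine):

```scala
object IndexTemplate {
  // replace the ROOT placeholder with the actual mock server root URL
  def render(template: String, serverRoot: String): String =
    template.replaceAll("ROOT", serverRoot)
}
```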

For this test setup to work, we need some test data in the input src/test/resources/input/api folder. We have the following files there:

- index.json
- 1.txt
- 2.txt
- 3.txt
- 4.txt

And the index.json file contains the following:

[
  {
    "id": "1",
    "timestamp": "2021-07-08T08:01:00.000Z",
    "dataUrl": "ROOT/data/1"
  },
  {
    "id": "2",
    "timestamp": "2021-07-08T08:02:00.000Z",
    "dataUrl": "ROOT/data/2"
  },
  {
    "id": "3",
    "timestamp": "2021-07-08T08:03:00.000Z",
    "dataUrl": "ROOT/data/3"
  },
  {
    "id": "4",
    "timestamp": "2021-07-08T08:04:00.000Z",
    "dataUrl": "ROOT/data/4"
  }
]

This simple approach can help you mock and test Spark integrations with external APIs. All you need is to capture some of the API's responses and make your HttpHandler return those responses when the code under test asks for them.