2019.08.09 14:00 / apache spark / apache kafka / docker / spark streaming / custom stream receiver / kafka rest

Spark Streaming Read from Kafka REST Proxy

This article is a follow-up to the previous Kafka article and presents a different way to connect Spark with Kafka when the two services are in different clouds and there are limitations on what services can be exposed to the outside world. In our case, the Kafka service can't be directly exposed to the Internet, but we can set up access through a proxy. In the previous article, the proxy we used was just forwarding the Kafka protocol to the Kafka service. This time, we will be using a REST proxy. In some network setups certain protocols may be restricted, but HTTP/S is usually considered safe and left unrestricted, which is why we are exploring a REST proxy in front of Kafka. The complicated part of this setup is reading data into a Spark stream from the Kafka REST proxy.

Test setup

We will start with the basic Zookeeper and Kafka images as before, except this time Kafka will not be secured. For the REST proxy, we will use the Confluent REST Proxy for Kafka. The following Dockerfile will download and set up the proxy:

FROM ubuntu
RUN apt-get update
RUN apt-get install -y wget
RUN apt-get install -y nano
RUN apt-get install -y net-tools
RUN apt-get install -y default-jre
WORKDIR /opt
RUN wget http://packages.confluent.io/archive/5.3/confluent-5.3.0-2.12.tar.gz
RUN tar -xvzf *.tar.gz
COPY kafka-rest.properties /opt/confluent-5.3.0/etc/kafka-rest/
ENTRYPOINT /opt/confluent-5.3.0/bin/kafka-rest-start /opt/confluent-5.3.0/etc/kafka-rest/kafka-rest.properties

As you can see, a kafka-rest.properties file is copied into the image. The file contents are the following:

zookeeper.connect=zookeeper:2181
bootstrap.servers=PLAINTEXT://kafka:9092

We just tell the REST proxy how to access the Zookeeper and Kafka services.

The docker-compose.yml file is the following:

version: '2'
services:
  zookeeper:
    build: ./zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: ./kafka
    ports:
      - "9092:9092"
  rest-proxy:
    build: ./rest-proxy
    ports:
      - "8082:8082"

You can start all this up, then create a test topic and write some messages to it, just so we are prepared; a sketch of the commands we can use for this is included below.
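
For reference, here is how we can do that from the Kafka container, using the scripts that ship with Kafka (a sketch; the topic name test matches the default used by the Spark program later, adjust as needed):

# create a test topic with a single partition
./kafka-topics.sh --bootstrap-server kafka:9092 --create --replication-factor 1 --partitions 1 --topic test

# write a few messages to it (one message per line, Ctrl+C to exit)
./kafka-console-producer.sh --broker-list kafka:9092 --topic test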

The custom receiver

This is where the complicated part is. If we wanted to read data from Kafka into Spark we would normally use the Kafka source providers that come with Spark. Unfortunately, there is no source provider for a Kafka REST endpoint, so we need to write one ourselves. Moreover, working with Kafka is not as simple as reading from a plain REST endpoint. There are several steps a Kafka client needs to perform to be able to read from Kafka:

- register a consumer, inside a consumer group, with the REST proxy;
- subscribe that consumer to one or more topics;
- repeatedly poll the consumer's records endpoint for new messages;
- delete the consumer when done, so its partitions can be reassigned.
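
Each of these steps maps to a plain HTTP call against the proxy. As a quick sketch with curl, assuming the proxy from our setup at localhost:8082 and a hypothetical consumer group my_group with a consumer named my_consumer (these names are only for illustration):

# 1. register a binary consumer in the consumer group
curl -X POST http://localhost:8082/consumers/my_group \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -d '{"name": "my_consumer", "format": "binary", "auto.offset.reset": "earliest"}'

# 2. subscribe the consumer to the test topic
curl -X POST http://localhost:8082/consumers/my_group/instances/my_consumer/subscription \
  -H "Content-Type: application/vnd.kafka.v2+json" \
  -d '{"topics": ["test"]}'

# 3. poll for new records (repeat this call)
curl -X GET http://localhost:8082/consumers/my_group/instances/my_consumer/records \
  -H "Accept: application/vnd.kafka.binary.v2+json"

# 4. delete the consumer when done
curl -X DELETE http://localhost:8082/consumers/my_group/instances/my_consumer \
  -H "Content-Type: application/vnd.kafka.v2+json"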

The consumer within consumer group concept is the way Kafka allows parallel consumption from its topics. Each topic is split into partitions. If we have a consumer group with a single consumer that reads from a topic, all messages will go to that consumer. If we want to read from Kafka in a parallel manner, for example from multiple Spark workers, we must create multiple consumers in a single consumer group. If we do this, Kafka will know to send each consumer messages from its own subset of partitions. So, if a topic has 4 partitions, and we create two consumers in a consumer group to read data from that topic, consumer 1 will receive messages from partitions 1 and 2, and consumer 2 will receive messages from partitions 3 and 4. This way, Kafka ensures each message is only processed once by each consumer group.

With this in mind, we can implement a custom Spark receiver for the Kafka REST endpoint in the following manner:

package com.cacoveanu.spark

import java.net.SocketTimeoutException
import java.util.UUID

import io.circe.parser.decode
import io.circe.generic.auto._
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import scalaj.http.Http

class KafkaRestReceiver(consumerGroup: String, restProxyUrl: String, topics: Seq[String])
  extends Receiver[Message](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  // register a new binary consumer with the REST proxy, under our consumer group
  private def registerBinaryConsumer(consumer: String) = {
    val url = s"http://$restProxyUrl/consumers/$consumerGroup"
    val body =
      s"""
         |{
         |  "name": "${consumer}",
         |  "format": "binary",
         |  "auto.offset.reset": "earliest"
         |}
      """.stripMargin
    val response = Http(url)
      .postData(body)
      //.proxy("localhost", 3128)
      .header("Accept", "application/vnd.kafka.v2+json")
      .header("Content-Type", "application/vnd.kafka.v2+json")
      .asString
    val result = if (response.code == 200) {
      decode[RegisterResponse](response.body) match {
        case Left(_) => None
        case Right(res) => Some(res)
      }
    } else None
    result
  }

  // delete the consumer instance from the proxy
  private def closeConsumer(consumerUrl: String) = {
    val url = consumerUrl
    val response = Http(url)
      .method("DELETE")
      //.header("X-HTTP-Method-Override", "DELETE")
      .header("Content-Type", "application/vnd.kafka.v2+json")
      .asString
    if (response.code == 204) {
      true
    } else {
      println(response.code)
      false
    }
  }

  // subscribe the registered consumer to our topics
  private def subscribeConsumerToTopics(consumerUrl: String, topics: Seq[String]) = {
    val url = s"$consumerUrl/subscription"
    val body =
      s"""
         |{"topics":["${topics.mkString("\",\"")}"]}
      """.stripMargin
    val response = Http(url)
      .postData(body)
      //.proxy("localhost", 3128)
      .header("Accept", "application/vnd.kafka.v2+json")
      .header("Content-Type", "application/vnd.kafka.v2+json")
      .asString
    if (response.code == 200) true else false
  }

  // poll the consumer's records endpoint and decode the JSON response
  private def readMessages(uri: String) = {
    val url = s"$uri/records"
    try {
      val response = Http(url)
        .header("Accept", "application/vnd.kafka.binary.v2+json")
        //.timeout(60000, 60000)
        .asString
      if (response.code == 200) {
        decode[Seq[Message]](response.body) match {
          case Left(failure) =>
            println(failure)
            None
          case Right(result) => Some(result)
        }
      } else None
    } catch {
      case e: SocketTimeoutException =>
        e.printStackTrace()
        None
    }
  }

  override def onStart(): Unit = {
    val consumerId = UUID.randomUUID().toString
    val registerResponse = registerBinaryConsumer(consumerId)
    registerResponse match {
      case Some(RegisterResponse(_, uri)) =>
        println(s"consumer url: $uri")
        subscribeConsumerToTopics(uri, topics)
        new Thread("Rest Receiver") {
          override def run() {
            receive(uri)
          }
        }.start()
      case None =>
    }
  }

  override def onStop(): Unit = {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself when isStopped() returns true
  }

  private def receive(uri: String) {
    while (!isStopped) {
      // read data from rest endpoint
      val data = readMessages(uri)
      data match {
        case Some(v) =>
          println(s"read ${v.size} messages")
          store(v.iterator)
        case None =>
      }
    }
    val ccRes = closeConsumer(uri)
    println(s"close consumer: $ccRes")
  }
}

// the response we get from the proxy when registering a consumer
case class RegisterResponse(instance_id: String, base_uri: String)

// a message read from the proxy's records endpoint
case class Message(topic: String, key: Option[String], value: String, partition: Int, offset: Int)

I will go over the solution step by step. For a start, we are implementing the Receiver interface. This interface has two methods: onStart, responsible for initializing and starting the threads that will bring data into Spark, and onStop, responsible for stopping those threads. When Spark stops a receiver, it also raises a flag that we can check through the isStopped method. In our code, we use that flag to stop the processing.
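
To make this contract concrete, here is a minimal receiver sketch, stripped of all the Kafka-specific logic (the data it stores is invented for illustration):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class MinimalReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // onStart must not block, so we start a thread and return immediately
    new Thread("Minimal Receiver") {
      override def run(): Unit = {
        while (!isStopped()) {
          // fetch or produce a piece of data and hand it to Spark
          store("tick " + System.currentTimeMillis())
          Thread.sleep(1000)
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // nothing to do: the thread above checks isStopped() and exits on its own
  }
}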

Our receiver will need to know three things to work: the consumer group under which to register its consumer, the address of the Kafka REST proxy, and the topics to subscribe to. These are the constructor arguments of KafkaRestReceiver.

override def onStart(): Unit = {
  val consumerId = UUID.randomUUID().toString
  val registerResponse = registerBinaryConsumer(consumerId)
  registerResponse match {
    case Some(RegisterResponse(_, uri)) =>
      println(s"consumer url: $uri")
      subscribeConsumerToTopics(uri, topics)
      new Thread("Rest Receiver") {
        override def run() {
          receive(uri)
        }
      }.start()
    case None =>
  }
}

When a receiver is created, the onStart method is called. This method will generate a unique ID for the consumer to be registered, then it will use the registerBinaryConsumer method to make a call to the REST proxy and register the consumer. The consumer will be registered under the consumer group defined for the Spark application. The result will be a consumer URL. We use this URL to subscribe to the desired topics, then we start a thread that will periodically receive messages from Kafka through the consumer URL.

private def receive(uri: String) {
  while (!isStopped) {
    // read data from rest endpoint
    val data = readMessages(uri)
    data match {
      case Some(v) =>
        println(s"read ${v.size} messages")
        store(v.iterator)
      case None =>
    }
  }
  val ccRes = closeConsumer(uri)
  println(s"close consumer: $ccRes")
}

The receive method will run until the receiver is stopped by Spark. The method will keep trying to read messages from the consumer URL, and if any new messages are found, it converts them to case classes and stores them in Spark. Calling the store method is what sends data to the Spark stream. At the end of the receive method, once the receiver has been stopped, we try to close the consumer by calling its URL with the DELETE HTTP method.
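
To illustrate what the decode[Seq[Message]] call inside readMessages does, here is a self-contained sketch with an invented records payload whose shape matches what the code expects:

import io.circe.parser.decode
import io.circe.generic.auto._

// same shape as the Message case class used by the receiver
case class Message(topic: String, key: Option[String], value: String, partition: Int, offset: Int)

object DecodeExample extends App {
  // "aGVsbG8=" is the base64 encoding of "hello"; a null key decodes to None
  val sample = """[{"topic":"test","key":null,"value":"aGVsbG8=","partition":0,"offset":0}]"""
  decode[Seq[Message]](sample) match {
    case Right(messages) => messages.foreach(println)
    case Left(failure) => println(failure)
  }
}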

It is important to signal the starting and closing of a consumer in a consumer group. If a new consumer is started, Kafka needs to know, so it can start sending messages to that consumer as well; otherwise the consumer would sit unused and be of no consequence. For Kafka to reliably use the new consumer, it needs to take some partitions away from existing consumers and assign them to the new one. This is all done by Kafka when a new consumer is added to a consumer group. In the same manner, when a consumer is removed, Kafka must handle the orphaned partitions and reassign them to consumers that are still running. This is why, when we are stopping our custom receiver, we must use the Kafka REST API to notify the proxy of the consumer's deletion.

The Spark program

Following is the entire Spark program that uses our custom receiver to read data from a Kafka REST endpoint, in a streaming Spark context.

package com.cacoveanu.spark

import java.io.{File, PrintWriter}
import java.util.Base64

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamingRestKafka {

  def main(args: Array[String]): Unit = {
    // parse key=value program arguments into a map
    val argmap: Map[String, String] = args
      .map(a => a.split("="))
      .filter(a => a(0).nonEmpty && a(1).nonEmpty)
      .map(a => a(0) -> a(1))
      .toMap

    val local = argmap.contains("local")
    val sparkConf = new SparkConf().setAppName("CustomKafkaRestReceiver")
    if (local) sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.sparkContext.setLogLevel("WARN")

    val restProxyUrl = argmap.getOrElse("kafka_proxy", "localhost:8082")
    val consumerGroup = argmap.getOrElse("consumer_group", "spark_consumer_" + System.currentTimeMillis().toString)
    val topics = Seq(argmap.getOrElse("topic", "test"))
    val resultPath = argmap.getOrElse("result_path", "r")

    val streamData: ReceiverInputDStream[Message] =
      ssc.receiverStream(new KafkaRestReceiver(consumerGroup, restProxyUrl, topics))

    // decode the base64 message bodies into strings
    val messages = streamData.map(m => {
      println(m.offset)
      KafkaDecoder.decode(m.value)
    })

    // write each non-empty RDD partition to its own file
    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(partition => {
          println("writing partition")
          val result = partition.mkString("\n")
          val filename = resultPath + "/r-" + System.currentTimeMillis().toString + ".txt"
          val pw = new PrintWriter(new File(filename))
          pw.write(result)
          pw.close()
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

object KafkaDecoder {
  def decode(message: String): String = {
    new String(Base64.getDecoder.decode(message))
  }
}

The first step is setting up the program with the use of program arguments. One interesting problem I noticed is that, if we run Spark locally with a single worker thread, local[1], the results won't get printed to your output location until the very end of the execution. This is because that single thread is busy running the receiver, so there are no resources left to process the stream and write your results. That is why, if we are running in local mode, we need at least two threads.

Another thing we need is a unique consumer group name. This is generated on the driver, when the program arguments are read, and then passed to the custom receiver, so that all consumers registered by this run of the application end up in the same, fresh consumer group.

Custom receivers are compatible only with the older DStream API of Spark, but that API will do its job of monitoring the data source and executing the processing steps as new data appears. We create a new stream to which we provide our custom receiver. Next, we map the binary messages we receive on that stream into strings. This decoding is very simple and can be seen in the KafkaDecoder object. Once we have the string messages from our topic, we are ready to write them to our output location. We use the foreachRDD method, which further splits execution by partitions. These are the RDD partitions, if the RDD is indeed distributed. Each partition available at that point in time will write the messages it contains to a file on disk. This concludes our program, which we will test in the following section.

Parallelized test setup

For this step, we will need to bring up multiple Spark nodes, a master and at least two workers. I will go over how the master and worker images are created in a different article, but let's assume we have them and add them to our docker-compose.yml file:

version: '2'
services:
  zookeeper:
    build: ./zookeeper
    ports:
      - "2181:2181"
  kafka:
    build: ./kafka
    ports:
      - "9092:9092"
  rest-proxy:
    build: ./rest-proxy
    ports:
      - "8082:8082"
  spark-master:
    image: docker-spark_spark-master
    ports:
      - "8080:8080"
    volumes:
      - /mnt/spark-apps:/opt/spark-apps
      - /mnt/spark-data:/opt/spark-data
  spark-slave:
    image: docker-spark_spark-slave
    environment:
      - MASTER_URL=spark://spark-master:7077
    volumes:
      - /mnt/spark-apps:/opt/spark-apps
      - /mnt/spark-data:/opt/spark-data

You will see the slave connects to the master through MASTER_URL. We also have shared volumes between the Spark master and slave(s). This is because the Spark applications and output locations need to be accessible to all nodes in the Spark cluster. This can be achieved in different ways, most commonly by running Spark over a Hadoop cluster, but we can also use a shared network location, mounted at the same point on all nodes.

We are now ready to assemble our Spark app. To do that successfully, you must add a project/plugins.sbt file to your project that imports the assembly plugin:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.8")

I am also using the following build.sbt file to build the project:

import sbtassembly.AssemblyPlugin.autoImport

name := "spark-kafka-rest"

val projectVersion = "0.1"

scalaVersion := "2.11.12"

val spark = "org.apache.spark" %% "spark-core" % "2.4.0" % "provided"
val sparkSql = "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"
val sparkStreaming = "org.apache.spark" %% "spark-streaming" % "2.4.0" % "provided"
val sparkSqlKafka = "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
val sparkSqlKafkaStreaming = "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0"
val scalajHttp = "org.scalaj" %% "scalaj-http" % "2.4.1"
val circeCore = "io.circe" %% "circe-core" % "0.11.1"
val circeGeneric = "io.circe" %% "circe-generic" % "0.11.1"
val circeParser = "io.circe" %% "circe-parser" % "0.11.1"

lazy val spark_app = (project in file("."))
  .settings(
    name := "spark-kafka-rest",
    version := projectVersion,
    organization := "com.cacoveanu.spark",
    libraryDependencies ++= Seq(
      spark, sparkSql, sparkStreaming,
      sparkSqlKafka, sparkSqlKafkaStreaming,
      scalajHttp, circeCore, circeGeneric, circeParser
    )
  )

val meta = """META.INF(.)*""".r

assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".html" => MergeStrategy.first
  case n if n.startsWith("reference.conf") => MergeStrategy.concat
  case n if n.endsWith(".conf") => MergeStrategy.concat
  case meta(_) => MergeStrategy.discard
  case x => MergeStrategy.first
}

Once you have all these in place, you can run sbt assembly in your project root folder. This will create the assembly file under the target folder.

Next, we need to start up our cluster. Go to the folder where your docker project is and run docker-compose up --scale spark-slave=2 -d. This will start a cluster with a Kafka server, a Zookeeper server, a Kafka REST proxy server, one Spark master and two Spark slaves.

We must next copy the assembly file to the spark-apps volume shared between the Spark nodes. We can do this with docker cp assembly.jar <spark-node-id>:/opt/spark-apps (you can obtain a Spark node ID with docker ps; any Spark node will do since the volume is shared between them).

Next, we need to start our Spark app on the cluster. We can do this from any Spark node with the following command:

/opt/spark-2.4.3-bin-hadoop2.7/bin/spark-submit --deploy-mode cluster --master spark://spark-master:7077 --class com.cacoveanu.spark.SparkStreamingRestKafka /opt/spark-apps/assembly.jar kafka_proxy=rest-proxy:8082 result_path=/opt/spark-data/

We use the spark-submit script from the Spark binary location and deploy on the cluster represented by the master at spark-master:7077. The class we run is com.cacoveanu.spark.SparkStreamingRestKafka and can be found in the JAR at /opt/spark-apps/assembly.jar. We also provide the program arguments: where the Kafka REST proxy we need to read data from is located, and where to save the results. Once you run this command, you can go to the Spark management console at localhost:8080 and see that a driver and an application are running, and if you click on the application you should see that it is running on two worker nodes.

To really understand how our application is functioning, we must inspect the logs of the workers. We can connect to each of the workers with docker exec -it <worker-id> bash and navigate to the Spark work folder at /opt/spark-2.4.3-bin-hadoop2.7/work. There, on each of the workers, we should find a folder with the application ID, as listed in the Spark management console, and in those folders we will find the stdout file where the println instructions in our code write their output. If we inspect those files, we can understand what part of the code is executed on which worker. And here we find a small surprise: only one of the workers has initialized a custom receiver and is reading data from the Kafka REST endpoint. Our data read process is not being distributed at all, and all the work of registering multiple consumers in the same consumer group is irrelevant: we have one consumer, in one consumer group, running on one worker node.

This is how a custom Spark receiver works. Spark has no way of knowing that the receiver can be parallelized. If we have a receiver reading from a socket, or some standard REST endpoint, Spark can't guarantee there will be no data duplication, or data loss, if it decides to create multiple instances of that receiver. So Spark starts a single instance and has a single entry point for the data into the streaming context. Once the data is in a Spark RDD, it can get distributed over worker nodes and processed in parallel, so there is still some advantage at that level.

As it happens, the Kafka REST proxy is offering us a mechanism to parallelize data ingestion without duplicates, but we have a little more work to do before we can take advantage of that.

Real parallelism

To read data from multiple workers, we need to create multiple custom receivers, each registering its own consumer. We can then read a separate stream from each receiver, on different worker nodes. Those streams can be merged if necessary. How expensive this merge operation is will depend on your data and your cluster; I have not made a production test yet, so I can't offer any advice or even say if it's a good idea. Maybe having a single entry point is the only sensible way of operating with a custom receiver. Our updated code should look like this:

// additional imports needed for this version
import scala.collection.immutable
import org.apache.spark.streaming.dstream.DStream

object SparkStreamingRestKafka {

  def main(args: Array[String]): Unit = {
    val argmap: Map[String, String] = args
      .map(a => a.split("="))
      .filter(a => a(0).nonEmpty && a(1).nonEmpty)
      .map(a => a(0) -> a(1))
      .toMap

    val local = argmap.contains("local")
    val sparkConf = new SparkConf().setAppName("CustomKafkaRestReceiver")
    if (local) sparkConf.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.sparkContext.setLogLevel("WARN")

    val restProxyUrl = argmap.getOrElse("kafka_proxy", "localhost:8082")
    val consumerGroup = argmap.getOrElse("consumer_group", "spark_consumer_" + System.currentTimeMillis().toString)
    val topics = Seq(argmap.getOrElse("topic", "test"))
    val resultPath = argmap.getOrElse("result_path", "r")
    val readers = argmap.getOrElse("readers", "1").toInt

    // create one receiver stream per requested reader
    val streams: immutable.Seq[DStream[Message]] =
      for (_ <- 1 to readers)
        yield ssc.receiverStream(new KafkaRestReceiver(consumerGroup, restProxyUrl, topics))
    // merge all the streams into a single one
    val streamData: DStream[Message] = streams.reduce(_ union _)

    val messages = streamData.map(m => {
      println(m.offset)
      KafkaDecoder.decode(m.value)
    })

    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        rdd.foreachPartition(partition => {
          println("writing partition")
          val result = partition.mkString("\n")
          val filename = resultPath + "/r-" + System.currentTimeMillis().toString + ".txt"
          val pw = new PrintWriter(new File(filename))
          pw.write(result)
          pw.close()
        })
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The first change is that we read a new input parameter called readers, which represents the number of custom receivers we want to have in the application. Next, we initialize the desired number of custom receivers, each yielding a DStream, so we have a collection of streams. Then, we perform the (questionable?) union of all the streams. The rest of the processing is as before.

For this new test, I started a cluster with three Spark worker nodes.

We run our test again on the cluster, this time with the following command:

/opt/spark-2.4.3-bin-hadoop2.7/bin/spark-submit --deploy-mode cluster --master spark://spark-master:7077 --class com.cacoveanu.spark.SparkStreamingRestKafka /opt/spark-apps/assembly.jar kafka_proxy=rest-proxy:8082 result_path=/opt/spark-data/ readers=3

If we now inspect the stdout files of our application on our worker nodes, we will first notice that three different consumers have been registered under the same consumer group. Further inspection of those logs will show us that different workers are processing (converting from binary to string) different messages, and different workers are writing the results to disk. We no longer have workers splitting their responsibility just across functions, but also across data partitions.

Spark worker 1 stdout:

registering consumer 3569dc71-181d-4394-903e-92e7ff5c5c09 with consumer group spark_consumer_1565336913896
consumer url: http://rest-proxy:8082/consumers/spark_consumer_1565336913896/instances/3569dc71-181d-4394-903e-92e7ff5c5c09
read 5 messages
writing partition
0
1
2
3
4
read 2 messages
writing partition
read 4 messages
7
writing partition
7
8
9
10
writing partition
read 2 messages
writing partition
writing partition

Spark worker 2 stdout:

registering consumer 12e00986-5082-4956-b809-95469669fe1c with consumer group spark_consumer_1565336913896
consumer url: http://rest-proxy:8082/consumers/spark_consumer_1565336913896/instances/12e00986-5082-4956-b809-95469669fe1c

Spark worker 3 stdout:

registering consumer d325e45d-7a35-40e8-a332-98b9e71840b2 with consumer group spark_consumer_1565336913896
consumer url: http://rest-proxy:8082/consumers/spark_consumer_1565336913896/instances/d325e45d-7a35-40e8-a332-98b9e71840b2
0
5
writing partition
5
6
writing partition
writing partition
11
writing partition
11
12

Alas, there still seems to be some issue with our setup. As you can see, the second worker just initializes a receiver, registers a consumer, and does nothing more. The third worker writes its partitions, but does not read any messages. We still have a parallelization issue somewhere. It may be related to the Kafka setup; maybe there are not enough partitions on the topic for three consumers. A quick check shows that this is indeed the case: the topic has a single partition. But we can fix this.

We first stop the Spark application driver. Then, we connect to the Kafka node and remove the test topic with ./kafka-topics.sh --bootstrap-server kafka:9092 --topic test --delete. Then we can recreate the topic with more partitions: ./kafka-topics.sh --bootstrap-server kafka:9092 --create --replication-factor 1 --partitions 12 --topic test. Finally we must restart the Spark streaming app. Once the app is up, we write some messages to that topic with the console producer: ./kafka-console-producer.sh --broker-list kafka:9092 --topic test.

Going back to the logs for each worker, we can see that the setup is now successful. All three workers are now reading messages, sharing the load between them.

Spark worker 1 stdout:

registering consumer 29b9e1e8-ca86-49bc-8d05-9c06f2b6a8f4 with consumer group spark_consumer_1565341373338
consumer url: http://rest-proxy:8082/consumers/spark_consumer_1565341373338/instances/29b9e1e8-ca86-49bc-8d05-9c06f2b6a8f4
read 2 messages
writing partition
1
1
read 2 messages
1
writing partition
writing partition
1
1
writing partition
writing partition
read 1 messages
writing partition
2
read 2 messages
2
writing partition
2
2
read 1 messages
2
writing partition
2
writing partition
writing partition
3
3
writing partition
3

Spark worker 2 stdout:

registering consumer 1dc32fc4-95e8-4986-9666-30760d359e25 with consumer group spark_consumer_1565341373338
consumer url: http://rest-proxy:8082/consumers/spark_consumer_1565341373338/instances/1dc32fc4-95e8-4986-9666-30760d359e25
read 16 messages
0
writing partition
0
1
0
1
0
0
0
0
0
0
0
0
0
1
0
1
read 2 messages
1
writing partition
1
1
writing partition
1
1
writing partition
2
writing partition
writing partition
2
2
read 1 messages
2
writing partition
2
writing partition
read 3 messages
2
writing partition
2
2
2
writing partition
writing partition
writing partition
2
writing partition
writing partition
2
2
3
writing partition
3
writing partition
read 1 messages
3
writing partition
3
writing partition
read 1 messages
writing partition
writing partition

Spark worker 3 stdout:

registering consumer 31ab4a3e-caaa-44e2-bb27-8aa37b58aa26 with consumer group spark_consumer_1565341373338
consumer url: http://rest-proxy:8082/consumers/spark_consumer_1565341373338/instances/31ab4a3e-caaa-44e2-bb27-8aa37b58aa26
read 2 messages
read 2 messages
read 2 messages
read 1 messages
read 1 messages

As a parting note, I must again mention that this setup has not been tested in a real production environment. I am worried about possible performance implications of stream unions, since they mean moving data around the cluster. The custom receivers can surely still be improved, with some way to handle failures and to deregister the consumer if a receiver goes down; one possible direction is sketched below. But this whole experiment is a good starting point for working with the Kafka REST proxy and for implementing custom Spark receivers that read from REST endpoints.
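
As a starting point for that improvement, here is a hedged, untested sketch of how the receiver's onStop and receive methods could evolve. It reuses the closeConsumer and readMessages helpers from before, and relies on the restart method from Spark's Receiver API, which asks Spark to stop the receiver and run onStart again:

// remember the consumer URI so onStop can deregister the consumer
@volatile private var consumerUri: Option[String] = None

override def onStop(): Unit = {
  // deregister the consumer when Spark stops the receiver
  consumerUri.foreach(closeConsumer)
  consumerUri = None
}

private def receive(uri: String): Unit = {
  consumerUri = Some(uri)
  try {
    while (!isStopped) {
      readMessages(uri).foreach(messages => store(messages.iterator))
    }
    closeConsumer(uri)
  } catch {
    case t: Throwable =>
      // clean up the dead consumer, then let Spark call onStart again,
      // which will register a fresh consumer in the same group
      closeConsumer(uri)
      restart("Error while reading from the Kafka REST proxy", t)
  }
}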