
Serializing to Parquet from Kafka with Exactly Once Guarantee

Posted by Sunita Koppar

In the process of building our new analytics pipeline, we had to implement a typical lambda architecture: one process flow for real-time data and another for batch, mainly because the optimization points for real-time and batch processing are almost entirely different. While designing and improving the batch portion of our new lambda architecture, we faced many challenges and learned helpful lessons. We hope a summary of these takeaways will offer helpful insight to those implementing their own batch pipelines.

The Approach

We were not starting completely from scratch: our existing system already had a batch pipeline, which functioned primarily as a backup data source and a repository for raw-data queries. We exposed the raw data through HDFS files that were queryable via Hive/Presto. We also had Flume working in a dual capacity, writing to Kafka as well as storing to HDFS. There turned out to be multiple issues with this approach.

Approach-1

For starters, this approach is slow, has multiple knobs to tune, and still does not guarantee exactly-once delivery semantics with Kafka.

Fortunately, we were able to address all these issues. Our batch windows were well within the retention period of our team's Kafka instance (7 days by default). Since our target format was columnar storage, we skipped the intermediate step of persisting smaller files and pushed the data from Kafka directly to HDFS as Parquet. This also eliminated the dependency on another component, Flume. That knocked out two of our problems.

With this, the pipeline at a high level looks like this:

Approach-2

To achieve exactly once data consumption, there are a couple of “typical” approaches one could take.

a. Persisting consumer offsets into an RDBMS (Relational Database Management System)

b. Checkpointing functionality in Spark

c. Persisting to Zookeeper (similar to database)

A newer option is to persist offsets in a special topic in Kafka, but for the version of Kafka we run (below 0.9.x), this method was not yet available. So we were left to choose among options a, b, and c.

We knew right away that we did not want to set up a database just for the offsets, so we quickly ruled out option a.

Checkpointing (option b) was what we attempted first, and it caused multiple issues. One example: the SQLContext cannot be recovered from a checkpoint, so it has to be lazily re-obtained inside every batch:

val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

(Details here)
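To make that workaround concrete, here is a minimal sketch (not our production code; kafkaStream and the record handling are placeholders) of where the getOrCreate call has to live:

import org.apache.spark.sql.SQLContext

kafkaStream.foreachRDD { rdd =>
  // Re-obtain the singleton SQLContext inside each batch so the job can recover
  // from a checkpoint without trying to deserialize a driver-side context.
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  // ... build DataFrames from rdd and persist them here ...
}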

That brings us to using Zookeeper for persistence, an approach inspired by this strategy but differing in the way offsets are stored in Zookeeper. It has been working quite well since we started using it a couple of months ago.

The approach explained in the cited blog post stores a single string in Zookeeper that looks like:

<partition id>:<offset>,<partition id>:<offset>,....

We instead store each offset under Kafka's standard consumer path in Zookeeper (/consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;), which changes the way Zookeeper is read and updated. This approach has the added advantage that all standard monitoring tools can be used for reporting.
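For concreteness, here is a minimal sketch (using the group and topic from the offset-checker output below) of how the Kafka client library resolves that standard path:

import kafka.utils.ZKGroupTopicDirs

// ZKGroupTopicDirs encapsulates the standard consumer layout in Zookeeper.
val dirs = new ZKGroupTopicDirs("spark-consumer", "beacon_raw-bb17")
println(dirs.consumerOffsetDir)
// prints: /consumers/spark-consumer/offsets/beacon_raw-bb17
// Each partition's offset is stored as the data of the child znode
// /consumers/spark-consumer/offsets/beacon_raw-bb17/<partition>.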

Note: Kafka provides APIs to manage offsets in Zookeeper through the brokers. In our implementation, the consumer does it directly, without the broker's involvement, using the same libraries. A major motivation for this is that the offsets are available on a DStream only until the first transformation; any operation on the InputDStream erases that information. So we either had to stash the offsets in a variable and update them at a later point (which is a lot more error prone), or update them right away. We update them right away. The blog post mentioned above explains this in good detail.
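To illustrate that constraint (a minimal sketch; kafkaStream stands in for the direct stream created further down):

import org.apache.spark.streaming.kafka.HasOffsetRanges

kafkaStream.foreachRDD { rdd =>
  // The RDD handed directly to foreachRDD still carries its Kafka offset ranges,
  // so we read (and persist) them first.
  val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Any transformation returns a plain RDD that no longer implements HasOffsetRanges;
  // casting the transformed RDD below would throw a ClassCastException.
  val transformed = rdd.map(record => record)
}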

hdfs@hmaster003:/opt/cloudera/parcels/KAFKA/bin$ ./kafka-consumer-offset-checker --zookeeper hmaster003.toa:2181 --topic beacon_raw-bb17 --group spark-consumer
Group           Topic              Pid  Offset       logSize      Lag      Owner
spark-consumer  beacon_raw-bb17    0    430497589    430539852    42263    none
spark-consumer  beacon_raw-bb17    1    424068412    424106926    38514    none
spark-consumer  beacon_raw-bb17    2    429174880    429216418    41538    none
……..
spark-consumer  beacon_raw-bb17    18   432487688    432530616    42928    none
spark-consumer  beacon_raw-bb17    19   424975641    425014165    38524    none

The complete code for this is publicly available in ZookeeperOffsetStore.scala.

Here is the code snippet pertaining to this functionality (a more thorough explanation can be found in the referenced blog post):

The driver program that creates the direct stream from Kafka:


val offsetsStore = new ZooKeeperOffsetsStore(conf.getString("zkHosts"), conf.getString("groupId"), conf.getString("topics"))

// The resetOffset flag lets us restart from the latest offsets, which is needed
// when we would like to skip some records.
val storedOffsets = if (conf.getBoolean("resetOffset")) None else offsetsStore.readOffsets()

val kafkaArr = storedOffsets match {
  case None =>
    // Start from the initial offsets.
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaProps, Set(topics))

  case Some(fromOffsets) =>
    // Start from the previously saved offsets.
    val messageHandler: MessageAndMetadata[String, Array[Byte]] => (String, Array[Byte]) =
      (mmd: MessageAndMetadata[String, Array[Byte]]) => (mmd.key, mmd.message)
    KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder, (String, Array[Byte])](ssc, kafkaProps, fromOffsets, messageHandler)
}

ZookeeperStore: Saving offsets – here we take the offset ranges attached to the incoming batch's RDD and update the offset stored in Zookeeper for each topic/partition combination:

override def saveOffsets(topic: String, rdd: RDD[_]): String = {
  val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).toSeq)
  val topicDirs = new ZKGroupTopicDirs(consumerGrp, topic)
  LogHandler.log.debug("Saving offsets to ZooKeeper")
  val start = System.currentTimeMillis()
  val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetsRanges.foreach(offsetRange => LogHandler.log.trace(s"Using ${offsetRange}"))
  val offsetsRangesStr = offsetsRanges.map { offsetRange =>
    // Persist the starting offset of this batch under the standard consumer path.
    ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + offsetRange.partition, offsetRange.fromOffset.toString)
    s"${offsetRange.partition}:${offsetRange.fromOffset}"
  }.mkString(",")
  LogHandler.log.debug(s"Writing offsets to ZooKeeper: ${offsetsRangesStr}")
  LogHandler.log.debug("Done updating offsets in ZooKeeper. Took " + (System.currentTimeMillis() - start))
  offsetsRangesStr
}

Reading previously stored offsets returns None if none are available, which happens the first time the consumer runs or when the resetOffset flag is set (in the resetOffset case this method is not invoked at all and the main program simply assumes None):

Reading Previous Offsets from Zookeeper:

private def getOffsets(groupID: String, topic: String): Option[Map[TopicAndPartition, Option[String]]] = {
  val topicDirs = new ZKGroupTopicDirs(groupID, topic)
  val offsets = new mutable.HashMap[TopicAndPartition, Option[String]]()
  val topicSeq = List(topic).toSeq
  try {
    val topParts = ZkUtils.getPartitionsForTopics(zkClient, topicSeq)
    LogHandler.log.debug("Number of topics is " + topParts.size + ", they are " + topParts.toString())
    for (top <- topParts) {
      for (partition <- top._2.toList) {
        // Read the stored offset (if any) for this topic/partition combination.
        val partitionOffsetPath: String = topicDirs.consumerOffsetDir + "/" + partition
        val maybeOffset: Option[String] = ZkUtils.readDataMaybeNull(zkClient, partitionOffsetPath)._1
        LogHandler.log.trace("partitionOffsetPath is " + partitionOffsetPath + ", maybeOffset is " + maybeOffset.toString)
        val topicAndPartition: TopicAndPartition = new TopicAndPartition(top._1, partition)
        offsets.put(topicAndPartition, maybeOffset)
      } // All partition offsets for one topic
    }
  } catch {
    case e: Exception => LogHandler.log.error("Could not read offsets from ZooKeeper: " + e.getMessage)
  }
  Option(offsets.toMap)
}


// Read the previously saved offsets from Zookeeper

override def readOffsets(): Option[Map[TopicAndPartition, Long]] = {
  LogHandler.log.debug("Reading offsets from ZooKeeper")
  val offsets = new mutable.HashMap[TopicAndPartition, Long]()
  val start = System.currentTimeMillis()
  val offsetsRangesMap = getOffsets(consumerGrp, topic)
  offsetsRangesMap.get.foreach {
    case (tp, offsetOpt) =>
      offsetOpt match {
        case Some(offstr) =>
          LogHandler.log.trace("Done reading offsets from ZooKeeper. Took " + (System.currentTimeMillis() - start))
          offsets.put(tp, offstr.toLong)
        case None =>
          LogHandler.log.debug("No offsets found in ZooKeeper for " + tp + ". Took " + (System.currentTimeMillis() - start))
      }
  }
  if (offsets.isEmpty) None else Option(offsets.toMap)
}

From here we implement the conversion from a typical Kafka byte array to Parquet.

We do this in 4 steps, the first being an internal requirement, since the data comes to us as protocol buffers:

1) Read the byte array from Kafka and deserialize it using the generated protobuf classes.

2) Create a map from the custom HTTP object. This step is essential in our use case because the HTTP object contains non-essential data, and because it is a nested structure that requires parsing.

3) Build an Avro object from this map. This step requires creating a schema definition file, i.e. an *.avsc file.

Here is a sample for public reference:

{
  "namespace": "com.edgecast.avro",
  "type": "record",
  "name": "AvroData",
  "version": "20170123",
  "fields": [
    {
      "name": "loc",
      "description": "Location",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "title",
      "description": "Asset title",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "timeZone",
      "description": "Time zone offset",
      "type": "int",
      "default": -99999999
    }
  ]
}

The numeric default of "-99999999" is a workaround for representing missing numeric fields, which we later convert back to null (for details, refer to this). Converting to Avro helps validate the data types and also facilitates efficient conversion to Parquet, since the schema is already defined.

4) Create a sequence from the Avro object, which can be converted to a Spark SQL Row object and persisted as a Parquet file. I tried converting directly from Avro to a Spark Row, but somehow that did not work; if anyone reading the blog has suggestions in this regard, I would love to hear them. Hopefully in a future version of Spark this becomes possible and we can eliminate the step of creating a sequence from the Avro object. This is the step that forces us to use magic numbers like "-99999999" for numeric fields, as explained above. We replace the magic number with null before persisting, which retains NULLs in the persistence layer.

As a side note, the Spark-Avro version compatible with our version of Spark does not expose some methods, such as SchemaConverters.toSqlType. We cloned the Spark-Avro repo and changed the access modifiers for these methods in order to get it to work.

Now let’s look at these 4 steps in complete detail:

kafkaArr is the DStream we build from Spark Streaming for a 2-hour sliding window. To convert the DStream we use mapPartitions, which is more efficient than map because it applies the function once per partition rather than once per record. Essentially, every record received in the DStream is passed through the conversion mechanism. Below is the orchestration in the main program; the conversion itself happens in:

`AvroBeacondataUtils.avroToListWithAudit`

For auditing purposes, we build a string from the partition:offset combinations for all partitions and create a hash of it. This hash becomes the filename of the output. In case of an abrupt shutdown, the last file is simply recreated and overwritten, so even if processing is duplicated, data is not.


Extract data and offsets from DStreams:

kafkaArr.foreachRDD { (rdd, time) =>
  // The saved offsets indicate the starting point of this batch; the string is also
  // used (hashed) as the output filename for auditing.
  val offsetSaved = offsetsStore.saveOffsets(topics, rdd).replace(":", "-").replace(",", "_")
  LogHandler.log.debug("Saved offset to Zookeeper")
  val timeNow = DateFunctions.timeNow()
  val loadDate = DateFunctions.getdt(timeNow, dateFormats.YYYYMMDD)
  val suffix = StringUtils.murmurHash(offsetSaved)
  val schema = SchemaConverters.toSqlType(BeaconAvroData.getClassSchema).dataType.asInstanceOf[StructType]
  val ardd = rdd.mapPartitions { itr =>
    itr.map { r =>
      try {
        val cr = AvroBeacondataUtils.avroToListWithAudit(r._2, offsetSaved, loadDate, timeNow)
        Row.fromSeq(cr.toArray)
      } catch {
        case e: Exception =>
          LogHandler.log.error("Exception while converting to Avro: " + e.getMessage)
          System.exit(-1)
          Row(0) // This is just to allow the compiler to accept the match
      }
    }
  }

  val df = sqlctx.createDataFrame(
    if (conf.getBoolean("reduceOutputPartitions")) ardd.coalesce(conf.getInt("numOutputPartitions")) else ardd,
    schema)
  mod_df = setNullableStateForAllColumns(df, true)
  mod_df.save(conf.getString("ParquetOutputPath") + suffix, "parquet", SaveMode.Overwrite)

  // The partitioning step shown further below also runs inside this foreachRDD block.
}
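setNullableStateForAllColumns is a small helper that is not shown in the post; a minimal sketch of what such a helper typically looks like (it rebuilds the schema with every field marked nullable and recreates the DataFrame) is:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

// Sketch only: mark every column nullable so that the magic-number-to-null
// replacement does not violate the inferred schema.
def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  val newSchema = StructType(df.schema.map {
    case StructField(name, dataType, _, metadata) => StructField(name, dataType, nullable, metadata)
  })
  df.sqlContext.createDataFrame(df.rdd, newSchema)
}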

Let's look at the AvroBeacondataUtils class and how it converts the data. First, the byte array from Kafka is converted to the custom HTTP object using the accessor methods generated from the .proto files:


Convert to Avro

public static BeaconAvroData getAvroBeaconData (byte[] protodata)
    	throws InvalidProtocolBufferException {
	Accesslog.accesslog sfLog = Accesslog.accesslog.parseFrom(protodata);
	return getAvroBeaconData(sfLog);
}

From the generated map, we build an Avro object. The specification for Avro is typically provided in an .avsc file, and accessor classes can be generated with a Maven plugin; the pom.xml in the GitHub repository provides these details.

public static BeaconAvroData getAvroBeaconData(Accesslog.accesslog sfLog) {
    Config conf = AppConfig.conf();
    Map<String, Object> namedBeacon = getNamedBeaconData(sfLog);
    BeaconAvroData avroBeacon = com.edgecast.avro.BeaconAvroData.newBuilder().build();
    if (conf.getBoolean("appendDerivedFields")) {
        Enrich.appendBeaconDerivedFields(namedBeacon, avroBeacon);
    }
    try {
        // Each element is added via its setter method. If there is a way to do this
        // programmatically, we would like to hear -- please leave comments.
        avroBeacon.setXXX(namedBeacon.get("xxx"));
        // ... one setter call per field ...
    } catch (Exception e) {
        throw new RuntimeException("Failed to populate Avro object", e);
    }
    return avroBeacon;
}

Building the Avro object takes care of missing keys and null values in the data, so they do not get lost in the process of converting to the Spark DataFrame.

Once we have the Avro object, we can generate a sequence out of it, which is then converted to a Spark SQL Row. Spark SQL's DataFrame is, in RDBMS parlance, essentially a table, and a Row corresponds to a single row in that table.

Create a sequence from Avro to facilitate Spark DataFrame creation

public static List<Object> avroToList(BeaconAvroData a) throws UnsupportedEncodingException {
    List<Object> l = new ArrayList<>();
    for (Schema.Field f : a.getSchema().getFields()) {
        Object value = a.get(f.name());
        if (value == null) {
            l.add(null);
        } else {
            switch (f.schema().getType().getName()) {
                case "union":
                    l.add(value.toString());
                    break;
                case "int":
                    // Replace the numeric "magic" default with null before persisting.
                    if (Integer.valueOf(value.toString()).equals(Integer.valueOf(AvroNumericDefault))) {
                        l.add(null);
                    } else {
                        l.add(Integer.valueOf(value.toString()));
                    }
                    break;
                case "long":
                    if (Long.valueOf(value.toString()).equals(Long.valueOf(AvroNumericDefault))) {
                        l.add(null);
                    } else {
                        l.add(Long.valueOf(value.toString()));
                    }
                    break;
                default:
                    l.add(value);
                    break;
            }
        }
    }
    return l;
}

Partitioning the final output is much easier after Spark 1.4; however, for our version, we had to do the following to achieve partitioning:

// mod_df must be registered as a temporary table before it can be queried below.
mod_df.registerTempTable("validtb")

val pd = mod_df.map { r =>
  val n = r.length
  (r(n - 4).toString, r(n - 3).toString, r(n - 2).toString, r(n - 1).toString)
}.distinct.collect.toList

pd.foreach { case (y, m, d, h) =>
  val partdf = sqlctx.sql("select * from validtb where partYear=" + y + " and partMonth=" + m + " and partDay=" + d + " and partHr=" + h)
  partdf.save(conf.getString("ParquetOutputPath") + "year=" + y + "/month=" + m + "/day=" + d + "/hr=" + h + "/" + suffix, "parquet", SaveMode.Overwrite)
  LogHandler.log.info("Created the parquet file " + conf.getString("ParquetOutputPath") + "year=" + y + "/month=" + m + "/day=" + d + "/hr=" + h + "/" + suffix)
}
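For reference, on Spark 1.4+ the same layout could be produced directly (a sketch, assuming the same partition columns; we could not use this on our version):

// Spark 1.4+ only: write each partYear/partMonth/partDay/partHr combination
// into its own directory in a single pass.
mod_df.write
  .mode(SaveMode.Overwrite)
  .partitionBy("partYear", "partMonth", "partDay", "partHr")
  .parquet(conf.getString("ParquetOutputPath") + suffix)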

Results/Comparison

Size on disk

disk-size

For a single day’s worth of data, using the old approach:

4.3 G 12.9 G /beacons/hive/raw/customer_id=bb17/date=20170322

VS.

For a single day’s worth of data, using the new approach:

1.3 G 3.8 G /user/hive/warehouse/parquet/bb17/date=20170322

The first column indicates that the actual content was reduced from 4.3 GB to 1.3 GB, while the second shows that actual disk consumption, including HDFS replication, came down from 12.9 GB to 3.8 GB.

As we know, Hadoop (and especially the NameNode) works better with fewer large files than with many tiny files, so file count is also a factor worth optimizing.

Number of files

num-difference

hdfs@hmaster003:~$ hadoop fs -ls /beacons/hive/raw/customer_id=bb17/date=20170322 | wc -l

Number of files, using old approach: 2450

VS.

hadoop fs -ls /user/hive/warehouse/parquet/bb17/date=20170322 | wc -l

Number of files, using new approach: 14


Conclusion

This implementation has been in use in our application for 3-4 months now, and we have successfully achieved exactly-once data transformation with reliable persistence, using as little as 2 GB of memory across the cluster and 2 executors. We'd love to know how your exactly-once implementation is similar to or different from ours!

Please feel free to reach out via Twitter @sunitakoppar or @engage_vdms to share feedback!
