Tuesday, May 15, 2018

Working with Confluent's Schema Registry and Kerberos

     Schemas, the metadata for your data, are still very valuable despite all of the noise around schema-less data storage. But as the old adage goes, "if all you have is a hammer, everything looks like a nail." As it turns out, schemas are very valuable, especially in modern data pipelines, where they form what amounts to a contract between the data coming in and the applications that consume it.

Figure 1. Data Pipeline

For my projects I use CouchDB, because I find it more flexible for the kinds of pipelines I work with (since it doesn't have to conform to the Avro format). That said, Avro is the standard for data serialization and exchange in Hadoop.

     I started working with the Confluent Schema Registry because of its tight integration with StreamSets Data Collector, which also uses the Avro format for its pipelines. Since I let Data Collector handle the schema definition for me, I had no strict need for a registry, but I wanted to share how I got it up and running with Kerberos on RHEL in case someone else does. Most of this can be found in Confluent's installation guide; I've added the Kerberos-specific pieces. Here we go (this is for version 4.1)...

*This assumes that Kafka and Zookeeper are already up and running in your environment

1. Install the Confluent public key

sudo rpm --import https://packages.confluent.io/rpm/4.1/archive.key

2. Then create a file in /etc/yum.repos.d called confluent.repo and add this text (the full file, per Confluent's installation guide, for RHEL/CentOS 7):

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1

3. Clear YUM cache

sudo yum clean all

4. Install the Schema Registry

sudo yum install confluent-schema-registry

5. Because we're dealing with Kerberos here, we need to make a JAAS file in order to set up the security context for the schema registry to run under. Create this file as /etc/schema-registry/jaas-kafka.conf and insert this text:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 useKeyTab=true
 storeKey=true
 keyTab="/etc/schema-registry/kafka.keytab"
 principal="<kafka principal>";
};

For more insight into the JAAS options for the schema registry, see Confluent's security documentation.

6. You'll notice the JAAS configuration has a keyTab setting. In order for the schema registry to authenticate with Kerberos using a keytab, we need to poach a kafka keytab from a running process and copy it to the path specified in the configuration. You can find a keytab either on a node running the Kafka gateway or on the node running the Kafka broker, in a directory similar to /run/cloudera-scm-agent/process/<process>-kafka-KAFKA_BROKER/kafka.keytab . Copy it to /etc/schema-registry on the node running the schema registry.

To test to make sure your keytab will work you can try to kinit with it first:

kinit -kt /etc/schema-registry/kafka.keytab <kafka principal>

7. The next step is to configure schema-registry.properties. There are a number of settings for this file, which are covered in Confluent's configuration documentation. The bare bones needed to run the schema registry against Kafka on a kerberized cluster look like:

listeners=http://<schema registry server>:8081
kafkastore.connection.url=<zookeeper server 1>:2181,<zookeeper server 2>:2181,<zookeeper server 3>:2181
host.name=<schema registry server>
kafkastore.bootstrap.servers=SASL_PLAINTEXT://<kafka broker>:9092
kafkastore.topic=_schemas


This assumes you have SASL enabled for your Kafka/Zookeeper connection:

Figure 2. Kafka Configuration

8. Set the JAAS environment variable:

export SCHEMA_REGISTRY_OPTS="-Djava.security.auth.login.config=/etc/schema-registry/jaas-kafka.conf"

9. Start the schema registry service

/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &

You should see something like this:

[2018-05-14 13:03:01,863] INFO Logging initialized @418ms (org.eclipse.jetty.util.log:186)
[2018-05-14 13:03:02,254] INFO Initializing KafkaStore with broker endpoints: SASL_PLAINTEXT://<Kafka Broker>:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore:103)
[2018-05-14 13:03:02,727] INFO Creating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:186)
[2018-05-14 13:03:02,970] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:138)
[2018-05-14 13:03:02,975] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-05-14 13:03:03,161] INFO Wait to catch up until the offset of the last message at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,208] INFO Joining schema registry with Zookeeper-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:212)
[2018-05-14 13:03:03,388] INFO Created schema registry namespace <Zookeeper Server>:2181/schema_registry (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:161)
[2018-05-14 13:03:03,410] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,428] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,434] INFO Wait to catch up until the offset of the last message at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,515] INFO Adding listener: http://<Schema Registry Server>:8081 (io.confluent.rest.Application:190)
[2018-05-14 13:03:03,581] INFO jetty-9.2.24.v20180105 (org.eclipse.jetty.server.Server:327)
[2018-05-14 13:03:04,257] INFO HV000001: Hibernate Validator 5.1.3.Final (org.hibernate.validator.internal.util.Version:27)
[2018-05-14 13:03:04,437] INFO Started o.e.j.s.ServletContextHandler@5c530d1e{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-05-14 13:03:04,453] INFO Started NetworkTrafficServerConnector@299321e2{HTTP/1.1}{<Schema Registry Server>:8081} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2018-05-14 13:03:04,453] INFO Started @3011ms (org.eclipse.jetty.server.Server:379)

[2018-05-14 13:03:04,454] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)

Then disown the process
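With the service started via a trailing `&`, it is still a job of your login shell and would receive SIGHUP when you log out. A quick sketch of disowning, shown here with a stand-in background process (in practice, use the schema-registry job number shown by `jobs`):

```shell
sleep 30 &        # stand-in for the backgrounded schema-registry process
jobs              # lists the job, e.g. [1]+ Running
disown %1         # detach job 1 from the shell; it keeps running after logout
jobs              # the job table is now empty
```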


10. We can now verify that our Kafka Topic for the schema registry has been created:

/usr/bin/kafka-topics --list --zookeeper <zookeeper server>:2181

Which should display something like this:

18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Socket connection established to <zookeeper server>:2181, initiating session
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Session establishment complete on server <zookeeper server>:2181, sessionid = 0x162fecd66aadacb, negotiated timeout = 30000
18/05/15 11:28:45 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
_schemas
18/05/15 11:28:45 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
18/05/15 11:28:45 INFO zookeeper.ZooKeeper: Session: 0x162fecd66aadacb closed
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x162fecd66aadacb

You can see that the _schemas topic configured in our schema-registry.properties file has been created.

11. Next, I turn off the compatibility requirement. This ensures that new schemas registered under the same subject don't have to be compatible with previous versions, which can be useful if you deal with any kind of data drift (this step is optional, based on your requirements):

curl -X PUT -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://<Schema Registry Server>:8081/config

12. Now we can register a new Avro schema. If you are sourcing the data from a delimited flat file, you can write the schema by hand. For example, say we have a flat file source that contains the following pipe-delimited columns:

lastname|firstname|middleini|mrn|pcpid|plancode|planname|effdt|termdt|memberplanid|plantype|plandescription|empgrp|plannumber
which translate to an Avro schema like this:

{
 "type" : "record",
 "name" : "Sample_Extract",
 "doc" : "sample flat file extract",
 "fields" : [
   { "name" : "lastName", "type" : [ "null", "string" ], "default" : null, "columnName" : "lastname", "sqlType" : "12" },
   { "name" : "firstname", "type" : [ "null", "string" ], "default" : null, "columnName" : "firstname", "sqlType" : "12" },
   { "name" : "middleini", "type" : [ "null", "string" ], "default" : null, "columnName" : "middleini", "sqlType" : "12" },
   { "name" : "mrn", "type" : [ "null", "string" ], "default" : null, "columnName" : "mrn", "sqlType" : "12" },
   { "name" : "pcpid", "type" : [ "null", "string" ], "default" : null, "columnName" : "pcpid", "sqlType" : "12" },
   { "name" : "plancode", "type" : [ "null", "string" ], "default" : null, "columnName" : "plancode", "sqlType" : "12" },
   { "name" : "planname", "type" : [ "null", "string" ], "default" : null, "columnName" : "planname", "sqlType" : "12" },
   { "name" : "effdt", "type" : [ "null", "string" ], "default" : null, "columnName" : "effdt", "sqlType" : "12" },
   { "name" : "termdt", "type" : [ "null", "string" ], "default" : null, "columnName" : "termdt", "sqlType" : "12" },
   { "name" : "memberplanid", "type" : [ "null", "string" ], "default" : null, "columnName" : "memberplanid", "sqlType" : "12" },
   { "name" : "plantype", "type" : [ "null", "string" ], "default" : null, "columnName" : "plantype", "sqlType" : "12" },
   { "name" : "plandescription", "type" : [ "null", "string" ], "default" : null, "columnName" : "plandescription", "sqlType" : "12" },
   { "name" : "empgrp", "type" : [ "null", "string" ], "default" : null, "columnName" : "empgrp", "sqlType" : "12" },
   { "name" : "plannumber", "type" : [ "null", "string" ], "default" : null, "columnName" : "plannumber", "sqlType" : "12" }
 ],
 "tableName" : "sample.extract"
}


To register this schema, which I will call "sample_extract_schema", in the registry we can call the REST API using curl like this:

curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{ \"type\":\"record\", \"name\":\"Sample_Extract\", \"doc\":\"sample flat file extract\", \"fields\": [ { \"name\" : \"lastName\", \"type\" : [\"null\", \"string\"], \"default\" : null, \"columnName\" : \"lastname\", \"sqlType\" : \"12\" }, { \"name\": \"firstname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"firstname\", \"sqlType\" : \"12\" }, { \"name\" :\"middleini\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"middleini\", \"sqlType\" : \"12\" }, { \"name\" : \"mrn\", \"type\" : [ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"mrn\", \"sqlType\" : \"12\" }, { \"name\" : \"pcpid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"pcpid\", \"sqlType\" : \"12\" }, { \"name\" : \"plancode\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plancode\", \"sqlType\" : \"12\" }, { \"name\" : \"planname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"planname\", \"sqlType\" : \"12\" }, { \"name\" : \"effdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"effdt\", \"sqlType\" : \"12\" }, { \"name\" : \"termdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"termdt\", \"sqlType\" : \"12\" }, { \"name\" : \"memberplanid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"memberplanid\", \"sqlType\" : \"12\" }, { \"name\" :\"plantype\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plantype\", \"sqlType\" : \"12\" }, { \"name\" : \"plandescription\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plandescription\", \"sqlType\" : \"12\" }, { \"name\" : \"empgrp\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"empgrp\", \"sqlType\" : \"12\" }, { \"name\" : \"plannumber\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, 
\"columnName\" : \"plannumber\", \"sqlType\" : \"12\" } ], \"tableName\" : \"sample.extract\" }"}' \
    http://<schema registry server>:8081/subjects/sample_extract_schema/versions
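Hand-escaping the schema string inside --data is fiddly and error-prone. The same payload can be generated programmatically; a minimal Python sketch (field list abbreviated for brevity):

```python
import json

# The registry's REST API expects {"schema": "<Avro schema as a JSON string>"},
# so the Avro document is serialized twice; json.dumps handles the escaping
# that the curl command above does by hand.
avro_schema = {
    "type": "record",
    "name": "Sample_Extract",
    "doc": "sample flat file extract",
    "fields": [
        {"name": "lastName", "type": ["null", "string"], "default": None,
         "columnName": "lastname", "sqlType": "12"},
    ],
}

payload = json.dumps({"schema": json.dumps(avro_schema)})
print(payload)  # usable as the --data argument to curl
```

POSTing this payload with the application/vnd.schemaregistry.v1+json content type to /subjects/sample_extract_schema/versions is equivalent to the curl call above.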

13. To verify it was entered properly, we can use the REST API again:

curl -X GET -i http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest 

which should return the latest version of the schema for that subject.

Example: retrieving the latest schema programmatically (with Jackson and Jersey 1.x):

import java.io.IOException;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;

public class SchemaRegistryTest {

     public static class SchemaObject {
           public String subject;
           public int version;
           public int id;
           public String schema;
     }

     SchemaObject schemaObject;

     public void executeTest() throws JsonParseException, JsonMappingException, ClientHandlerException, UniformInterfaceException, IOException {

           Client client = Client.create();

           WebResource webResource = client
                     .resource("http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest");
           ClientResponse response = webResource.accept("application/vnd.schemaregistry.v1+json").get(ClientResponse.class);

           if (response.getStatus() != 200) {
                throw new RuntimeException("Failed : HTTP error code : " + response.getStatus());
           }

           // Map the JSON response onto the SchemaObject fields
           ObjectMapper mapper = new ObjectMapper();
           schemaObject = mapper.readValue(response.getEntity(String.class), SchemaObject.class);
     }
}

That's it! You now have a repository for your data pipeline schemas, backed by Kafka and the high availability of Zookeeper.

Tuesday, May 1, 2018

Using the XML SerDe in Hive for Exploding Nested XML Elements

     This article will give a detailed account of how to install and use the XML SerDe for Hive, by Dmitry Vasilenko, located on GitHub.

     Detailed instructions for the installation can be found in the project's documentation, but I'm going to take you through the steps I specifically used.

1. Download the latest version of the XML SerDe jar from the project's releases.
2. Pick a directory on the Linux host where the Hive server is running and upload the jar to it. In this example I used /usr/lib/hive/lib .
3. Change the owner of this directory to hive: chown hive:hive /usr/lib/hive/lib
4. Make sure the jar is executable: chmod +x /usr/lib/hive/lib/hivexmlserde-<version>.jar
5. Set the "Hive Auxiliary JARs Directory" Hive configuration in Cloudera Manager to /usr/lib/hive/lib
6. If you are using Sentry, there may be additional settings to make; instructions for those are in the link provided.

     The XML SerDe allows you to query an XML file as if it was a relational table. This is a perfect example of the term "schema on read" that gets tossed around when discussing the benefits of Hadoop. In fact, you can have many Hive tables that reference a single XML document, each with a different view of that file.
Say we had an XML file that looks like this:
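A rough sketch of the shape of such a file (the element names are the ones referenced below; values and elided children, marked "...", are invented for illustration):

```xml
<DATA_EXPORT>
  <CUSTOMERLEVELDATA>
    <SURVEY_ID>12345</SURVEY_ID>
    <CLIENT_ID>001</CLIENT_ID>
    <SERVICE>INPATIENT</SERVICE>
    <RECDATE>2018-01-10</RECDATE>
    <DISDATE>2018-01-05</DISDATE>
    <ANALYSIS>...</ANALYSIS>
    <DEMOGRAPHICS>
      <RESPONSE>...</RESPONSE>
    </DEMOGRAPHICS>
    <HCAHPS>...</HCAHPS>
    <COMMENTS>...</COMMENTS>
  </CUSTOMERLEVELDATA>
  <QUESTION>...</QUESTION>
</DATA_EXPORT>
```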

     Now, from looking at this data, we see a few elements that can be separated out into their own tables: <QUESTION>, <CUSTOMERLEVELDATA>, <ANALYSIS>, <DEMOGRAPHICS>, <HCAHPS>, <COMMENTS>.

     The first thing we need to do is create a Hive table that will form the basis for all of these different elements. Let's use the answers submitted in the XML doc as the basis for our table. This will be the base for building views for these elements: <CUSTOMERLEVELDATA>, <ANALYSIS>, <DEMOGRAPHICS>, <HCAHPS>.

DROP TABLE IF EXISTS sampledb.raw_answers_xml;
CREATE EXTERNAL TABLE sampledb.raw_answers_xml(
customerleveldata array<struct<customerleveldata:struct<
    survey_id:string,
    client_id:string,
    service:string,
    recdate:string,
    disdate:string,
    -- hcahps child elements abbreviated; extend these structs to match your document
    demographics:array<struct<response:string>>,
    hcahps:array<struct<response:string>>>>>
)
row format serde 'com.ibm.spss.hive.serde2.xml.XmlSerDe'
WITH serdeproperties ( "column.xpath.customerleveldata"="/DATA_EXPORT/CUSTOMERLEVELDATA" )
stored AS
inputformat 'com.ibm.spss.hive.serde2.xml.XmlInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
location "/sources/sampledb/raw"
tblproperties ( "xmlinput.start"="<DATA_EXPORT>", "xmlinput.end"="</DATA_EXPORT>" );

     The Hive table we create here is called raw_answers_xml and encompasses all of the mentioned elements. You'll notice that we translate the <CUSTOMERLEVELDATA> element into an array, a struct for the child elements <SURVEY_ID>, <CLIENT_ID>, <SERVICE>, <RECDATE>, <DISDATE>, an array for the <DEMOGRAPHICS> element, a struct for its child element <RESPONSE>, and an array for another child element <HCAHPS> and its relevant child elements. I'd like to point out a few things that should stand out. The property column.xpath.customerleveldata takes XPath queries, which you can use to traverse the XML document. In this instance we want to pull data from elements starting at <DATA_EXPORT><CUSTOMERLEVELDATA>. The next properties to touch upon are xmlinput.start and xmlinput.end. These tell the SerDe which elements mark the start and end of the data to capture, so make sure whatever elements you want to pull into the Hive table fall within these start and end points.

     The first element we'll look at is <CUSTOMERLEVELDATA>, since it contains non-repeating child elements that we want to break out into their own table (we'll start off simple).


     This will form the basis for our answers_survey view, which will be our normalized tuple for containing unique instances of a survey response. The view we build to represent this part of the XML document will be called answers_survey and the code to generate it is:

DROP VIEW IF EXISTS answers_survey;
CREATE VIEW answers_survey AS
 SELECT main_cols.survey_id,
        unix_timestamp( concat(main_cols.recdate,' 00:00:00') )*1000 AS recdate,
        unix_timestamp( concat(main_cols.disdate,' 00:00:00') )*1000 AS disdate,
        input__file__name AS sourcefile
 FROM sampledb.raw_answers_xml xml
 lateral VIEW explode(xml.customerleveldata) atable AS atable
 lateral VIEW inline (array(atable.atable)) ex_cols
 lateral VIEW inline (array(ex_cols.customerleveldata)) main_cols;

     This view traverses the array declared in the raw_answers_xml table and explodes it so we can view the data in rows. But we're not done: all the explode does is handle the array; we still have to deal with the underlying structs, the first being the customerleveldata struct. To parse inside of it, we use inline to access the struct underneath, then another inline on that struct to get our columns and project them in our view.
     Now that we've shown a simple example, let's jump to one where we have repeating rows for a single survey. We will use the same base table raw_answers_xml and this time tackle the answers given under the <DEMOGRAPHICS> element. This view is named answers_demographics and the code for it is as follows:

DROP VIEW IF EXISTS answers_demographics;
CREATE VIEW answers_demographics AS
  SELECT main_cols.survey_id,
         r_cols.*,
         input__file__name AS sourcefile
  FROM   sampledb.raw_answers_xml xml
  lateral VIEW explode(xml.customerleveldata) atable AS atable
  lateral VIEW inline (array(atable.atable)) ex_cols
  lateral VIEW inline (array(ex_cols.customerleveldata)) main_cols
  lateral VIEW explode(main_cols.demographics) dtable AS dtable
  lateral VIEW inline (array(dtable.dtable)) d_cols
  lateral VIEW inline (array(d_cols.response)) r_cols;

     In this view we are traversing two arrays: the top-level array for <CUSTOMERLEVELDATA> and the array for <DEMOGRAPHICS>. So we first explode <CUSTOMERLEVELDATA>, then parse out the struct, which contains the survey_id we can use to join to the answers_survey view created in the previous example. Then we explode <DEMOGRAPHICS> to get to the struct and parse out all of the answers given for demographics.
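Since both views project survey_id, the single-instance survey rows and their repeating demographic answers can be brought back together with an ordinary join; a sketch (the demographic answer columns depend on what your answers_demographics view projects):

```sql
SELECT s.survey_id,
       s.recdate,
       s.disdate,
       d.*              -- the demographic answer columns
FROM   answers_survey s
JOIN   answers_demographics d
  ON   s.survey_id = d.survey_id;
```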

     So those are two examples of how to parse tables out of an XML document using the SerDe: one table representing a single instance of a survey, the other the multiple answers given for a survey. The best thing about this is that we can derive all of our tables/views from the XML file(s) themselves, with transformations on the fly and without having to materialize anything on the cluster.