Monday, August 5, 2019

Upserting a Data Vault Satellite in Azure SQL Data Warehouse using Data Factory and Databricks

     When doing data movement in Azure, the out-of-the-box solution is Data Factory...it is the EL in ELT. This works fine for moving data from your source systems into Azure SQL Data Warehouse (ASDW). To do transformations in Data Factory, you either have to call stored procedures in ASDW or use good ol' SSIS in your Data Factory pipeline. However, with the release of Data Flow, Microsoft has offered another way to transform data in Azure, which is really just Databricks under the hood. This gives you a nice GUI and a drag-and-drop interface for your data transformations, but it comes with a price: every time Data Factory kicks off a data flow, it has to spin up its own Databricks cluster to execute it, which takes time. Multiply that by the number of data flows your DAG needs to run, and you can start to see where the problem is. Unfortunately, at the time of writing, there is no option to simply run these on your own Databricks cluster, which would avoid spinning clusters up unnecessarily. Luckily, if you like the speed that Spark can add to your data pipelines, we can build these ourselves using Databricks in Azure.

     First, we need to set up our Databricks cluster. Luckily this is pretty easy; a good walkthrough can be found here. Now that we have the Databricks cluster up and running, we need to generate an access token so that we can access this cluster from Data Factory, which can be done using these instructions. With our access token generated, we can create a new linked service in Data Factory for this cluster using these instructions. To write data to ASDW we will use the SQL Data Warehouse connector for Azure Databricks. For this to work we need to allow Databricks to write to Blob Storage, which is used as temporary storage for the data being transferred between an Azure Databricks cluster and Azure SQL Data Warehouse; its configuration is detailed in the previous link.

     OK, now that we have our Databricks cluster configured, we need to set up some parameters for our Databricks notebook to use to connect to ASDW. We accomplish this through environment variables in the cluster configuration.



Figure 1. Databricks Cluster Configuration

     Under the configuration tab in your Databricks cluster there is a Spark tab that will allow you to enter your environment variables. By entering them here, all notebooks that run on this cluster can access them. Here we put in the username and password for ASDW, the ASDW fully qualified server name, the destination database name in ASDW, as well as all of the Blob storage information. You will need to restart the cluster for these to take effect.
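
For reference, the Environment Variables field on that Spark tab takes simple KEY=value pairs. A minimal sketch, assuming the variable names used by the notebook code later in this post (the values shown here are placeholders, not real credentials):

DW_SERVER=<your server>.database.windows.net
DW_DATABASE=<your ASDW database>
DW_USER=<sql user>
DW_PASS=<sql password>
BLOB_STORAGE=<storage account>.blob.core.windows.net
BLOB_CONTAINER=<temp container>
BLOB_ACCESS_KEY=<storage account access key>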

     OK, now that our environment is set up, we can start coding. For this example I want to load data from a staging table into a data vault satellite table in ASDW. The data set represents a patient record containing an identifier (patient_id) and address attributes. The schema of the staging table looks like this:


CREATE TABLE [StageEmedsys].[Dev_Patient_]
(
 [patient_id] [decimal](10, 0) NULL,
 [address1] [varchar](30) NULL,
 [address2] [varchar](30) NULL,
 [city] [varchar](30) NULL,
 [state] [varchar](2) NULL,
 [zip] [varchar](10) NULL
)
WITH
(
 DISTRIBUTION = HASH ( [patient_id] ),
 CLUSTERED COLUMNSTORE INDEX
)
GO

     For the purposes of this example, we assume data has already been loaded into this staging table and that there are no duplicate records. In the satellite we have the hashed patient_id (PatientKey), which allows us to join to the related patient hub; the load date, load process, and source of the record; the hash of all of the satellite columns for delta detection; and the relevant address columns. The target satellite table schema looks like this:


CREATE TABLE [raw].[SatPatientAddressEMED_TEST]
(
 [PatientKey] [varchar](64) NOT NULL,
 [LoadDate] [datetime] NOT NULL,
 [LoadProcess] [varchar](255) NOT NULL,
 [RecordSource] [varchar](255) NOT NULL,
 [HashDiff] [varchar](64) NOT NULL,
 [Address1] [varchar](30) NULL,
 [Address2] [varchar](30) NULL,
 [City] [varchar](30) NULL,
 [State] [varchar](2) NULL,
 [Zip] [varchar](10) NULL
)
WITH
(
 DISTRIBUTION = HASH ( [PatientKey] ),
 CLUSTERED COLUMNSTORE INDEX
)

     The PatientKey and the LoadDate compose the composite primary key of this satellite table. Now to load data from our staging table to our satellite, we create a Databricks notebook with the following Scala code: 


//Libraries
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import spark.implicits._
import java.security.MessageDigest
import java.math.BigInteger

//Blob Storage info
val blobStorage = System.getenv("BLOB_STORAGE")
val blobContainer = System.getenv("BLOB_CONTAINER")
val blobAccessKey = System.getenv("BLOB_ACCESS_KEY")
val tempDir = "wasbs://" + blobContainer + "@" + blobStorage + "/tempDirs"
val acntInfo = "fs.azure.account.key." + blobStorage

sc.hadoopConfiguration.set(acntInfo, blobAccessKey)

//Database Connectivity
val dwDatabase = System.getenv("DW_DATABASE")
val dwServer = System.getenv("DW_SERVER")
val dwUser = System.getenv("DW_USER")
val dwPass = System.getenv("DW_PASS")
val dwJdbcPort =  "1433"
val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser + ";password=" + dwPass + ";" + dwJdbcExtraOptions
val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
 
//Get parameters
dbutils.widgets.text("RecordSource", "","")
val recordS = dbutils.widgets.get("RecordSource")

dbutils.widgets.text("LoadProcess", "","")
val loadP = dbutils.widgets.get("LoadProcess")

dbutils.widgets.text("SourceSystem", "","")
val sourceS = dbutils.widgets.get("SourceSystem")

//Get dataset for data in staging table
var stagedData: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select patient_id as SatelliteNaturalKey,[address1] as Address1,[address2] as Address2,[city] as City,[state] as State,[zip] as Zip,'0'+isnull([address1],'')+isnull([address2],'')+isnull([city],'')+isnull([state],'')+isnull([zip],'') as PreHashDiff, getdate() as LoadDate FROM [StageEmedsys].[Dev_Patient_]").load() 

//Get dataset for data in existing sat
val existingSat: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select X.PatientKey as ExistingSatelliteKey,X.HashDiff as ExistingHashDiff from(Select C1.PatientKey,  C1.HashDiff,ROW_NUMBER() OVER (PARTITION BY C1.PatientKey ORDER BY LoadDate DESC) as RowNum from raw.SatPatientAddressEMED_TEST  C1)x where x.RowNum=1")
  .load()

//Hash function
val sha_256 = udf((s: String) => {MessageDigest.getInstance("SHA-256").digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString})

//Concat Function
val united = udf((s1: String, s2: String) =>  s1.concat(s2))

//Add additional columns
stagedData = stagedData
  .withColumn("SatelliteKey", sha_256(united(col("SatelliteNaturalKey"), lit(sourceS))))
  .withColumn("LoadProcess", lit(loadP))
  .withColumn("RecordSource", lit(recordS))
  .withColumn("HashDiff", sha_256(col("PreHashDiff")))
                                   
//Join and filter for new satellites, or satellites that have a different HashDiff
val dff = stagedData
  .join(existingSat, col("SatelliteKey") === col("ExistingSatelliteKey"), "left_outer")
  .filter(existingSat.col("ExistingSatelliteKey").isNull || stagedData.col("HashDiff") =!= existingSat.col("ExistingHashDiff"))
  .drop("SatelliteNaturalKey")
  .drop("PreHashDiff")
  .drop("ExistingSatelliteKey")
  .select("SatelliteKey", "LoadDate", "LoadProcess", "RecordSource", "HashDiff", "Address1", "Address2", "City", "State", "Zip")

//To write to ASDW
dff.write
   .format("com.databricks.spark.sqldw")
   .option("url", sqlDwUrlSmall)
   .option("forwardSparkAzureStorageCredentials", "true")
   .option("dbTable", "raw.SatPatientAddressEMED_TEST")
   .option("tempDir", tempDir)
   .mode("append")
   .save()

OK, let's break down this code block by block. The Libraries block is where we issue our import statements for the libraries the notebook needs. These support the work we do with Spark dataframes and the UDFs we want to use.

The next block of code contains all of the information for connectivity to Blob storage. You'll notice each Scala variable makes use of the environment variables we defined earlier. After the Blob storage variables comes the code block for connectivity to ASDW, once again making use of our environment variables. As you can see, I hardcoded the value of the port, but if you want to change it you can always pull it from an environment variable as well.


Now that we have our connections defined, the next block of code handles input parameters via widgets. This will allow us to pass values from an Azure Data Factory pipeline to this notebook (which we will demonstrate later in this post). The parameters will pass information regarding the source system table the record came from (RecordSource), the unique identifier of the load process used to transform this data (LoadProcess), and the source system the record came from (SourceSystem).


In the code block labeled "Get dataset for data in staging table", we bring the data from the staging table into a Spark dataframe. In the SQL query we bring in all of the staging table fields, the concatenation of those fields as the PreHashDiff field, and the datetime of the execution of the query. The very next code block brings the PatientKey and the record hash for the current records in the satellite table into a Spark dataframe. In this example I use a window function to determine the current version of a satellite record, but you can use any method that works in your particular case. Note that if your ETL process hashes the PatientKey and HashDiff into the staging table, you can join your satellite to the staging table on PatientKey to reduce the number of records you have to pull from the satellite into Spark (a sketch of that variation follows). Either method has its pluses and minuses.
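
For illustration, if the staging table already carried a hashed PatientKey column, the satellite read could be filtered inside the query. This is only a sketch under that assumption; the PatientKey column on the staging table is hypothetical and does not exist in the DDL shown above:

//Alternative (sketch): only pull satellite rows whose keys appear in staging.
//Assumes the staging table already contains a hashed PatientKey column.
val existingSatFiltered: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", sqlDwUrlSmall)
  .option("tempDir", tempDir)
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("query", "select x.PatientKey as ExistingSatelliteKey, x.HashDiff as ExistingHashDiff from (select s.PatientKey, s.HashDiff, ROW_NUMBER() OVER (PARTITION BY s.PatientKey ORDER BY s.LoadDate DESC) as RowNum from raw.SatPatientAddressEMED_TEST s inner join StageEmedsys.Dev_Patient_ stg on stg.PatientKey = s.PatientKey) x where x.RowNum = 1")
  .load()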


The next two lines contain the UDFs we want to use with our dataframes. The first one, sha_256, we use to get the SHA-256 hash for our PatientKey and HashDiff fields. The second, united, we use to concatenate the SourceSystem parameter passed in via Data Factory with the natural key coming from the staging table; that concatenated value is passed to the sha_256 UDF to derive the SatelliteKey field.


In the next block of code, "Add additional columns", we add the parameters from Data Factory to our stagedData dataframe using the withColumn function and generate our hash fields.


Now we can compare the datasets from the staging table and the satellite to determine deltas. For this we create a new dataframe called "dff" that captures these deltas. We use the join function to left join the stagedData dataframe to the existingSat dataframe on SatelliteKey = ExistingSatelliteKey. We then apply the filter function to keep records from stagedData that either don't exist in existingSat or whose record hashes differ. We then use the drop function to remove columns we don't need from the dataframe, and use the select function to project the columns in the order they are declared in the target satellite table.


With our dataframe of deltas ready to go, we can write to our target satellite table in ASDW in the "To write to ASDW" code block, using the example detailed here. Note that we can parameterize this notebook so the same notebook handles every satellite load, reducing the number of notebooks we have to maintain. We do this by changing three things in this notebook:


1. Add parameters for the queries that pull data from the staging table and from the satellite table.

2. Add a parameter for the target satellite table name used in the last block of code.
3. Change the staging query so its columns are in the same order as the target satellite table. In this example we can do that by changing the query to this:


select 
 patient_id as SatelliteNaturalKey,
        '' as SatelliteKey,
 getdate() as LoadDate,
 '' as LoadProcess,
 '' as RecordSource,
 '' as HashDiff,
 [address1] as Address1,
 [address2] as Address2,
 [city] as City,
 [state] as State,
 [zip] as Zip,
 '0'+isnull([address1],'')+isnull([address2],'')+isnull([city],'')+isnull([state],'')+isnull([zip],'') as PreHashDiff 
FROM 
 [StageEmedsys].[Dev_Patient_]

The code in the "Add additional columns" block will then replace the blank SatelliteKey, LoadProcess, RecordSource, and HashDiff values. You can also remove the following from the code that creates the dff dataframe (a parameterized sketch of these changes follows the next snippet):


.select("SatelliteKey", "LoadDate", "LoadProcess", "RecordSource", "HashDiff", "Address1", "Address2", "City", "State", "Zip")

OK, finally, we need to create our Data Factory pipeline to call this notebook. A good walkthrough on how to do this can be found here. We start by creating a new pipeline; in this example we'll call it "SatExample". We drag a notebook activity onto the design surface and call it "Notebook_SatPatientAddressEMED_TEST":

Figure 2. Azure Data Factory Notebook Activity
On the Azure Databricks tab, select the Databricks linked service you created earlier. Next, on the Settings tab we enter the path to our notebook and the parameter values we want to pass to it:

Figure 3. Azure Data Factory Notebook Activity Settings Tab
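
Behind the GUI, the notebook activity is just JSON in the pipeline definition. A rough sketch of what that activity could look like; the linked service name, notebook path, and parameter values below are placeholders, not values taken from this example:

{
    "name": "Notebook_SatPatientAddressEMED_TEST",
    "type": "DatabricksNotebook",
    "linkedServiceName": {
        "referenceName": "<your Databricks linked service>",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "notebookPath": "/Shared/<your notebook>",
        "baseParameters": {
            "RecordSource": "<source table name>",
            "LoadProcess": "<load process id>",
            "SourceSystem": "<source system name>"
        }
    }
}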

And...that's it! You are now ready to run Spark ETL pipelines in your Azure instance!

Tuesday, May 15, 2018

Working with Confluent's Schema Registry and Kerberos

     Schemas, the metadata for your data, are still very valuable regardless of all of the noise around schema-less data storage. As the old adage goes, "if all you have is a hammer, everything looks like a nail." As it turns out, schemas are very valuable, especially in modern data pipelines. They form what amounts to a contract between the data coming in and the applications that consume that data.


Figure 1. Data Pipeline


For my projects I use CouchDB, because I find it to be more flexible for the kind of pipelines I work with (since it doesn't have to conform to the Avro format). With that said, Avro is the standard for data serialization and exchange in Hadoop.

     I started working with the Confluent Schema Registry because of its tight integration with StreamSets Data Collector, which also uses the Avro format for its pipelines. Since I let Data Collector handle the schema definition for me, I have no need for it. But...I wanted to share how I got this up and running with Kerberos and RHEL in case someone has a need. Most of this can be found in Confluent's installation guide, but I added some things for Kerberos, so here we go (this is for version 4.1)...

*This assumes that Kafka and Zookeeper are already up and running in your environment

1. Install the Confluent public key

sudo rpm --import https://packages.confluent.io/rpm/4.1/archive.key

2. Then create a file in /etc/yum.repos.d called confluent.repo and add this text:

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key

enabled=1

3. Clear YUM cache

sudo yum clean all

4. Install the Schema Registry

yum install confluent-schema-registry

5. Because we're dealing with Kerberos here, we need to make a JAAS file in order to set up the security context for the schema registry to run under. Create this file as /etc/schema-registry/jaas-kafka.conf . Insert this text into this file:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 useKeyTab=true
 keyTab="/etc/schema-registry/kafka.keytab"
 storeKey=true
 useTicketCache=false
 principal="<kafka principal>";
};

For more insight into the JAAS options for schema registry you can visit that page here.

6. You'll notice in the JAAS configuration we have a setting for keyTab. In order for us to use a keytab to authenticate the schema registry with Kerberos, we need to poach a Kafka keytab from a running process and copy it to the path specified in the configuration. You can find a keytab either on a node running the Kafka gateway or on the node running the Kafka Broker. These can be found in a directory similar to: /run/cloudera-scm-agent/process/<process>-kafka-KAFKA_BROKER/kafka.keytab . Then copy it over to /etc/schema-registry on the node running the schema registry.

To test to make sure your keytab will work you can try to kinit with it first:

kinit -kt /etc/schema-registry/kafka.keytab <kafka principal>

7. The next step is to configure schema-registry.properties. There are a number of configurations for this file, which can be found here. The bare bones needed to run the schema registry against Kafka on a kerberized cluster looks like:

listeners=http://<schema registry server>:8081
kafkastore.connection.url=<zookeeper server 1>:2181, <zookeeper server 2>:2181, <zookeeper server 3>:2181
host.name=<schema registry server>
kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.bootstrap.servers=SASL_PLAINTEXT://<kafka broker>:9092
kafkastore.sasl.kerberos.service.name=kafka
kafkastore.topic=_schemas

debug=true

This assumes you have SASL enabled for your Kafka/Zookeeper connection:


Figure 2. Kafka Configuration

8. Set the JAAS environment variable :

export SCHEMA_REGISTRY_OPTS="-Djava.security.auth.login.config=/etc/schema-registry/jaas-kafka.conf"

9. Start the schema registry service

/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &


You should see something like this:

[2018-05-14 13:03:01,863] INFO Logging initialized @418ms (org.eclipse.jetty.util.log:186)
[2018-05-14 13:03:02,254] INFO Initializing KafkaStore with broker endpoints: SASL_PLAINTEXT://<Kafka Broker>:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore:103)
[2018-05-14 13:03:02,727] INFO Creating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:186)
[2018-05-14 13:03:02,970] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:138)
[2018-05-14 13:03:02,975] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-05-14 13:03:03,161] INFO Wait to catch up until the offset of the last message at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,208] INFO Joining schema registry with Zookeeper-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:212)
[2018-05-14 13:03:03,388] INFO Created schema registry namespace <Zookeeper Server>:2181/schema_registry (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:161)
[2018-05-14 13:03:03,410] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,428] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,434] INFO Wait to catch up until the offset of the last message at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,515] INFO Adding listener: http://<Schema Registry Server>:8081 (io.confluent.rest.Application:190)
[2018-05-14 13:03:03,581] INFO jetty-9.2.24.v20180105 (org.eclipse.jetty.server.Server:327)
[2018-05-14 13:03:04,257] INFO HV000001: Hibernate Validator 5.1.3.Final (org.hibernate.validator.internal.util.Version:27)
[2018-05-14 13:03:04,437] INFO Started o.e.j.s.ServletContextHandler@5c530d1e{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-05-14 13:03:04,453] INFO Started NetworkTrafficServerConnector@299321e2{HTTP/1.1}{<Schema Registry Server>:8081} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2018-05-14 13:03:04,453] INFO Started @3011ms (org.eclipse.jetty.server.Server:379)

[2018-05-14 13:03:04,454] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)

Then disown the process

disown

10. We can now verify that our Kafka Topic for the schema registry has been created:

/usr/bin/kafka-topics --list --zookeeper <zookeeper server>:2181

Which should display something like this:

18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Socket connection established to <zookeeper server>:2181, initiating session
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Session establishment complete on server <zookeeper server>:2181, sessionid = 0x162fecd66aadacb, negotiated timeout = 30000
18/05/15 11:28:45 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
_schemas
18/05/15 11:28:45 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
18/05/15 11:28:45 INFO zookeeper.ZooKeeper: Session: 0x162fecd66aadacb closed
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x162fecd66aadacb

You can see that the _schemas topic configured in our schema-registry.properties file has been created.

11. Next I turn off the compatibility requirement. This ensures that new schemas registered for the same subject don't have to be compatible with previous versions, which can be useful if you deal with any kind of data drift (this step is optional based on your requirements):

curl -X PUT -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://<Schema Registry Server>:8081/config

12. Now we can register a new Avro schema. If you are sourcing the data from a delimited flat file, you can enter the schema directly. For example, we have a flat file source that contains the following pipe-delimited columns:

lastName|firstName|middleINI|MRN|pcpID|planCode|planName|effDt|termDt|memberPlanID|planType|planDescription|empGrp|planNumber

which translates to an Avro schema like this:

{
  "type": "record",
  "name": "Sample_Extract",
  "doc": "sample flat file extract",
  "fields": [
    { "name": "lastName", "type": ["null", "string"], "default": null, "columnName": "lastname", "sqlType": "12" },
    { "name": "firstname", "type": ["null", "string"], "default": null, "columnName": "firstname", "sqlType": "12" },
    { "name": "middleini", "type": ["null", "string"], "default": null, "columnName": "middleini", "sqlType": "12" },
    { "name": "mrn", "type": ["null", "string"], "default": null, "columnName": "mrn", "sqlType": "12" },
    { "name": "pcpid", "type": ["null", "string"], "default": null, "columnName": "pcpid", "sqlType": "12" },
    { "name": "plancode", "type": ["null", "string"], "default": null, "columnName": "plancode", "sqlType": "12" },
    { "name": "planname", "type": ["null", "string"], "default": null, "columnName": "planname", "sqlType": "12" },
    { "name": "effdt", "type": ["null", "string"], "default": null, "columnName": "effdt", "sqlType": "12" },
    { "name": "termdt", "type": ["null", "string"], "default": null, "columnName": "termdt", "sqlType": "12" },
    { "name": "memberplanid", "type": ["null", "string"], "default": null, "columnName": "memberplanid", "sqlType": "12" },
    { "name": "plantype", "type": ["null", "string"], "default": null, "columnName": "plantype", "sqlType": "12" },
    { "name": "plandescription", "type": ["null", "string"], "default": null, "columnName": "plandescription", "sqlType": "12" },
    { "name": "empgrp", "type": ["null", "string"], "default": null, "columnName": "empgrp", "sqlType": "12" },
    { "name": "plannumber", "type": ["null", "string"], "default": null, "columnName": "plannumber", "sqlType": "12" }
  ],
  "tableName": "sample.extract"
}

To register this schema, which I will call "sample_extract_schema", in the registry we can call the REST API using curl like this:

curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{ \"type\":\"record\", \"name\":\"Sample_Extract\", \"doc\":\"sample flat file extract\", \"fields\": [ { \"name\" : \"lastName\", \"type\" : [\"null\", \"string\"], \"default\" : null, \"columnName\" : \"lastname\", \"sqlType\" : \"12\" }, { \"name\": \"firstname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"firstname\", \"sqlType\" : \"12\" }, { \"name\" :\"middleini\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"middleini\", \"sqlType\" : \"12\" }, { \"name\" : \"mrn\", \"type\" : [ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"mrn\", \"sqlType\" : \"12\" }, { \"name\" : \"pcpid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"pcpid\", \"sqlType\" : \"12\" }, { \"name\" : \"plancode\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plancode\", \"sqlType\" : \"12\" }, { \"name\" : \"planname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"planname\", \"sqlType\" : \"12\" }, { \"name\" : \"effdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"effdt\", \"sqlType\" : \"12\" }, { \"name\" : \"termdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"termdt\", \"sqlType\" : \"12\" }, { \"name\" : \"memberplanid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"memberplanid\", \"sqlType\" : \"12\" }, { \"name\" :\"plantype\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plantype\", \"sqlType\" : \"12\" }, { \"name\" : \"plandescription\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plandescription\", \"sqlType\" : \"12\" }, { \"name\" : \"empgrp\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"empgrp\", \"sqlType\" : \"12\" }, { \"name\" : \"plannumber\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plannumber\", \"sqlType\" : \"12\" } ], \"tableName\" : \"sample.extract\" } "}' \
    http://<schema registry server>:8081/subjects/sample_extract_schema/versions

13. To verify it was entered properly, we can use the REST API again:

curl -X GET -i http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest 

which should return the latest version of the schema for that subject.
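
The response is a small JSON document containing the subject, version, schema id, and the schema itself as an escaped string; the values below are purely illustrative, and you can see these same four fields mapped by the SchemaObject class in the Java example that follows:

{"subject":"sample_extract_schema","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Sample_Extract\", ... }"}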

Example (with Jackson and Jersey):

import java.io.IOException;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;


public class SchemaRegistryTest {

     // Maps the JSON returned by /subjects/<subject>/versions/latest
     public static class SchemaObject {
          public String subject;
          public int version;
          public int id;
          public String schema;
     }

     SchemaObject schemaObject;

     public void executeTest() throws JsonParseException, JsonMappingException, ClientHandlerException, UniformInterfaceException, IOException {

          Client client = Client.create();

          // Request the latest schema version for the subject
          WebResource webResource = client
                    .resource("http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest");
          ClientResponse response = webResource.accept("application/vnd.schemaregistry.v1+json").get(ClientResponse.class);

          if (response.getStatus() != 500) {

               if (response.getStatus() != 200) {
                    throw new RuntimeException("Failed : HTTP error code : " + response.getStatus());
               }
               response.bufferEntity();

               // Deserialize the registry response into our SchemaObject
               ObjectMapper mapper = new ObjectMapper();
               schemaObject = mapper.readValue(response.getEntity(String.class), SchemaObject.class);

               System.out.println(schemaObject.schema);
          }
     }
}
That's it! You now have a repository for your data pipeline schemas, backed by Kafka and the high availability of Zookeeper.