Tuesday, May 15, 2018

Working with Confluent's Schema Registry and Kerberos

     Schemas, the metadata for your data, are still very valuable despite all of the noise around schema-less data storage. As the old adage goes, "if all you have is a hammer, everything looks like a nail", and schema-less storage is not the right tool for every job. As it turns out, schemas are especially valuable in modern data pipelines: they form what amounts to a contract between the data coming in and the applications that consume it.


Figure 1. Data Pipeline


For my projects I use CouchDB, because I find it more flexible for the kinds of pipelines I work with (it doesn't have to conform to the Avro format). With that said, Avro is the standard for data serialization and exchange in Hadoop.

     I started working with the Confluent Schema Registry because of its tight integration with StreamSets Data Collector, which also uses the Avro format for its pipelines. Since I let Data Collector handle schema definitions for me, I don't strictly need the registry myself, but I wanted to share how I got it up and running with Kerberos on RHEL in case someone else has the need. Most of this can be found in Confluent's installation guide; I've added the pieces required for Kerberos. So here we go (this is for version 4.1)...

*This assumes that Kafka and ZooKeeper are already up and running in your environment.

1. Install the Confluent public key

sudo rpm --import https://packages.confluent.io/rpm/4.1/archive.key

2. Then create a file in /etc/yum.repos.d called confluent.repo and add this text:

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1
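
As a quick check that YUM can see the new repositories (the ids below match the stanzas above):

yum repolist enabled | grep -i confluent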

3. Clear YUM cache

sudo yum clean all

4. Install the Schema Registry

sudo yum install confluent-schema-registry

5. Because we're dealing with Kerberos here, we need a JAAS file to set up the security context the schema registry runs under. Create this file as /etc/schema-registry/jaas-kafka.conf and insert the following:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/schema-registry/kafka.keytab"
  storeKey=true
  useTicketCache=false
  principal="<kafka principal>";
};

For more insight into the JAAS options for Schema Registry, see Confluent's security documentation.
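
Note that this file only sets up the Kafka login context. If your ZooKeeper ensemble also enforces SASL/Kerberos, you may need an additional Client section in the same file; a hedged sketch, since "Client" is the default login context name the ZooKeeper client looks for:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/etc/schema-registry/kafka.keytab"
  storeKey=true
  useTicketCache=false
  principal="<kafka principal>";
};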

6. You'll notice the JAAS configuration has a keyTab setting. In order to authenticate the schema registry with Kerberos using a keytab, we need to poach a kafka keytab from a running process and copy it to the path specified in the configuration. You can find a keytab either on a node running the Kafka gateway or on the node running the Kafka broker, in a directory similar to /run/cloudera-scm-agent/process/<process>-kafka-KAFKA_BROKER/kafka.keytab . Then copy it over to /etc/schema-registry on the node running the schema registry.
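
A hedged sketch of locating and copying a broker keytab on a Cloudera-managed node (the exact path varies with your deployment, and the file holds credentials, so lock it down):

sudo find /run/cloudera-scm-agent/process -name kafka.keytab
sudo cp /run/cloudera-scm-agent/process/<process>-kafka-KAFKA_BROKER/kafka.keytab /etc/schema-registry/
sudo chmod 600 /etc/schema-registry/kafka.keytab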

To test to make sure your keytab will work you can try to kinit with it first:

kinit -kt /etc/schema-registry/kafka.keytab <kafka principal>
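
If the kinit succeeds, the keytab is good. Two more handy checks: klist shows the ticket you just obtained, and klist -kt lists the principals stored in the keytab, which is an easy way to figure out what to use for <kafka principal> in the JAAS file:

klist
klist -kt /etc/schema-registry/kafka.keytab
kdestroy

(kdestroy just cleans up the test ticket; the registry will authenticate with the keytab directly.)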

7. The next step is to configure /etc/schema-registry/schema-registry.properties. There are a number of configuration options for this file, documented in Confluent's configuration reference. The bare bones needed to run the schema registry against Kafka on a kerberized cluster look like this:

listeners=http://<schema registry server>:8081
kafkastore.connection.url=<zookeeper server 1>:2181,<zookeeper server 2>:2181,<zookeeper server 3>:2181
host.name=<schema registry server>
kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.bootstrap.servers=SASL_PLAINTEXT://<kafka broker>:9092
kafkastore.sasl.kerberos.service.name=kafka
kafkastore.topic=_schemas
debug=true

This assumes you have SASL enabled for your Kafka/ZooKeeper connection:


Figure 2. Kafka Configuration

8. Set the JAAS environment variable:

export SCHEMA_REGISTRY_OPTS="-Djava.security.auth.login.config=/etc/schema-registry/jaas-kafka.conf"

9. Start the schema registry service

/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &


You should see something like this:

[2018-05-14 13:03:01,863] INFO Logging initialized @418ms (org.eclipse.jetty.util.log:186)
[2018-05-14 13:03:02,254] INFO Initializing KafkaStore with broker endpoints: SASL_PLAINTEXT://<Kafka Broker>:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore:103)
[2018-05-14 13:03:02,727] INFO Creating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:186)
[2018-05-14 13:03:02,970] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:138)
[2018-05-14 13:03:02,975] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-05-14 13:03:03,161] INFO Wait to catch up until the offset of the last message at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,208] INFO Joining schema registry with Zookeeper-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:212)
[2018-05-14 13:03:03,388] INFO Created schema registry namespace <Zookeeper Server>:2181/schema_registry (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:161)
[2018-05-14 13:03:03,410] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,428] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,434] INFO Wait to catch up until the offset of the last message at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,515] INFO Adding listener: http://<Schema Registry Server>:8081 (io.confluent.rest.Application:190)
[2018-05-14 13:03:03,581] INFO jetty-9.2.24.v20180105 (org.eclipse.jetty.server.Server:327)
[2018-05-14 13:03:04,257] INFO HV000001: Hibernate Validator 5.1.3.Final (org.hibernate.validator.internal.util.Version:27)
[2018-05-14 13:03:04,437] INFO Started o.e.j.s.ServletContextHandler@5c530d1e{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-05-14 13:03:04,453] INFO Started NetworkTrafficServerConnector@299321e2{HTTP/1.1}{<Schema Registry Server>:8081} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2018-05-14 13:03:04,453] INFO Started @3011ms (org.eclipse.jetty.server.Server:379)
[2018-05-14 13:03:04,454] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)

Then disown the process so it survives the end of your shell session:

disown
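
As a quick smoke test, hit the REST API; a freshly started registry with no registered schemas should return an empty subject list:

curl http://<schema registry server>:8081/subjects

which should return:

[]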

10. We can now verify that the Kafka topic backing the schema registry has been created:

/usr/bin/kafka-topics --list --zookeeper <zookeeper server>:2181

Which should display something like this:

18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Socket connection established to <zookeeper server>:2181, initiating session
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Session establishment complete on server <zookeeper server>:2181, sessionid = 0x162fecd66aadacb, negotiated timeout = 30000
18/05/15 11:28:45 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
_schemas
18/05/15 11:28:45 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
18/05/15 11:28:45 INFO zookeeper.ZooKeeper: Session: 0x162fecd66aadacb closed
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x162fecd66aadacb

You can see that the _schemas topic configured in our schema-registry.properties file has been created.
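
If you're curious, you can also peek at the messages the registry writes to that topic. A hedged sketch for a kerberized cluster (client.properties is a file you create here; the JAAS file is reused from step 5):

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/schema-registry/jaas-kafka.conf"
printf 'security.protocol=SASL_PLAINTEXT\nsasl.kerberos.service.name=kafka\n' > client.properties
/usr/bin/kafka-console-consumer --bootstrap-server <kafka broker>:9092 --topic _schemas --from-beginning --consumer.config client.properties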

11. Next, I turn off the compatibility requirement. This ensures that new schemas registered under the same subject don't have to be compatible with previous versions, which can be useful if you deal with any kind of data drift (this step is optional, depending on your requirements):

curl -X PUT -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://<Schema Registry Server>:8081/config
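
To confirm the change took, read the top-level config back:

curl -X GET http://<Schema Registry Server>:8081/config

which should return {"compatibilityLevel":"NONE"}.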

12. Now we can register a new Avro schema. If you are sourcing the data from a delimited flat file, you can write the schema by hand. For example, say we have a flat file source containing the following pipe-delimited columns:

lastName|firstName|middleINI|MRN|pcpID|planCode|planName|effDt|termDt|memberPlanID|planType|planDescription|empGrp|planNumber

which translates to an Avro schema like this:

{
  "type" : "record",
  "name" : "Sample_Extract",
  "doc" : "sample flat file extract",
  "fields" : [
    { "name" : "lastName",        "type" : [ "null", "string" ], "default" : null, "columnName" : "lastname",        "sqlType" : "12" },
    { "name" : "firstname",       "type" : [ "null", "string" ], "default" : null, "columnName" : "firstname",       "sqlType" : "12" },
    { "name" : "middleini",       "type" : [ "null", "string" ], "default" : null, "columnName" : "middleini",       "sqlType" : "12" },
    { "name" : "mrn",             "type" : [ "null", "string" ], "default" : null, "columnName" : "mrn",             "sqlType" : "12" },
    { "name" : "pcpid",           "type" : [ "null", "string" ], "default" : null, "columnName" : "pcpid",           "sqlType" : "12" },
    { "name" : "plancode",        "type" : [ "null", "string" ], "default" : null, "columnName" : "plancode",        "sqlType" : "12" },
    { "name" : "planname",        "type" : [ "null", "string" ], "default" : null, "columnName" : "planname",        "sqlType" : "12" },
    { "name" : "effdt",           "type" : [ "null", "string" ], "default" : null, "columnName" : "effdt",           "sqlType" : "12" },
    { "name" : "termdt",          "type" : [ "null", "string" ], "default" : null, "columnName" : "termdt",          "sqlType" : "12" },
    { "name" : "memberplanid",    "type" : [ "null", "string" ], "default" : null, "columnName" : "memberplanid",    "sqlType" : "12" },
    { "name" : "plantype",        "type" : [ "null", "string" ], "default" : null, "columnName" : "plantype",        "sqlType" : "12" },
    { "name" : "plandescription", "type" : [ "null", "string" ], "default" : null, "columnName" : "plandescription", "sqlType" : "12" },
    { "name" : "empgrp",          "type" : [ "null", "string" ], "default" : null, "columnName" : "empgrp",          "sqlType" : "12" },
    { "name" : "plannumber",      "type" : [ "null", "string" ], "default" : null, "columnName" : "plannumber",      "sqlType" : "12" }
  ],
  "tableName" : "sample.extract"
}

To register this schema in the registry under the subject "sample_extract_schema", we can call the REST API using curl like this:

curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{ \"type\":\"record\", \"name\":\"Sample_Extract\", \"doc\":\"sample flat file extract\", \"fields\": [ { \"name\" : \"lastName\", \"type\" : [\"null\", \"string\"], \"default\" : null, \"columnName\" : \"lastname\", \"sqlType\" : \"12\" }, { \"name\": \"firstname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"firstname\", \"sqlType\" : \"12\" }, { \"name\" :\"middleini\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"middleini\", \"sqlType\" : \"12\" }, { \"name\" : \"mrn\", \"type\" : [ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"mrn\", \"sqlType\" : \"12\" }, { \"name\" : \"pcpid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"pcpid\", \"sqlType\" : \"12\" }, { \"name\" : \"plancode\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plancode\", \"sqlType\" : \"12\" }, { \"name\" : \"planname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"planname\", \"sqlType\" : \"12\" }, { \"name\" : \"effdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"effdt\", \"sqlType\" : \"12\" }, { \"name\" : \"termdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"termdt\", \"sqlType\" : \"12\" }, { \"name\" : \"memberplanid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"memberplanid\", \"sqlType\" : \"12\" }, { \"name\" :\"plantype\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plantype\", \"sqlType\" : \"12\" }, { \"name\" : \"plandescription\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plandescription\", \"sqlType\" : \"12\" }, { \"name\" : \"empgrp\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"empgrp\", \"sqlType\" : \"12\" }, { \"name\" : \"plannumber\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plannumber\", \"sqlType\" : \"12\" } ], \"tableName\" : \"sample.extract\" } "}' \
    http://<schema registry server>:8081/subjects/sample_extract_schema/versions
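
Hand-escaping a schema that size is tedious and error-prone. As an alternative sketch, if you keep the schema in a file (sample_extract.avsc is an assumed name) and have jq 1.6+ available, you can let jq build the request body for you:

jq -n --rawfile s sample_extract.avsc '{schema: $s}' | \
    curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data @- http://<schema registry server>:8081/subjects/sample_extract_schema/versions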

13. To verify it was entered properly, we can use the REST API again:

curl -X GET -i http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest 

which should return the latest version of the schema for that subject.
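
The response is a small JSON wrapper around the schema itself, along the lines of (values illustrative):

{"subject":"sample_extract_schema","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Sample_Extract\",...}"}

Those four fields are exactly what the Java example below maps onto.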

Example (with Jackson and Jersey):

import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;


public class SchemaRegistryTest {

    // Maps the JSON returned by GET /subjects/<subject>/versions/latest
    public static class SchemaObject {
        public String subject;
        public int version;
        public int id;
        public String schema;
    }

    private SchemaObject schemaObject;

    public void executeTest() throws IOException {
        Client client = Client.create();

        // Request the latest registered version for the subject
        WebResource webResource = client
                .resource("http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest");
        ClientResponse response = webResource
                .accept("application/vnd.schemaregistry.v1+json")
                .get(ClientResponse.class);

        if (response.getStatus() != 200) {
            throw new RuntimeException("Failed : HTTP error code : " + response.getStatus());
        }
        response.bufferEntity();

        // Deserialize the response body into our POJO and print the schema text
        ObjectMapper mapper = new ObjectMapper();
        schemaObject = mapper.readValue(response.getEntity(String.class), SchemaObject.class);
        System.out.println(schemaObject.schema);
    }

    public static void main(String[] args) throws IOException {
        new SchemaRegistryTest().executeTest();
    }
}
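
If you only need the schema text at the command line, a jq one-liner (assuming jq is installed) does the same job as the class above:

curl -s http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest | jq -r .schema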
That's it! You now have a repository for your data pipeline schemas, backed by Kafka and the high availability of ZooKeeper.
