Tuesday, May 15, 2018

Working with Confluent's Schema Registry and Kerberos

     Schemas, metadata for your data, are still very valuable despite all of the noise around schema-less data storage. As the old adage goes, "if all you have is a hammer, everything looks like a nail", and schema-less storage isn't the right fit for everything. As it turns out, schemas are especially valuable in modern data pipelines, where they form what amounts to a contract between the data coming in and the applications that consume it.


Figure 1. Data Pipeline


For my own projects I use CouchDB, because I find it more flexible for the kinds of pipelines I work with (the data doesn't have to conform to the Avro format). With that said, Avro is the standard for data serialization and exchange in Hadoop.

     I started working with the Confluent Schema Registry because of its tight integration with StreamSets Data Collector, which also uses the Avro format for its pipelines. Since I let Data Collector handle the schema definitions for me, I don't strictly need the registry myself, but I wanted to share how I got it up and running with Kerberos on RHEL in case someone else has a need. Most of this can be found in Confluent's installation guide; I've added the Kerberos-specific pieces. Here we go (this is for version 4.1).

*This assumes that Kafka and Zookeeper are already up and running in your environment

1. Install the Confluent public key

sudo rpm --import https://packages.confluent.io/rpm/4.1/archive.key

2. Then create a file in /etc/yum.repos.d called confluent.repo and add this text:

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1
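
Before moving on, you can confirm that yum sees the new repositories (a quick sanity check; the repo IDs come from the file above):

yum repolist enabled | grep -i confluent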

3. Clear YUM cache

sudo yum clean all

4. Install the Schema Registry

sudo yum install confluent-schema-registry
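
If you want to see where the package put its configuration files (we'll be editing a couple of them below), you can list its contents with standard rpm tooling:

rpm -ql confluent-schema-registry | grep /etc/schema-registry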

5. Because we're dealing with Kerberos here, we need to create a JAAS file to set up the security context that the schema registry runs under. Create the file /etc/schema-registry/jaas-kafka.conf and insert the following text:

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 useKeyTab=true
 keyTab="/etc/schema-registry/kafka.keytab"
 storeKey=true
 useTicketCache=false
 principal="<kafka principal>";

};

For more insight into the JAAS options for the Schema Registry, see Confluent's security documentation.

6. You'll notice that the JAAS configuration has a keyTab setting. In order to use a keytab to authenticate the schema registry with Kerberos, we need to poach a Kafka keytab from a running process and copy it to the path specified in the configuration. You can find a keytab either on a node running the Kafka gateway or on a node running a Kafka broker, in a directory similar to /run/cloudera-scm-agent/process/<process>-kafka-KAFKA_BROKER/kafka.keytab. Copy it over to /etc/schema-registry on the node running the schema registry.

To make sure the keytab works, you can try to kinit with it first:

kinit -kt /etc/schema-registry/kafka.keytab <kafka principal>
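
If you're not sure which principal the keytab actually contains, you can list its entries first with klist (standard MIT Kerberos tooling; the path is the one referenced in the JAAS file). On a Cloudera-managed cluster the principal will typically look something like kafka/<kafka broker host>@<REALM>:

klist -kt /etc/schema-registry/kafka.keytab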

7. The next step is to configure schema-registry.properties. There are a number of settings for this file, which are covered in Confluent's documentation. The bare minimum needed to run the Schema Registry against Kafka on a Kerberized cluster looks like this:

listeners=http://<schema registry server>:8081
kafkastore.connection.url=<zookeeper server 1>:2181,<zookeeper server 2>:2181,<zookeeper server 3>:2181
host.name=<schema registry server>
kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.bootstrap.servers=SASL_PLAINTEXT://<kafka broker>:9092
kafkastore.sasl.kerberos.service.name=kafka
kafkastore.topic=_schemas
debug=true

This assumes you have SASL enabled for your Kafka/Zookeeper connection:


Figure 2. Kafka Configuration
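
Figure 2 shows the corresponding Kafka settings (in my case the cluster is managed by Cloudera Manager). For reference, on a Kafka installation that isn't managed by Cloudera Manager, the broker-side equivalent is a handful of server.properties settings roughly like the ones below (a sketch only; the listener host and enabled mechanisms depend on your environment):

listeners=SASL_PLAINTEXT://<kafka broker>:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka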

8. Set the JAAS environment variable:

export SCHEMA_REGISTRY_OPTS="-Djava.security.auth.login.config=/etc/schema-registry/jaas-kafka.conf"

9. Start the schema registry service

/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &


You should see something like this:

[2018-05-14 13:03:01,863] INFO Logging initialized @418ms (org.eclipse.jetty.util.log:186)
[2018-05-14 13:03:02,254] INFO Initializing KafkaStore with broker endpoints: SASL_PLAINTEXT://<Kafka Broker>:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore:103)
[2018-05-14 13:03:02,727] INFO Creating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:186)
[2018-05-14 13:03:02,970] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:138)
[2018-05-14 13:03:02,975] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-05-14 13:03:03,161] INFO Wait to catch up until the offset of the last message at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,208] INFO Joining schema registry with Zookeeper-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:212)
[2018-05-14 13:03:03,388] INFO Created schema registry namespace <Zookeeper Server>:2181/schema_registry (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:161)
[2018-05-14 13:03:03,410] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,428] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,434] INFO Wait to catch up until the offset of the last message at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,515] INFO Adding listener: http://<Schema Registry Server>:8081 (io.confluent.rest.Application:190)
[2018-05-14 13:03:03,581] INFO jetty-9.2.24.v20180105 (org.eclipse.jetty.server.Server:327)
[2018-05-14 13:03:04,257] INFO HV000001: Hibernate Validator 5.1.3.Final (org.hibernate.validator.internal.util.Version:27)
[2018-05-14 13:03:04,437] INFO Started o.e.j.s.ServletContextHandler@5c530d1e{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-05-14 13:03:04,453] INFO Started NetworkTrafficServerConnector@299321e2{HTTP/1.1}{<Schema Registry Server>:8081} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2018-05-14 13:03:04,453] INFO Started @3011ms (org.eclipse.jetty.server.Server:379)

[2018-05-14 13:03:04,454] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)

Then disown the process

disown
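
At this point the REST API should be answering requests. Since nothing has been registered yet, listing the subjects should return an empty array:

curl -X GET http://<schema registry server>:8081/subjects

which should return [].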

10. We can now verify that our Kafka Topic for the schema registry has been created:

/usr/bin/kafka-topics --list --zookeeper <zookeeper server>:2181

Which should display something like this:

18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Socket connection established to <zookeeper server>:2181, initiating session
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Session establishment complete on server <zookeeper server>:2181, sessionid = 0x162fecd66aadacb, negotiated timeout = 30000
18/05/15 11:28:45 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
_schemas
18/05/15 11:28:45 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
18/05/15 11:28:45 INFO zookeeper.ZooKeeper: Session: 0x162fecd66aadacb closed
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x162fecd66aadacb

You can see that the _schemas topic configured in our schema-registry.properties file has been created.

11. Next I turn off the compatibility requirements. This ensures that new schemas entered for the same subject don't have to be compatible with previous versions, which might be a good idea if you work with any kind of data drift (this is optional based on your requirements):

curl -X PUT -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://<Schema Registry Server>:8081/config
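
To confirm the change took effect, you can read the global compatibility setting back through the same REST API:

curl -X GET http://<Schema Registry Server>:8081/config

which should return {"compatibilityLevel":"NONE"}.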

12. Now we can register a new Avro schema. If you are sourcing the data from a delimited flat file, you can enter the schema directly. For example, we have a flat file source that contains the following pipe-delimited columns:

lastName|firstName|middleINI|MRN|pcpID|planCode|planName|effDt|termDt|memberPlanID|planType|planDescription|empGrp|planNumber

which translates to an Avro schema like this:

{
  "type" : "record",
  "name" : "Sample_Extract",
  "doc" : "sample flat file extract",
  "fields" : [
    { "name" : "lastName",        "type" : [ "null", "string" ], "default" : null, "columnName" : "lastname",        "sqlType" : "12" },
    { "name" : "firstname",       "type" : [ "null", "string" ], "default" : null, "columnName" : "firstname",       "sqlType" : "12" },
    { "name" : "middleini",       "type" : [ "null", "string" ], "default" : null, "columnName" : "middleini",       "sqlType" : "12" },
    { "name" : "mrn",             "type" : [ "null", "string" ], "default" : null, "columnName" : "mrn",             "sqlType" : "12" },
    { "name" : "pcpid",           "type" : [ "null", "string" ], "default" : null, "columnName" : "pcpid",           "sqlType" : "12" },
    { "name" : "plancode",        "type" : [ "null", "string" ], "default" : null, "columnName" : "plancode",        "sqlType" : "12" },
    { "name" : "planname",        "type" : [ "null", "string" ], "default" : null, "columnName" : "planname",        "sqlType" : "12" },
    { "name" : "effdt",           "type" : [ "null", "string" ], "default" : null, "columnName" : "effdt",           "sqlType" : "12" },
    { "name" : "termdt",          "type" : [ "null", "string" ], "default" : null, "columnName" : "termdt",          "sqlType" : "12" },
    { "name" : "memberplanid",    "type" : [ "null", "string" ], "default" : null, "columnName" : "memberplanid",    "sqlType" : "12" },
    { "name" : "plantype",        "type" : [ "null", "string" ], "default" : null, "columnName" : "plantype",        "sqlType" : "12" },
    { "name" : "plandescription", "type" : [ "null", "string" ], "default" : null, "columnName" : "plandescription", "sqlType" : "12" },
    { "name" : "empgrp",          "type" : [ "null", "string" ], "default" : null, "columnName" : "empgrp",          "sqlType" : "12" },
    { "name" : "plannumber",      "type" : [ "null", "string" ], "default" : null, "columnName" : "plannumber",      "sqlType" : "12" }
  ],
  "tableName" : "sample.extract"
}

To register this schema, which I will call "sample_extract_schema", in the registry we can call the REST API using curl like this:

curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{ \"type\":\"record\", \"name\":\"Sample_Extract\", \"doc\":\"sample flat file extract\", \"fields\": [ { \"name\" : \"lastName\", \"type\" : [\"null\", \"string\"], \"default\" : null, \"columnName\" : \"lastname\", \"sqlType\" : \"12\" }, { \"name\": \"firstname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"firstname\", \"sqlType\" : \"12\" }, { \"name\" :\"middleini\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"middleini\", \"sqlType\" : \"12\" }, { \"name\" : \"mrn\", \"type\" : [ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"mrn\", \"sqlType\" : \"12\" }, { \"name\" : \"pcpid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"pcpid\", \"sqlType\" : \"12\" }, { \"name\" : \"plancode\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plancode\", \"sqlType\" : \"12\" }, { \"name\" : \"planname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"planname\", \"sqlType\" : \"12\" }, { \"name\" : \"effdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"effdt\", \"sqlType\" : \"12\" }, { \"name\" : \"termdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"termdt\", \"sqlType\" : \"12\" }, { \"name\" : \"memberplanid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"memberplanid\", \"sqlType\" : \"12\" }, { \"name\" :\"plantype\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plantype\", \"sqlType\" : \"12\" }, { \"name\" : \"plandescription\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plandescription\", \"sqlType\" : \"12\" }, { \"name\" : \"empgrp\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"empgrp\", \"sqlType\" : \"12\" }, { \"name\" : \"plannumber\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plannumber\", \"sqlType\" : \"12\" } ], \"tableName\" : \"sample.extract\" } "}' \
    http://<schema registry server>:8081/subjects/sample_extract_schema/versions
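
Escaping the schema by hand like this is tedious and error-prone. If you have jq available, an alternative is to keep the schema in a file (say, a hypothetical sample_extract.avsc containing the JSON above) and let jq wrap and escape it for you:

curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data "$(jq -Rs '{schema: .}' sample_extract.avsc)" \
    http://<schema registry server>:8081/subjects/sample_extract_schema/versions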

13. To verify it was entered properly, we can use the REST API again:

curl -X GET -i http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest 

which should return the latest version of the schema for that subject.
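
You can also list all registered subjects, or all versions for a given subject:

curl -X GET -i http://<schema registry server>:8081/subjects
curl -X GET -i http://<schema registry server>:8081/subjects/sample_extract_schema/versions

The second call should return [1] at this point, since only one version has been registered.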

Example (with Jackson and the Jersey 1.x client):

import java.io.IOException;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;

public class SchemaRegistryTest {

    // Maps the JSON returned by /subjects/<subject>/versions/latest
    public static class SchemaObject {
        public String subject;
        public int version;
        public int id;
        public String schema;
    }

    SchemaObject schemaObject;

    public void executeTest() throws JsonParseException, JsonMappingException,
            ClientHandlerException, UniformInterfaceException, IOException {

        Client client = Client.create();

        // Request the latest version of the subject from the Schema Registry REST API
        WebResource webResource = client
                .resource("http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest");
        ClientResponse response = webResource
                .accept("application/vnd.schemaregistry.v1+json")
                .get(ClientResponse.class);

        if (response.getStatus() != 500) {

            if (response.getStatus() != 200) {
                throw new RuntimeException("Failed : HTTP error code : " + response.getStatus());
            }
            response.bufferEntity();

            // Deserialize the response body into our SchemaObject and print the schema
            ObjectMapper mapper = new ObjectMapper();
            schemaObject = mapper.readValue(response.getEntity(String.class), SchemaObject.class);

            System.out.println(schemaObject.schema);
        }
    }
}
That's it! You now have a repository for your data pipeline schemas, backed by Kafka and the high availability of Zookeeper.
