Figure 1. Data Pipeline
For my projects I use CouchDB, because I find it more flexible for the kinds of pipelines I work with (it doesn't have to conform to the Avro format). That said, Avro is the standard for data serialization and exchange in Hadoop.
I started working with Confluent Schema Registry because of its tight integration with StreamSets Data Collector, which also uses the Avro format for its pipelines. Since I let Data Collector handle schema definitions for me, I don't actually need the registry myself, but I wanted to share how I got it up and running with Kerberos on RHEL in case someone else does. Most of this can be found in Confluent's installation guide; I've added the Kerberos-specific pieces. This walkthrough is for version 4.1.
*This assumes that Kafka and Zookeeper are already up and running in your environment.
1. Install the Confluent public key
sudo rpm --import https://packages.confluent.io/rpm/4.1/archive.key
2. Then create a file in /etc/yum.repos.d called confluent.repo and add this text:
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/4.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/4.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/4.1/archive.key
enabled=1
3. Clear YUM cache
sudo yum clean all
4. Install the Schema Registry
sudo yum install confluent-schema-registry
5. Because we're dealing with Kerberos, we need a JAAS file to set up the security context the schema registry will run under. Create the file /etc/schema-registry/jaas-kafka.conf with the following contents:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/schema-registry/kafka.keytab"
storeKey=true
useTicketCache=false
principal="<kafka principal>";
};
For more insight into the JAAS options for the schema registry, see Confluent's security documentation.
6. You'll notice the JAAS configuration has a keyTab setting. To authenticate the schema registry with Kerberos using a keytab, we need to poach a Kafka keytab from a running process and copy it to the path specified in the configuration. You can find a keytab either on a node running the Kafka gateway or on a node running a Kafka broker, in a directory similar to /run/cloudera-scm-agent/process/<process>-kafka-KAFKA_BROKER/kafka.keytab. Copy it to /etc/schema-registry on the node running the schema registry.
To make sure your keytab will work, you can try to kinit with it first:
kinit -kt /etc/schema-registry/kafka.keytab <kafka principal>
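If you're not sure which principal the keytab you grabbed actually contains, you can list its entries first; the principal shown there is what goes into the JAAS file and the kinit command above:
klist -kt /etc/schema-registry/kafka.keytab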
7. The next step is to configure /etc/schema-registry/schema-registry.properties. There are a number of settings for this file, documented in Confluent's configuration reference. The bare bones needed to run the schema registry against Kafka on a Kerberized cluster look like this:
listeners=http://<schema registry server>:8081
kafkastore.connection.url=<zookeeper server 1>:2181,<zookeeper server 2>:2181,<zookeeper server 3>:2181
host.name=<schema registry server>
kafkastore.security.protocol=SASL_PLAINTEXT
kafkastore.bootstrap.servers=SASL_PLAINTEXT://<kafka broker>:9092
kafkastore.sasl.kerberos.service.name=kafka
kafkastore.topic=_schemas
debug=true
This assumes you have SASL enabled for your Kafka/Zookeeper connection, as shown in Figure 2.
Figure 2. Kafka Configuration
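The figure isn't reproduced here, but as a rough sketch (using standard Apache Kafka property names, not Cloudera Manager field names), a Kerberos-enabled broker carries settings along these lines; in a Cloudera-managed cluster you'd set the equivalents through Cloudera Manager rather than editing files by hand:
listeners=SASL_PLAINTEXT://<kafka broker>:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka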
8. Set the JAAS environment variable:
export SCHEMA_REGISTRY_OPTS="-Djava.security.auth.login.config=/etc/schema-registry/jaas-kafka.conf"
9. Start the schema registry service
/usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties &
You should see something like this:
[2018-05-14 13:03:01,863] INFO Logging initialized @418ms (org.eclipse.jetty.util.log:186)
[2018-05-14 13:03:02,254] INFO Initializing KafkaStore with broker endpoints: SASL_PLAINTEXT://<Kafka Broker>:9092 (io.confluent.kafka.schemaregistry.storage.KafkaStore:103)
[2018-05-14 13:03:02,727] INFO Creating schemas topic _schemas (io.confluent.kafka.schemaregistry.storage.KafkaStore:186)
[2018-05-14 13:03:02,970] INFO Initialized last consumed offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:138)
[2018-05-14 13:03:02,975] INFO [kafka-store-reader-thread-_schemas]: Starting (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:66)
[2018-05-14 13:03:03,161] INFO Wait to catch up until the offset of the last message at 0 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,208] INFO Joining schema registry with Zookeeper-based coordination (io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry:212)
[2018-05-14 13:03:03,388] INFO Created schema registry namespace <Zookeeper Server>:2181/schema_registry (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:161)
[2018-05-14 13:03:03,410] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,428] INFO Successfully elected the new master: {"host":"<Schema Registry Server>","port":8081,"master_eligibility":true,"scheme":"http","version":1} (io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector:102)
[2018-05-14 13:03:03,434] INFO Wait to catch up until the offset of the last message at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore:277)
[2018-05-14 13:03:03,515] INFO Adding listener: http://<Schema Registry Server>:8081 (io.confluent.rest.Application:190)
[2018-05-14 13:03:03,581] INFO jetty-9.2.24.v20180105 (org.eclipse.jetty.server.Server:327)
[2018-05-14 13:03:04,257] INFO HV000001: Hibernate Validator 5.1.3.Final (org.hibernate.validator.internal.util.Version:27)
[2018-05-14 13:03:04,437] INFO Started o.e.j.s.ServletContextHandler@5c530d1e{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2018-05-14 13:03:04,453] INFO Started NetworkTrafficServerConnector@299321e2{HTTP/1.1}{<Schema Registry Server>:8081} (org.eclipse.jetty.server.NetworkTrafficServerConnector:266)
[2018-05-14 13:03:04,453] INFO Started @3011ms (org.eclipse.jetty.server.Server:379)
[2018-05-14 13:03:04,454] INFO Server started, listening for requests... (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:45)
Then disown the process
disown
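With the process running in the background, you can also sanity-check the REST API directly. Listing the registered subjects should return an empty JSON array, since nothing has been registered yet:
curl http://<schema registry server>:8081/subjects
which should return:
[]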
10. We can now verify that the Kafka topic for the schema registry has been created:
/usr/bin/kafka-topics --list --zookeeper <zookeeper server>:2181
Which should display something like this:
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Socket connection established to <zookeeper server>:2181, initiating session
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: Session establishment complete on server <zookeeper server>:2181, sessionid = 0x162fecd66aadacb, negotiated timeout = 30000
18/05/15 11:28:45 INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
_schemas
18/05/15 11:28:45 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
18/05/15 11:28:45 INFO zookeeper.ZooKeeper: Session: 0x162fecd66aadacb closed
18/05/15 11:28:45 INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x162fecd66aadacb
You can see that the _schemas topic configured in our schema-registry.properties file has been created.
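As an optional extra check, you can describe the topic; the schema registry keeps _schemas as a log-compacted topic, so cleanup.policy=compact should show up in the output:
/usr/bin/kafka-topics --describe --zookeeper <zookeeper server>:2181 --topic _schemas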
11. Next I turn off the compatibility requirement. This allows new schemas registered under an existing subject to be incompatible with previous versions, which can be useful if you deal with any kind of data drift (this step is optional, depending on your requirements):
curl -X PUT -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "NONE"}' \
http://<Schema Registry Server>:8081/config
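To confirm the change took effect, you can read the global config back; the registry should report the compatibility level you just set:
curl -X GET -i http://<Schema Registry Server>:8081/config
which should return something like:
{"compatibilityLevel":"NONE"}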
12. Now we can register a new Avro schema. If you are sourcing the data from a delimited flat file, you can enter the schema directly. For example, we have a flat file source that contains the following pipe-delimited columns:
lastName|firstName|middleINI|MRN|pcpID|planCode|planName|effDt|termDt|memberPlanID|planType|planDescription|empGrp|planNumber
which translates to an Avro schema like this:
{
  "type": "record",
  "name": "Sample_Extract",
  "doc": "sample flat file extract",
  "fields": [
    { "name": "lastName",        "type": ["null", "string"], "default": null, "columnName": "lastname",        "sqlType": "12" },
    { "name": "firstname",       "type": ["null", "string"], "default": null, "columnName": "firstname",       "sqlType": "12" },
    { "name": "middleini",       "type": ["null", "string"], "default": null, "columnName": "middleini",       "sqlType": "12" },
    { "name": "mrn",             "type": ["null", "string"], "default": null, "columnName": "mrn",             "sqlType": "12" },
    { "name": "pcpid",           "type": ["null", "string"], "default": null, "columnName": "pcpid",           "sqlType": "12" },
    { "name": "plancode",        "type": ["null", "string"], "default": null, "columnName": "plancode",        "sqlType": "12" },
    { "name": "planname",        "type": ["null", "string"], "default": null, "columnName": "planname",        "sqlType": "12" },
    { "name": "effdt",           "type": ["null", "string"], "default": null, "columnName": "effdt",           "sqlType": "12" },
    { "name": "termdt",          "type": ["null", "string"], "default": null, "columnName": "termdt",          "sqlType": "12" },
    { "name": "memberplanid",    "type": ["null", "string"], "default": null, "columnName": "memberplanid",    "sqlType": "12" },
    { "name": "plantype",        "type": ["null", "string"], "default": null, "columnName": "plantype",        "sqlType": "12" },
    { "name": "plandescription", "type": ["null", "string"], "default": null, "columnName": "plandescription", "sqlType": "12" },
    { "name": "empgrp",          "type": ["null", "string"], "default": null, "columnName": "empgrp",          "sqlType": "12" },
    { "name": "plannumber",      "type": ["null", "string"], "default": null, "columnName": "plannumber",      "sqlType": "12" }
  ],
  "tableName": "sample.extract"
}
To register this schema in the registry under the subject "sample_extract_schema", we can call the REST API with curl like this:
curl -X POST -i -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"schema": "{ \"type\":\"record\", \"name\":\"Sample_Extract\", \"doc\":\"sample flat file extract\", \"fields\": [ { \"name\" : \"lastName\", \"type\" : [\"null\", \"string\"], \"default\" : null, \"columnName\" : \"lastname\", \"sqlType\" : \"12\" }, { \"name\": \"firstname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"firstname\", \"sqlType\" : \"12\" }, { \"name\" :\"middleini\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"middleini\", \"sqlType\" : \"12\" }, { \"name\" : \"mrn\", \"type\" : [ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"mrn\", \"sqlType\" : \"12\" }, { \"name\" : \"pcpid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"pcpid\", \"sqlType\" : \"12\" }, { \"name\" : \"plancode\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plancode\", \"sqlType\" : \"12\" }, { \"name\" : \"planname\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"planname\", \"sqlType\" : \"12\" }, { \"name\" : \"effdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"effdt\", \"sqlType\" : \"12\" }, { \"name\" : \"termdt\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"termdt\", \"sqlType\" : \"12\" }, { \"name\" : \"memberplanid\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"memberplanid\", \"sqlType\" : \"12\" }, { \"name\" :\"plantype\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plantype\", \"sqlType\" : \"12\" }, { \"name\" : \"plandescription\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plandescription\", \"sqlType\" : \"12\" }, { \"name\" : \"empgrp\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"empgrp\", \"sqlType\" : \"12\" }, { \"name\" : \"plannumber\", \"type\" :[ \"null\", \"string\" ], \"default\" : null, \"columnName\" : \"plannumber\", \"sqlType\" : \"12\" } ], \"tableName\" : \"sample.extract\" } "}' \
http://<schema registry server>:8081/subjects/sample_extract_schema/versions
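If the registration succeeds, the registry responds with the ID it assigned to the schema, for example (the actual ID depends on what else has been registered):
{"id":1}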
13. To verify it was entered properly, we can use the REST API again:
curl -X GET -i http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest
which should return the latest version of the schema for that subject.
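The response is a JSON object containing the subject, the version, the registry-assigned schema ID, and the schema itself as an escaped string, along these lines (truncated here). This is the shape the SchemaObject class in the example below maps onto:
{"subject":"sample_extract_schema","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Sample_Extract\", ... }"}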
Example (retrieving the latest schema programmatically with Jackson and Jersey):
import java.io.IOException;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.ClientHandlerException;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.UniformInterfaceException;
import com.sun.jersey.api.client.WebResource;

public class SchemaRegistryTest {

    // Maps the JSON response from /subjects/<subject>/versions/latest
    public static class SchemaObject {
        public String subject;
        public int version;
        public int id;
        public String schema;
    }

    SchemaObject schemaObject;

    public void executeTest() throws JsonParseException, JsonMappingException,
            ClientHandlerException, UniformInterfaceException, IOException {
        Client client = Client.create();
        WebResource webResource = client
                .resource("http://<schema registry server>:8081/subjects/sample_extract_schema/versions/latest");

        // Request the latest schema version using the registry's JSON content type
        ClientResponse response = webResource
                .accept("application/vnd.schemaregistry.v1+json")
                .get(ClientResponse.class);

        if (response.getStatus() != 200) {
            throw new RuntimeException("Failed : HTTP error code : " + response.getStatus());
        }

        // Buffer the entity so it can be read as a String, then map it onto SchemaObject
        response.bufferEntity();
        ObjectMapper mapper = new ObjectMapper();
        schemaObject = mapper.readValue(response.getEntity(String.class), SchemaObject.class);

        // Print just the Avro schema string
        System.out.println(schemaObject.schema);
    }

    public static void main(String[] args) throws Exception {
        new SchemaRegistryTest().executeTest();
    }
}
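To compile and run the example, the Jackson Databind and Jersey 1.x client jars (and their dependencies) need to be on the classpath. The jar names below are placeholders; substitute whatever versions you have locally:
javac -cp "jersey-client.jar:jersey-core.jar:jackson-databind.jar:jackson-core.jar:jackson-annotations.jar" SchemaRegistryTest.java
java -cp ".:jersey-client.jar:jersey-core.jar:jackson-databind.jar:jackson-core.jar:jackson-annotations.jar" SchemaRegistryTest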
That's it! You now have a repository for your data pipeline schemas, backed by Kafka and the high availability of Zookeeper.