In Streamsets Data Collector they have a Kudu Destination that takes advantage of the Java API to load data. This is tightly coupled with the CDC feature they have on their JDBC Origin. The way this works is that the CDC operations like:
1 for INSERT
2 for DELETE
3 for UPDATE
4 for UPSERT
are assigned to the Data Collector header variable sdc.operation.type which in turn tells the Kudu destination what to do with the data. Unfortunately for some of us change tracking users the operation codes come in as I, D or U. So in order for us to take advantage of Kudu, we're going to have to translate these change tracking operations and get that assigned to the header variable.
Before we dive into how we do this I wanted to list a couple of gotchas I ran into while using Data Collector with Kudu. Unlike a Hive destination that has the Hive Metadata Processor, the Kudu destination has no equivalent and will not create the table for you. You will have to create the table in advance of kicking off the pipeline or it will fail. Kudu will not accept your datetime data types from SQL Server, you will have to get the Unix epoch representation and store them in a bigint. I will demonstrate how to do this. Make sure the Max Batch Size (Records) on the JDBC origin does not exceed the Mutation Buffer Space (records) on the Kudu Destination or the pipeline will fail.
Ok, for this example we will use a pipeline that looks like this:
Figure 1. JDBC to Kudu Pipeline |
For the JDBC Origin make sure that:
-The Incremental Mode checkbox is checked
-Initial Offset is set to 0
-The Offset Column is set to sys_change_version
The source query is going to pull from this table:
Figure 2. Source System Table |
using this query:
DECLARE @offset INT =${offset}
DECLARE @unixepoch2 datetime2 = '1970-01-01 00:00:00.0000'
IF (@offset =0 OR @offset IS NULL)
SELECT
sct.abn_note_id AS abn_note_id,
sct.line AS line,
sct.cm_log_owner_id AS cm_log_owner_id,
sct.cm_phy_owner_id AS cm_phy_owner_id,
sct.order_id AS order_id,
((cast (datediff(day,@unixepoch2,sct.abn_check_from_date) AS float) * 86400) + (cast(datediff(millisecond,dateadd(day,datediff(day,@unixepoch2,sct.abn_check_from_date),@unixepoch2),sct.abn_check_from_date) AS float ) / 1000))*1000 AS abn_check_from_date,
c.sys_change_version AS sys_change_version,
'I' AS sys_change_operation
FROM
dbo.abn_orders sct
CROSS APPLY (
SELECT
coalesce(max(sys_change_version), 0) AS sys_change_version
FROM
changetable(changes dbo.abn_orders, 0) c) AS c
ELSE
SELECT
ct.abn_note_id AS abn_note_id,
ct.line AS line,
sct.cm_log_owner_id AS cm_log_owner_id, sct.cm_phy_owner_id AS cm_phy_owner_id,
sct.order_id AS order_id,
((cast (datediff(day,@unixepoch2,sct.abn_check_from_date) AS float) * 86400) + (cast(datediff(millisecond,dateadd(day,datediff(day,@unixepoch2,sct.abn_check_from_date),@unixepoch2),sct.abn_check_from_date) AS float ) / 1000))*1000 AS abn_check_from_date,
sys_change_version AS sys_change_version,
sys_change_operation AS sys_change_operation
FROM
dbo.abn_orders AS sct
RIGHT OUTER JOIN changetable(changes dbo.abn_orders, ${offset}) AS ct
ON ct.abn_note_id = sct.abn_note_id AND ct.line = sct.line
WHERE
sys_change_version > ${offset}
ORDER BY
sys_change_version
In this query you can see we are using an if - else statement. The first part of the if is used when the offset is 0, thus triggering an initial load from the table and grabbing the latest offset to store in Data Collector. The reason we have to do this is to protect ourselves if we are loading this data from outside the change tracking retention window, or change tracking was not enabled from the table's inception. After the else is used for delta loading. This will pull data from the table since the last recorded offset. Pro tip...if you are using incremental pipelines make sure to periodically backup /var/lib/sdc/data on your Data Collector server in order to save the offset states of your pipelines. Can definitely come in handy when disaster strikes.
You will also notice the functions we apply to the abn_check_from_date field. This is to convert the SQL Server datetime to a unix epoch with millisecond precision that we can store in a bigint column over in the Kudu destination table. We can later throw a view on top of this value to display it as a timestamp in Hive/Impala.
Now that we took care of the JDBC origin, we need to take care of setting the header variable in order to tell Kudu what to do with our record(s) coming through the pipeline. We can accomplish this using a Javascript Evaluator processor. In the script textarea put:
//Loop through columns
for (var i = 0; i < records.length; i++) {
try {
//We either delete
if (records[i].value.sys_change_operation == "D") {
records[i].attributes['sdc.operation.type'] = '2'
}
//Or we upsert
else {
records[i].attributes['sdc.operation.type'] = '4'
}
// Write record to processor output
output.write(records[i]);
} catch (e) {
// Send record to error
error.write(records[i], e);
}
}
You can see that if the operation is "D" we set the header variable to 2, else we set it to 4 for an upsert. I did that for simplicity sake, you can change this to add cases specifically for inserts and updates if need be.
That's it! You now have an incremental pipeline that takes advantage of SQL Server Change Tracking and Kudu. The devs over at Streamsets have promised that they will support Change Tracking natively in a future release, but for now this solution will get you by.
No comments:
Post a Comment