In this post I'll detail a way, with certain caveats, to create a single stored procedure that your SSIS packages can call to pull data from a staging table and perform SCD Type 2 transformations into a persistent staging table using a SQL MERGE statement. When doing a simple Google search for this I came upon a really good post about merge statements written by Dallas Snider. I thought I'd use his solution as a starting point to implement what I was trying to accomplish: a dynamic way to generate the statement by utilizing metadata from the persistent staging table.
To use the stored procedure there are a few caveats that need to be in place:
- The stage table and the persistent stage table need to be in different databases
- The stage table and the persistent stage table need to have the same name
- The stage table and the persistent stage table need to have the same fields, with same names, except for the control fields in the persistent staging table. These control fields include:
- EffectiveStartDT (The datetime the record was valid from, defaults to 1/1/1900)
- EffectiveExpireDT (The datetime the record was valid to, defaults to 1/1/4000)
- CurrentRowYN (A Y or N flag that determines whether or not the record is the current version)
- PSInsertDT (The datetime the record was inserted into the persistent staging table)
- Checksum (The binary checksum of the record, this will be compared against incoming data from staging to see if a record has changed)
- The persistent staging table must have a primary key on the combination of the natural key(s) (sometimes referred to as a business key) and the EffectiveStartDT.
- None of the fields can be a BLOB (ntext, text, image, etc.), since the binary_checksum function can't handle those data types. There is a work-around that I'll discuss later in this post.
- This is for daily changes. If you have multiple loads a day, with multiple changes in a single day, the record will simply be overwritten without keeping an SCD Type 2 version (essentially an SCD Type 1).
- Only one version of a record can exist in the staging table
- The staging database and the persistent staging database have to be on the same server
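Before setting up the example tables, it's worth seeing how the change detection works. The stored procedure compares BINARY_CHECKSUM values computed over the non-control columns; a quick illustration of the idea:

-- binary_checksum produces a different value when any input changes,
-- which is how a changed row is detected
select
binary_checksum('Help Desk', 'Nashville', 'Information Systems') as OldCheckSum,
binary_checksum('Help Desk', 'Cleveland', 'Information Systems') as NewCheckSum
-- The two values differ, so the incoming row would be treated as a change

With that in mind, let's create some example tables.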
use Source
go
create table Department
(
DepartmentID int,
DepartmentName varchar(25),
DepartmentLocation varchar(45),
DepartmentUnit varchar(40)
)
insert into Department
(DepartmentID, DepartmentName, DepartmentLocation, DepartmentUnit)
values
(1, 'Accounts Receivable', 'New York', 'Accounting'),
(2, 'Corporate Ethics', 'Nashville', 'Human Resources'),
(3, 'Help Desk', 'Nashville', 'Information Systems')
go
use Staging
go
create table Department
(
DepartmentID int,
DepartmentName varchar(25),
DepartmentLocation varchar(45),
DepartmentUnit varchar(40)
)
use PersistentStaging
go
create table Department
(
DepartmentID int not null,
DepartmentName varchar(25),
DepartmentLocation varchar(45),
DepartmentUnit varchar(40),
EffectiveStartDT datetime not null,
EffectiveExpireDT datetime,
CurrentRowYN char(1),
PSInsertDT datetime,
CheckSum varchar(35),
constraint pk_department primary key (DepartmentID, EffectiveStartDT)
)
go
Now our goal is to get department data from the source database, stage the data in the staging database, then load SCD type 2 changes into our persistent staging database. Our SSIS package to do this will look something like this:
Figure 1. Sequence Container to Persistently Stage Data
The package will start off by truncating the stage table, to prepare it for receiving fresh data from the source system:
Figure 2. Truncate Staging Table
Figure 3. Load Source Data into Staging
With the data now in staging we can call our stored procedure that will compare the data in the staging table to the data in the persistent staging table. If a record with the natural key doesn't exist yet, we simply insert a new record into the persistent staging table. If it does, and a change has happened, we expire the previous record and insert the new version of the record into persistent staging. This happens by calling a stored procedure called "PersistRecordFromStaging" and passing a parameter consisting of the table name:
Figure 4. Executing Stored Procedure to Persistently Stage Data
The stored procedure will dynamically generate this merge statement based on the metadata of the persistent staging table:
BEGIN TRY
BEGIN TRAN
-->Inserts new records, and records that have been updated as a SCD type 2
SET NOCOUNT ON;
SET XACT_ABORT ON
INSERT INTO
PersistentStaging.dbo.Department
(
DepartmentID,
DepartmentName,
DepartmentLocation,
DepartmentUnit,
EffectiveStartDT,
EffectiveExpireDT,
CurrentRowYN,
PSInsertDT,
CheckSum
)
SELECT
DepartmentID,
DepartmentName,
DepartmentLocation,
DepartmentUnit,
EffectiveStartDT,
EffectiveExpireDT,
CurrentRowYN,
PSInsertDT,
CheckSum
FROM
(
MERGE into
PersistentStaging.dbo.Department AS target
USING (
SELECT
DepartmentID,
DepartmentName,
DepartmentLocation,
DepartmentUnit
,binary_checksum(
convert(varchar, DepartmentID ),
DepartmentName,
DepartmentLocation,
DepartmentUnit)
as CheckSum
FROM Staging.dbo.Department with (nolock)) As source
(
DepartmentID,
DepartmentName,
DepartmentLocation,
DepartmentUnit,
CheckSum
)
ON
(
target.DepartmentID = source.DepartmentID AND 1=1)
-->If change occurs we deactivate the previous record
WHEN MATCHED
and target.[Checksum] <> source.[Checksum] and target.CurrentRowYN='Y' and target.[EffectiveStartDT]<>convert(date,sysdatetime())
THEN
update set [EffectiveExpireDT] =dateadd(ms,-3,dateadd(day,1,DATEADD(dd, DATEDIFF(dd,0,sysdatetime()), -1))),[CurrentRowYN]='N'
-->If change occurs on same day as previous change, we take the net change
WHEN MATCHED
and target.[Checksum] <> source.[Checksum] and target.CurrentRowYN='Y' and target.[EffectiveStartDT]=convert(date,sysdatetime())
THEN DELETE
-->If this record has never been inserted into this table, insert
WHEN NOT MATCHED THEN
INSERT
(
DepartmentID,
DepartmentName,
DepartmentLocation,
DepartmentUnit,
EffectiveStartDT,
EffectiveExpireDT,
CurrentRowYN,
PSInsertDT,
CheckSum
)
VALUES
(
source.DepartmentID,
source.DepartmentName,
source.DepartmentLocation,
source.DepartmentUnit
,'1/1/1900',
'1/1/4000',
'Y',
'2014-02-27 19:14:15',
source.[Checksum]
)
OUTPUT
source.DepartmentID,
source.DepartmentName,
source.DepartmentLocation,
source.DepartmentUnit
,convert(date,sysdatetime()) as [EffectiveStartDT],
'1/1/4000' as [EffectiveExpireDT],
'Y' as [CurrentRowYN],
'Feb 27 2014 7:14PM' as [PSInsertDT],
source.[Checksum],
$action as action
)AS
CHANGES
(
DepartmentID,
DepartmentName,
DepartmentLocation,
DepartmentUnit,
EffectiveStartDT,
EffectiveExpireDT,
CurrentRowYN,
PSInsertDT,
CheckSum
,action
)
-->If we have records that have been deleted or updated, we do an insert
where action='UPDATE' or action='DELETE';
SELECT @UpdateCount=@@ROWCOUNT;
SELECT @InsertCount = count(*) from PersistentStaging.dbo.Department
where '1/1/1900' = [EffectiveStartDT] and '1/1/4000' = [EffectiveExpireDT] and [PSInsertDT]='2014-02-27 19:14:15'
COMMIT TRAN
END TRY
BEGIN CATCH
IF XACT_STATE() = -1
ROLLBACK
RAISERROR (N'Error has occurred %s %d.', -- Message text.
10, -- Severity,
1, -- State,
N'number', -- First argument.
5); -- Second argument.
THROW
END CATCH;
The stored procedure also returns the number of updates (records that had an SCD Type 2 performed) and the number of inserts (brand new records never seen by the persistent staging table):
Figure 5. Execute SQL Task Result Set |
These can be stored in SSIS variables you create in the package:
Figure 6. SSIS Variables to Store Inserts/Updates |
After we persistently stage our data, we call another Execute SQL Task that will log our inserts and updates to an audit table. Let's run the package and see what the data in staging and persistent staging looks like:
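The audit table itself isn't covered in this post, but a minimal sketch of that logging step might look like this (the Audit database, table, and column names here are hypothetical, not from the actual package):

-- Hypothetical audit table for load logging
create table Audit.dbo.PackageLoadLog
(
LoadLogID int identity(1,1) primary key,
TableName varchar(100),
InsertCount int,
UpdateCount int,
LoadDT datetime default sysdatetime()
)
-- The Execute SQL Task maps the SSIS variables into the parameter markers
insert into Audit.dbo.PackageLoadLog (TableName, InsertCount, UpdateCount)
values (?, ?, ?)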
select * from Staging.dbo.department
select * from PersistentStaging.dbo.department
Figure 7. Data Preview
As you can see, the data came into staging and was loaded into persistent staging, with the effective start and expire dates, current-row flag, and insert datetime set, and the row checksum recorded. Now, let's fast forward a day and say that sometime between this data warehouse load and the last one the Help Desk department moved from Nashville to Cleveland (sorry, help desk people).
use Source
update department
set DepartmentLocation='Cleveland'
where DepartmentName='Help Desk'
go
Now, let's run the package again and see what happened to our data:
select * from Staging.dbo.department
select * from PersistentStaging.dbo.department
Figure 8. Data Change
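Conceptually, the Help Desk department now carries two versions in persistent staging, something like this (dates illustrative, assuming the second load ran on 2/28):

select DepartmentID, DepartmentLocation, EffectiveStartDT, EffectiveExpireDT, CurrentRowYN
from PersistentStaging.dbo.Department
where DepartmentName = 'Help Desk'
order by EffectiveStartDT
-- 3  Nashville  1900-01-01  2014-02-27 23:59:59.997  N
-- 3  Cleveland  2014-02-28  4000-01-01               Y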
From looking at the data in persistent staging you can see that the new record for the Help Desk department was entered and the previous record was expired to the end of the previous day. The code for this stored procedure and the user defined function it uses follows:
Use PersistentStaging
GO
Create function [dbo].[udfColumnsForBinaryChecksum] ( @TableName varchar(100))
returns varchar(8000)
as
-- =========================================================================
-- Author: Jim Ferris http://dennysjymbo.blogspot.com/
-- Create date: 2014-02-28
-- Description: returns a comma separated list of fields from a table for binary checksum input
-- Example: select dbo.udfColumnsForBinaryChecksum ('Department')
-- =========================================================================
begin
declare @returnValues varchar(8000)=' '
-- Build the column list, converting non-character columns to varchar so they
-- can be passed to binary_checksum (matching the convert(varchar, DepartmentID )
-- seen in the generated merge statement)
Select @returnValues = @returnValues + CHAR(10) +
case when DATA_TYPE not in ('char','varchar','nchar','nvarchar')
then 'convert(varchar, ' + COLUMN_NAME + ' )'
else COLUMN_NAME end + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
-- Trim the trailing comma
Set @returnValues=SUBSTRING( @returnValues,0,len(@returnValues))
return @returnValues;
end
GO
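For the Department table above, the function returns something like the following (whitespace aside), matching the checksum input in the generated merge statement:

select dbo.udfColumnsForBinaryChecksum('Department')
-- convert(varchar, DepartmentID ),
-- DepartmentName,
-- DepartmentLocation,
-- DepartmentUnit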
CREATE PROCEDURE [dbo].[PersistRecordFromStaging]
(
@TableName AS varchar(100))
AS
--> =========================================================================
--> Author: Jim Ferris http://dennysjymbo.blogspot.com/
--> Create date: 2014-02-28
--> Description: Persistently stages data from staging to persistent staging
--> Example: exec dbo.PersistRecordFromStaging 'Department'
--> =========================================================================
--> SET NOCOUNT ON added to prevent extra result sets from interfering with SELECT statements.
SET NOCOUNT ON;
-->Variable declarations
DECLARE @DatabaseNameStage AS varchar(100) ='Staging',
@DatabaseNamePstage AS varchar(100) ='PersistentStaging',
@StagingOwner AS varchar(25) = 'dbo',
@PersistentStagingOwner AS varchar(25) = 'dbo',
@LogTableName AS varchar(100),
@SQL_Statement AS nvarchar(max),
@StageTableName AS varchar(100),
@CurrentDateTime as datetime = sysdatetime(),
@Fields AS varchar(max)=' ',
@TotalFields as varchar(max) =' ',
@Join as varchar(max) =' ',
@Source as varchar(max) =' ',
@Output as varchar(max) =' ',
@Selection as varchar(max) =' ',
@Changes as varchar(max) =' ',
@UpdateCount int,
@InsertCount int
-->Initialize variables
SET @LogTableName = @DatabaseNamePstage + '.'+@PersistentStagingOwner+'.' + @TableName
SET @StageTableName = @DatabaseNameStage + '.'+@StagingOwner+'.' + @TableName
BEGIN TRY
BEGIN TRAN
SET XACT_ABORT ON
-->Returns a string of all fields in the table
SELECT @TotalFields=@TotalFields + CHAR(10) + ' ' + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME
NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'Checksum'
)
SET @TotalFields= @TotalFields +CHAR(10)+ 'EffectiveStartDT,'+ CHAR(10)+'EffectiveExpireDT,'+ CHAR(10)+'CurrentRowYN,'+ CHAR(10)+'PSInsertDT,'+ CHAR(10)+'CheckSum'
-->Returns a string of all fields minus the control fields in a table
SELECT @Fields = @Fields +CHAR(10) + ' ' + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME
NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'Checksum'
)
SET @Fields= @Fields + CHAR(10)+'Checksum'
-->Returns all the fields in the table minus the control fields, adding the source roleplaying identifier
SELECT @Source = @Source +CHAR(10) + ' source.' + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME
NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @Source=SUBSTRING( @Source,0,len(@Source)) + CHAR(10) +',''1/1/1900'','+ CHAR(10) +'''1/1/4000'','+ CHAR(10) +'''Y'','+ CHAR(10) +''''+ convert(varchar,@CurrentDateTime,20)+''','+ CHAR(10) +'source.[Checksum]'+ CHAR(10)
-->Returns all the fields in a table, minus the control fields, with the addition of the binary checksum
SELECT @Selection = @Selection +CHAR(10) + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME
NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @Selection=SUBSTRING( @Selection,0,len(@Selection)) + CHAR(10) + ',binary_checksum('+dbo.udfColumnsForBinaryChecksum(@TableName)+') as CheckSum'
-->Returns all the fields in a table, minus the control fields, with the addition of the values needed for an insert
SELECT @Output = @Output +CHAR(10) + ' source.' + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME
NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @Output=SUBSTRING( @Output,0,len(@Output)) + CHAR(10) +',convert(date,sysdatetime()) as [EffectiveStartDT],'+ CHAR(10) +'''1/1/4000'' as [EffectiveExpireDT],'+ CHAR(10) +'''Y'' as [CurrentRowYN],'+ CHAR(10) +''''+ convert(varchar,@CurrentDateTime)+''''+ ' as [PSInsertDT],'+ CHAR(10) +'source.[Checksum],'+ CHAR(10) +'$action as action' + CHAR(10)
-->Returns the changes output
SELECT @Changes = @Changes +CHAR(10) + COLUMN_NAME + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME
NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
SET @Changes= @Changes +CHAR(10)+ 'EffectiveStartDT,'+ CHAR(10)+'EffectiveExpireDT,'+ CHAR(10)+'CurrentRowYN,'+ CHAR(10)+'PSInsertDT,'+ CHAR(10)+'CheckSum'
-->Returns the join statement for the join between the staging and the persistent staging tables
SELECT @Join =@Join + CHAR(10) + ' target.' + ccu.COLUMN_NAME + ' = source.' + ccu.COLUMN_NAME + ' AND'
FROM
INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu ON tc.CONSTRAINT_NAME = ccu.Constraint_name
JOIN INFORMATION_SCHEMA.COLUMNS c ON ccu.TABLE_NAME = c.TABLE_NAME AND ccu.COLUMN_NAME = c.COLUMN_NAME
WHERE
tc.CONSTRAINT_TYPE = 'Primary Key' and
ccu.COLUMN_NAME <> 'EffectiveStartDT'
and ccu.TABLE_NAME = @TableName
SET @Join =@Join + ' 1=1)'
-->Begin generating merge statement
Select @SQL_Statement=convert(nvarchar(max), N'')
--+'Declare @UpdateCount int=0, @InsertCount int=0'+ CHAR(10)
+ '-->Inserts new records, and records that have been updated as a SCD type 2'
+ CHAR(10) + 'SET NOCOUNT ON;'
+ CHAR(10) + 'SET XACT_ABORT ON '
+ CHAR(10) + 'INSERT INTO'
+ CHAR(10) + ' ' + @LogTableName
+ CHAR(10) + '('
+ CHAR(10) + @TotalFields +')'
+ CHAR(10) + 'SELECT '
+ CHAR(10) + @TotalFields
+ CHAR(10) + 'FROM'
+ CHAR(10) + '('
+ CHAR(10) + 'MERGE into '+@LogTableName+' AS target'
+ CHAR(10) + ' USING ('
+ CHAR(10) + 'SELECT'
+ CHAR(10) + @Selection + CHAR(10)
+ CHAR(10) +' FROM '+@StageTableName+' with (nolock)) As source'
+ CHAR(10) + '('
+ CHAR(10) +@Fields
+ CHAR(10) + ')'
+ CHAR(10) + ' ON'
+ CHAR(10) + '('
+ CHAR(10) + @Join
+ CHAR(10) + '-->If change occurs we deactivate the previous record'
+ CHAR(10) + 'WHEN MATCHED and target.[Checksum] <> source.[Checksum] and target.CurrentRowYN=''Y'' and target.[EffectiveStartDT]<>convert(date,sysdatetime())'
+ CHAR(10) + 'THEN'
+ CHAR(10) + 'update set [EffectiveExpireDT] =dateadd(ms,-3,dateadd(day,1,DATEADD(dd, DATEDIFF(dd,0,sysdatetime()), -1))),[CurrentRowYN]=''N'''
+ CHAR(10) +'-->If change occurs on same day as previous change, we take the net change'
+ CHAR(10) + 'WHEN MATCHED and target.[Checksum] <> source.[Checksum] and target.CurrentRowYN=''Y'' and target.[EffectiveStartDT]=convert(date,sysdatetime())'
+ CHAR(10) + 'THEN DELETE '
+ CHAR(10) + '-->If this record has never been inserted into this table, insert'
+ CHAR(10) + 'WHEN NOT MATCHED THEN '
+ CHAR(10) + 'INSERT'
+ CHAR(10) + '('
+ CHAR(10) + @TotalFields +')'
+ CHAR(10) + 'VALUES'
+ CHAR(10) + '('
+ CHAR(10) + @Source
+ CHAR(10) + ')'
+ CHAR(10) + 'OUTPUT'
+ CHAR(10) + @Output
+ CHAR(10) +')'
+'AS CHANGES'
+ CHAR(10) +'('+ CHAR(10)
+ @Changes
+ ',action'+ CHAR(10)
+')'+ CHAR(10)
+'-->If we have records that have been deleted or updated, we do an insert'+ CHAR(10)
+'where action=''UPDATE'' or action=''DELETE'';'+ CHAR(10)
+ 'SELECT @UpdateCount=@@ROWCOUNT;'
+ CHAR(10) + 'SELECT @InsertCount = count(*) from ' + @LogTableName+' where ''1/1/1900'' = [EffectiveStartDT] and ''1/1/4000'' = [EffectiveExpireDT] and [PSInsertDT]='+''''+convert(varchar,@CurrentDateTime,20)+''''
exec sp_executesql @SQL_Statement,N'@UpdateCount int OUTPUT, @InsertCount int OUTPUT', @UpdateCount OUTPUT,@InsertCount OUTPUT;
SELECT @UpdateCount as UpdateCount, @InsertCount as InsertCount
--print @sql_statement
COMMIT TRAN
END TRY
BEGIN CATCH
IF XACT_STATE() = -1
ROLLBACK
RAISERROR (N'Error has occurred %s %d.', -- Message text.
10, -- Severity,
1, -- State,
N'number', -- First argument.
5); -- Second argument.
THROW
END CATCH;
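Calling the procedure for our example table is then a one-liner, which is exactly what the Execute SQL Task in Figure 4 runs:

exec dbo.PersistRecordFromStaging 'Department'
-- Returns a one-row result set with UpdateCount and InsertCount,
-- which the SSIS package captures into its variables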
This stored procedure will dynamically generate the merge statement for any persistent staging table as long as you follow the rules stated at the beginning of this post, which should speed up your dev time on these kinds of loads tremendously. To get around the BLOB issue, you can change the user defined function to something like this:
USE PersistentStaging
go
ALTER function [dbo].[udfColumnsForBinaryChecksum] ( @TableName varchar(100))
returns varchar(8000)
as
begin
declare @returnValues varchar(8000)=' '
Select @returnValues = @returnValues +CHAR(10) + case when character_maximum_length=-1 then 'convert(varchar(4000),left('+ COLUMN_NAME +',4000)) ' else COLUMN_NAME end + ','
FROM
INFORMATION_SCHEMA.COLUMNS
WHERE
TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
Set @returnValues=SUBSTRING( @returnValues,0,len(@returnValues))
return @returnValues;
end
Now this will only check the first 4000 characters of the text. You could probably increase its likelihood of catching a change in the BLOB by concatenating it with a right(4000) too. It all depends on your situation and how big the BLOB is. I wrote another post on how to generate a checksum on BLOBs such as this. You could probably alter the dynamic SQL to incorporate that and pull the checksum from the staging table rather than have it generated on the fly in SQL.
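For example, the CASE expression in the function above could be extended to hash both ends of a max-length column (a sketch, not tested against every data type):

case when character_maximum_length=-1
then 'convert(varchar(4000),left('+ COLUMN_NAME +',4000)) '
   + '+ convert(varchar(4000),right('+ COLUMN_NAME +',4000)) '
else COLUMN_NAME end + ','

This way a change in either the first or the last 4000 characters of the value will alter the checksum.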