In this post I'm going to build on my earlier persistent staging post and apply those concepts to mark persistent staging records as inactive when they are deleted in the source system. A few caveats to make this work:
- The staging load has to be a full load every time. This way we can look at what we have in persistent staging, compare to what is in staging and determine what is missing. You cannot have multiple versions of the same record in staging at the same time.
- The stage table and the persistent stage table need to have the same name
- The stage table and the persistent stage table need to have the same fields, with the same names, except for the control fields in the persistent staging table. These control fields include:
  - EffectiveStartDT (The datetime the record was valid from, defaults to 1/1/1900)
  - EffectiveExpireDT (The datetime the record was valid to, defaults to 1/1/4000)
  - CurrentRowYN (A Y or N flag that determines whether or not the record is the current version)
  - PSInsertDT (The datetime the record was inserted into the persistent staging table)
  - Checksum (The binary checksum of the record; this is compared against incoming data from staging to see if a record has changed)
- The persistent staging table must have a primary key on the combination of the natural key(s) (sometimes referred to as a business key) and the EffectiveStartDT.
- None of the fields can be a BLOB type such as ntext, text or image, since the binary_checksum function can't handle those data types.
- This is for daily changes. If you have multiple loads a day, with multiple changes to a record in a single day, the current row will simply be overwritten instead of getting a new SCD Type 2 version (essentially SCD Type 1 behavior). So if a record gets deleted and re-added in the same day, the row carrying the inactive date will be overwritten.
- The staging database and the persistent staging database have to be on the same server
- Both the persistent staging table and the staging table need to have a SourceInactiveDT field.
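To make these caveats concrete, here is a rough sketch of what a persistent staging table that satisfies them might look like. The DepartmentID key column, the data types and the constraint name are assumptions for illustration; only DepartmentName and the control field names come from this post. SourceInactiveDT is left out because it gets added with ALTER TABLE further down.

```sql
-- Hypothetical shape of the persistent staging table.
-- DepartmentID, the data types and PK_Department are assumed, not taken from the post.
CREATE TABLE [PersistentStaging].[dbo].[Department]
(
    DepartmentID      int          NOT NULL,                      -- assumed natural (business) key
    DepartmentName    varchar(100) NULL,
    EffectiveStartDT  datetime     NOT NULL DEFAULT ('19000101'), -- valid from
    EffectiveExpireDT datetime     NOT NULL DEFAULT ('40000101'), -- valid to
    CurrentRowYN      char(1)      NOT NULL DEFAULT ('Y'),        -- current version flag
    PSInsertDT        datetime     NOT NULL DEFAULT (getdate()),  -- load timestamp
    [Checksum]        int          NOT NULL,                      -- binary_checksum of the record
    -- Natural key plus EffectiveStartDT, as required above
    CONSTRAINT PK_Department PRIMARY KEY (DepartmentID, EffectiveStartDT)
);
```

The staging table would carry the same non-control columns (here DepartmentID and DepartmentName) under the same table name in the Staging database.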
To dynamically generate the merge statement, we're going to create a stored procedure that reads the metadata of the table and auto-generates the code for us. We can include this in the SSIS package we created for loading the persistent staging table:
Figure 1. Data Flow Task
We call the stored procedure and pass the name of the table we are staging as a parameter. In this example, the table is Department:
Figure 2. Execute SQL Task
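For reference, the statement behind the Execute SQL Task boils down to a call like the one below. The table name is passed as a literal here; in the package it could just as well be mapped from an SSIS variable, which is an assumption on my part since the task settings only appear in the figure.

```sql
-- Run against the PersistentStaging database once the staging load has finished.
-- The procedure returns a single-row result set with a DeleteCount column.
EXEC [dbo].[InactivateRecordFromStaging] @TableName = 'Department';
```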
Now let's prep the data for the inactivation. If we use the data from the last post, our data set looks like this:
select * from [Source].[dbo].[Department]
Figure 3. Source System Data
Figure 4. Persistent Staging Data
To simulate a delete in the source system, we remove the Help Desk department:
delete from [Source].[dbo].[Department] where DepartmentName='Help Desk'
We also need to make sure we add the SourceInactiveDT to both tables:
alter table [Staging].[dbo].[Department]
add SourceInactiveDT datetime
go
alter table [PersistentStaging].[dbo].[Department]
add SourceInactiveDT datetime
go
Now let's run the package and see what happens in the persistent staging table:
Here we can see that the Help Desk record received a SourceInactiveDT when the stored procedure detected it wasn't present in the staging table.
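If you want to check this yourself, a query along these lines shows the history for the deleted record. It only uses columns named in this post.

```sql
-- Inspect every version of the deleted department in persistent staging
SELECT DepartmentName,
       SourceInactiveDT,
       EffectiveStartDT,
       EffectiveExpireDT,
       CurrentRowYN
FROM [PersistentStaging].[dbo].[Department]
WHERE DepartmentName = 'Help Desk'
ORDER BY EffectiveStartDT;
```

Assuming the previous version was loaded on an earlier day, you would expect that version to be expired with CurrentRowYN = 'N', followed by a new current row carrying today's date in SourceInactiveDT. If the previous version was loaded the same day, only the new row should remain.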
Here is the code for the stored procedure to inactivate the record:
Use PersistentStaging
GO
IF OBJECT_ID('dbo.udfColumnsForBinaryChecksum') IS NOT NULL
begin
drop function [dbo].[udfColumnsForBinaryChecksum]
end
go
Create function [dbo].[udfColumnsForBinaryChecksum]
(@TableName varchar(100))
returns varchar(8000)
as
-- =========================================================================
-- Author: Jim Ferris http://dennysjymbo.blogspot.com/
-- Create date: 2014-02-28
-- Description: returns a comma separated list of fields from a
-- table for binary checksum input
-- Example: select dbo.udfColumnsForBinaryChecksum ('Department')
-- =========================================================================
begin
declare @returnValues varchar(8000)=' '
-->Append each non-control column name followed by a comma
Select @returnValues = @returnValues + COLUMN_NAME + ','
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum'
)
-->Trim the trailing comma
Set @returnValues=SUBSTRING( @returnValues,0,len(@returnValues))
return @returnValues;
end
go
CREATE PROCEDURE [dbo].[InactivateRecordFromStaging]
(@TableName AS varchar(100))
AS
--> =========================================================================
--> Author: Jim Ferris http://dennysjymbo.blogspot.com/
--> Create date: 2014-02-27
--> Description: Checks records in staging, compares to persistent staging, then
--> sets the inactive date for records in persistent staging
--> that don't exist in staging. Used only for staging that
--> involves FULL loads and doesn't get a SourceInactiveDT
--> from the source system
--> Example: exec dbo.InactivateRecordFromStaging 'Department'
--> =========================================================================
-->SET NOCOUNT ON added to prevent extra result sets from interfering with SELECT statements.
SET NOCOUNT ON;
-->Variable declarations
DECLARE
@DatabaseNameStage AS varchar(100) ='Staging',
@DatabaseNamePstage AS varchar(100) ='PersistentStaging',
@StagingOwner AS varchar(25) = 'dbo',
@PersistentStagingOwner AS varchar(25) = 'dbo',
@LogTableName AS varchar(100),
@SQL_Statement AS nvarchar(max),
@StageTableName AS varchar(100),
@CurrentDateTime as datetime = sysdatetime(),
@Fields AS varchar(max)=' ',
@TotalFields as varchar(max) =' ',
@Join as varchar(max) =' ',
@Source as varchar(max) =' ',
@Output as varchar(max) =' ' ,
@BOutput as varchar(max) =' ' ,
@Selection as varchar(max) =' ',
@UpdateCount int,
@Primarykey varchar(250)=''
-->Initialize variables
SET @LogTableName = @DatabaseNamePstage + '.'+@PersistentStagingOwner+'.' + @TableName
SET @StageTableName = @DatabaseNameStage + '.'+@StagingOwner+'.' + @TableName
BEGIN TRY
BEGIN TRAN
SET XACT_ABORT ON
-->Returns a string of all fields in the table, with the SourceInactiveDT
-->and the control fields appended at the end in a fixed order
SELECT @TotalFields=@TotalFields + CHAR(10) + ' ' + COLUMN_NAME + ','
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT' ,
'Checksum',
'SourceInactiveDT'
)
SET @TotalFields= @TotalFields + CHAR(10)+'SourceInactiveDT,'+ CHAR(10)+ 'EffectiveStartDT,'+ CHAR(10)+'EffectiveExpireDT,'+ CHAR(10)+'CurrentRowYN,'+ CHAR(10)+'PSInsertDT,'+ CHAR(10)+'CheckSum'
-->Returns a string of all fields minus the control fields in a table, minus the SourceInactiveDT
SELECT @Fields = @Fields +CHAR(10) + ' ' + COLUMN_NAME + ','
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT' ,
'SourceInactiveDT'
)
SET @Fields=SUBSTRING( @Fields,0,len(@Fields)) + CHAR(10)
-->Returns all the fields in the table minus the control fields, prefixed with the source alias
SELECT @Source = @Source +CHAR(10) + ' source.' + COLUMN_NAME + ','
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum' ,
'SourceInactiveDT'
)
SET @Source=SUBSTRING( @Source,0,len(@Source))+ CHAR(10) +',SourceInactiveDT,' + CHAR(10) +'''1/1/1900'','+ CHAR(10) +'''1/1/4000'','+ CHAR(10) +'''Y'','+ CHAR(10) +''''+ convert(varchar,@CurrentDateTime,20)+''','+ CHAR(10) +'source.[Checksum]'+ CHAR(10)
-->Returns all fields for output binary checksum
SELECT @BOutput = @BOutput +CHAR(10) + 'isnull( inserted.' + COLUMN_NAME + ',deleted.'+COLUMN_NAME +') ,'
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum' ,
'SourceInactiveDT'
)
SET @BOutput=SUBSTRING( @BOutput,0,len(@BOutput)) + CHAR(10)
-->Returns all the fields in a table, minus the control fields, with the addition of the values needed for an insert
SELECT @Output = @Output +CHAR(10) + 'isnull( inserted.' + COLUMN_NAME + ',deleted.'+COLUMN_NAME +') as '+COLUMN_NAME+' ,'
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @TableName
AND COLUMN_NAME NOT IN
(
'EffectiveStartDT',
'EffectiveExpireDT',
'CurrentRowYN',
'PSInsertDT',
'CheckSum' ,
'SourceInactiveDT'
)
SET @Output=SUBSTRING( @Output,0,len(@Output)) + CHAR(10) +',convert(date,sysdatetime()) as SourceInactiveDT,'+ CHAR(10) +'convert(date,sysdatetime()) as [EffectiveStartDT],'+ CHAR(10) +'''1/1/4000'' as [EffectiveExpireDT],'+ CHAR(10) +'''Y'' as [CurrentRowYN],'+ CHAR(10) +''''+ convert(varchar,@CurrentDateTime)+''''+ ' as [PSInsertDT],'+ CHAR(10) +'binary_checksum('+SUBSTRING( @BOutput,0,len(@BOutput))+ CHAR(10) +','''+convert(varchar,@CurrentDateTime)+''') as CheckSum'+ CHAR(10)
-->Returns the join statement for the join between the staging and the persistent staging tables
SELECT @Join =@Join + CHAR(10) + ' target.' + ccu.COLUMN_NAME + ' = source.' + ccu.COLUMN_NAME + ' AND'
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu ON tc.CONSTRAINT_NAME = ccu.Constraint_name
JOIN INFORMATION_SCHEMA.COLUMNS c ON ccu.TABLE_NAME = c.TABLE_NAME AND ccu.COLUMN_NAME = c.COLUMN_NAME
WHERE tc.CONSTRAINT_TYPE = 'Primary Key' and ccu.COLUMN_NAME <> 'EffectiveStartDT'
and ccu.TABLE_NAME = @TableName
SET @Join =@Join + ' 1=1'
-->Returns the primary key of the persistent staging database table separated by commas
SELECT @Primarykey =@Primarykey +ccu.COLUMN_NAME + ','
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS tc
JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE ccu ON tc.CONSTRAINT_NAME = ccu.Constraint_name
JOIN INFORMATION_SCHEMA.COLUMNS c ON ccu.TABLE_NAME = c.TABLE_NAME AND ccu.COLUMN_NAME = c.COLUMN_NAME
WHERE tc.CONSTRAINT_TYPE = 'Primary Key' and ccu.COLUMN_NAME <> 'EffectiveStartDT'
and ccu.TABLE_NAME = @TableName
SET @Primarykey=SUBSTRING( @Primarykey,0,len(@Primarykey)) + CHAR(10)
-->Begin generating merge statement
Select @SQL_Statement=convert(nvarchar(max), N'')
+ CHAR(10) + '-->Inserts an inactive record SCD type 2'
+ CHAR(10) + 'SET NOCOUNT ON;'
+ CHAR(10) + 'SET XACT_ABORT ON '
+ CHAR(10) + 'Declare @counter int=0'
+ CHAR(10) + 'select @counter=count(*) from ' +@StageTableName
+ CHAR(10) + '-->If we have records in staging we compare persistent staging to staging on natural keys'
+ CHAR(10) + 'if @counter>0'
+ CHAR(10) + 'INSERT INTO'
+ CHAR(10) + ' ' + @LogTableName
+ CHAR(10) + '('
+ CHAR(10) + @TotalFields +')'
+ CHAR(10) + 'SELECT '
+ CHAR(10) + @TotalFields
+ CHAR(10) + 'FROM'
+ CHAR(10) + '('
+ CHAR(10) + 'MERGE '+@LogTableName+' AS target'
+ CHAR(10) + ' USING ('
+ CHAR(10) + 'SELECT'
+ CHAR(10) + @Primarykey + CHAR(10)
+ CHAR(10) +' FROM '+@StageTableName+' with (nolock)) As source'
+ CHAR(10) + '('
+ CHAR(10) + @Primarykey
+ CHAR(10) + ')'
+ CHAR(10) + ' ON'
+ CHAR(10) + '('
+ CHAR(10) + @Join +')'
+ CHAR(10) + '-->If delete in source occurs we deactivate the previous record'
+ CHAR(10) + 'WHEN NOT MATCHED BY SOURCE and target.CurrentRowYN=''Y'' and target.SourceInactiveDT is null and [EffectiveStartDT] <> convert(date,sysdatetime()) THEN'
+ CHAR(10) + 'update set [EffectiveExpireDT] =dateadd(ms,-3,dateadd(day,1,DATEADD(dd, DATEDIFF(dd,0,sysdatetime()), -1))),[CurrentRowYN]=''N'''
+ CHAR(10) + '-->If delete occurs and its on the same day as previous non-delete change, we update current record'
+ CHAR(10) + 'WHEN NOT MATCHED BY SOURCE and target.CurrentRowYN=''Y'' and target.SourceInactiveDT is null and [EffectiveStartDT]=convert(date,sysdatetime()) THEN'
+ CHAR(10) + 'delete'
+ CHAR(10) + '-->Output updated records'
+ CHAR(10) + 'output '+@Output
+ CHAR(10) + ') as data ( '
+ CHAR(10) + @TotalFields
+ CHAR(10) + ');'
+ CHAR(10) + 'SELECT @UpdateCount=@@ROWCOUNT;'
-->Run dynamic SQL and return the number of updated/inserted records
EXEC sp_executesql @SQL_Statement,N'@UpdateCount int OUTPUT', @UpdateCount OUTPUT;
SELECT @UpdateCount as DeleteCount
--print @sql_statement
COMMIT TRAN
END TRY
BEGIN CATCH
-->Roll back any transaction that is still open, committable or not
IF XACT_STATE() <> 0
ROLLBACK
--SELECT error_number() As ErrorNumber,
--error_line() As ErrorLine,
--convert(varchar(12), error_line()) + ', with error number ' + convert(varchar(12), error_number()) + ': ' + ERROR_MESSAGE()
-->Re-raise the original error to the caller
THROW;
END CATCH;
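To get a feel for what the procedure produces, here is roughly what the generated statement would look like for the Department table, tidied up for readability. This is a sketch under assumptions: it supposes the table has just two non-control columns, a DepartmentID natural key (hypothetical) and DepartmentName, and the timestamp literal is only an illustrative stand-in for the load time the procedure embeds.

```sql
-- @UpdateCount is supplied as an OUTPUT parameter by sp_executesql inside the procedure;
-- it is declared here only so the sketch can run on its own.
DECLARE @UpdateCount int;
DECLARE @counter int = 0;
SELECT @counter = count(*) FROM Staging.dbo.Department;
-- Only compare when staging actually holds a full load
IF @counter > 0
INSERT INTO PersistentStaging.dbo.Department
    (DepartmentID, DepartmentName, SourceInactiveDT, EffectiveStartDT,
     EffectiveExpireDT, CurrentRowYN, PSInsertDT, CheckSum)
SELECT DepartmentID, DepartmentName, SourceInactiveDT, EffectiveStartDT,
       EffectiveExpireDT, CurrentRowYN, PSInsertDT, CheckSum
FROM
(
    MERGE PersistentStaging.dbo.Department AS target
    USING (SELECT DepartmentID FROM Staging.dbo.Department WITH (NOLOCK)) AS source (DepartmentID)
    ON (target.DepartmentID = source.DepartmentID AND 1=1)
    -- Missing from staging, current version loaded on an earlier day: expire it
    WHEN NOT MATCHED BY SOURCE AND target.CurrentRowYN = 'Y'
         AND target.SourceInactiveDT IS NULL
         AND [EffectiveStartDT] <> convert(date, sysdatetime()) THEN
        UPDATE SET [EffectiveExpireDT] = dateadd(ms, -3, dateadd(day, 1, DATEADD(dd, DATEDIFF(dd, 0, sysdatetime()), -1))),
                   [CurrentRowYN] = 'N'
    -- Missing from staging, current version loaded today: replace it
    WHEN NOT MATCHED BY SOURCE AND target.CurrentRowYN = 'Y'
         AND target.SourceInactiveDT IS NULL
         AND [EffectiveStartDT] = convert(date, sysdatetime()) THEN
        DELETE
    -- Each affected row comes back out as the new inactive version
    OUTPUT isnull(inserted.DepartmentID, deleted.DepartmentID) AS DepartmentID,
           isnull(inserted.DepartmentName, deleted.DepartmentName) AS DepartmentName,
           convert(date, sysdatetime()) AS SourceInactiveDT,
           convert(date, sysdatetime()) AS [EffectiveStartDT],
           '1/1/4000' AS [EffectiveExpireDT],
           'Y' AS [CurrentRowYN],
           'Feb 28 2014  9:00AM' AS [PSInsertDT],  -- illustrative timestamp
           binary_checksum(isnull(inserted.DepartmentID, deleted.DepartmentID),
                           isnull(inserted.DepartmentName, deleted.DepartmentName),
                           'Feb 28 2014  9:00AM') AS CheckSum
) AS data (DepartmentID, DepartmentName, SourceInactiveDT, EffectiveStartDT,
           EffectiveExpireDT, CurrentRowYN, PSInsertDT, CheckSum);
SELECT @UpdateCount = @@ROWCOUNT;
```

The outer INSERT consumes the OUTPUT of the nested MERGE, so expiring the old version and inserting the new inactive version happen in a single statement. To see the exact text for your own table, uncomment the print statement in the procedure.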