This solution combines a script component and a stored procedure to handle inferred member generation. I want to thank my good buddy Ron for his help on this one ;). For this example we're going to be loading purchase transactions from a source system with a schema that looks like this:
Figure 1. Source System Schema |
Now in our data warehouse we can represent this source schema in a star schema like this:
Figure 2. Dimensional Schema |
-->Staging Data
create table Purchase
(
PurchaseID int not null,
PurchaseDate datetime,
PersonID int,
DepartmentID int,
DepartmentLocID int,
ProductID int,
ProductTypeID int,
ProductColor varchar(10),
PurchaseValue money,
constraint pk_Purchase primary key (PurchaseID)
)
go
insert into Purchase(PurchaseID, PurchaseDate, PersonID, DepartmentID, DepartmentLocID, ProductID, ProductTypeID, ProductColor, PurchaseValue)
values (1, '1/1/2014', 2, 1, 1, 1, 2, 'Green', 22.00),
(2, '1/2/2014', 1, 2, 1, 4, 4, 'Black', 23.00), -->Product Inferred Member
(3, '1/3/2014', 3, 2, 2, 3, 4, 'Blue', 101.00), -->Person Inferred Member
(4, '1/4/2014', 2, 3, 1, 1, 2, 'Red', 123.00), -->Department Inferred Member
(5, '1/6/2014', 4, 3, 1, 1, 2, 'Orange', 33.00), -->Person, Product and Department Inferred Members
(6, '1/6/2014', null, 1, 1, 1, 2, 'Green', 80.00) -->No person associated with purchase
go
As you can see, I already marked the records that contain our inferred members, as well as an example of one record with a null natural key in our transaction. In our dimensions we'll create N/A records for when this occurs. Our dimensional schema can be created using this script:
-->Dimensional Data
create table DimPerson
(
PersonKey int identity(0,1) not null,
PersonID int,
PersonName varchar(10),
InferredMemberYN char(1),
constraint pk_DimPerson primary key (PersonKey)
)
go
insert into DimPerson (PersonID, PersonName, InferredMemberYN)
values
(0, 'N/A', 'Y'),
(1, 'Person1','N'),
(2, 'Person2', 'N')
go
create table DimDepartment
(
DepartmentKey int identity(0,1) not null,
DepartmentID int,
DepartmentLocID int,
DepartmentName varchar(25),
InferredMemberYN char(1),
constraint pk_DimDepartment primary Key (DepartmentKey)
)
go
insert into DimDepartment(DepartmentID, DepartmentLocID, DepartmentName, InferredMemberYN)
values
(0, 0, 'N/A', 'Y'),
(1, 1, 'Department11','N'),
(1, 2,'Department12','N'),
(2, 1,'Department21','N'),
(2, 2,'Department22','N')
go
create table DimProduct
(
ProductKey int identity(0,1) not null,
ProductID int,
ProductTypeID int,
ProductColor varchar(10),
ProductName varchar(20),
InferredMemberYN char(1),
constraint pk_DimProduct primary Key (ProductKey)
)
go
insert into DimProduct(ProductID, ProductTypeID, ProductColor, ProductName, InferredMemberYN)
values
(0, 0, 'N/A', 'N/A', 'Y'),
(1, 2, 'Red', 'ProductRed', 'N'),
(1, 2, 'Green', 'ProductGreen', 'N'),
(3, 4, 'Blue', 'ProductBlue', 'N')
go
Create table DimDay
(
DayKey int not null,
DayDate datetime,
constraint pk_DimDay primary key (DayKey)
)
go
insert into DimDay (DayKey, DayDate)
values
(0, '1/1/1900'),
(1, '1/1/2014'),
(2, '1/2/2014'),
(3, '1/3/2014'),
(4, '1/4/2014'),
(5, '1/5/2014'),
(6, '1/6/2014')
go
create table FctPurchase
(
PurchasePersonKey int,
PurchaseDepartmentKey int,
DayKey int,
PurchaseProductKey int,
PurchaseID int,
PurchaseValue money,
constraint pk_FctPurchase primary key (PurchaseID)
)
go
One part of this solution is a stored procedure that uses a table type defined in our database. This type holds the natural key names and values passed from SSIS in the data flow:
-->Type to hold our natural key values
create type NaturalKeys
as table
(
NaturalKeyName varchar(100),
NaturalKeyValue varchar(100)
);
go
-->Stored proc to handle inferred members
create procedure [dbo].[LoadInferredMember]
@DimensionName varchar(100), @SurrogateKey varchar(100), @NaturalKeys as NaturalKeys READONLY
as
begin try
begin transaction
set xact_abort on
declare
@ReturnSurrogateKey bigint = null,
@SqlStatement nvarchar(max),
@NaturalKeysStatement varchar(max) =' ',
@InsertStatement nvarchar(max)=' ',
@Values varchar(max)=' ';
-->If any of our natural keys are null we select the N/A record to return
if exists(select 1 from @NaturalKeys where NaturalKeyValue is null)
begin
set @ReturnSurrogateKey= 0
end
else
begin
-->Generates Select statement
select @NaturalKeysStatement = @NaturalKeysStatement+' '+NaturalKeyName+'='''+NaturalKeyValue +''' and'
from @NaturalKeys
set @NaturalKeysStatement=SUBSTRING( @NaturalKeysStatement,0,len(@NaturalKeysStatement)-2)
-->Generates the insert into statement
select @InsertStatement = @InsertStatement+NaturalKeyName+','
from @NaturalKeys
set @InsertStatement = 'insert into '+@DimensionName +' ('+@InsertStatement+'InferredMemberYN) values'
-->Generates the Values statement
select @Values = @Values+' '''+NaturalKeyValue+''' ,'
from @NaturalKeys
set @Values= '('+@Values+'''Y''),'
set @Values=SUBSTRING( @Values,0,len(@Values))
set @InsertStatement=@InsertStatement+' '+@Values+' Select @ReturnSurrogateKey = SCOPE_IDENTITY()'
-->Check to see if record actually exists
set @SqlStatement = 'Select @ReturnSurrogateKey='+ @SurrogateKey+' from '+@DimensionName +' where '+@NaturalKeysStatement
exec sp_executesql @SqlStatement,N'@ReturnSurrogateKey bigint OUTPUT', @ReturnSurrogateKey OUTPUT;
if @ReturnSurrogateKey is null
begin
exec sp_executesql @InsertStatement,N'@ReturnSurrogateKey bigint OUTPUT', @ReturnSurrogateKey OUTPUT;
end
end
commit
return @ReturnSurrogateKey
end try
begin catch
if @@trancount > 0
rollback transaction;
throw;
end catch;
go
This stored procedure first checks whether a record already exists in the dimension for the natural keys passed to it and, if so, returns its surrogate key. If it doesn't exist, it inserts the natural key(s), sets the InferredMemberYN column to Y, and returns the newly generated surrogate key. In your dimension tables, make sure that any other columns defined as not null have a default value set for them or the insert will naturally fail.
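To see the procedure in action outside of SSIS, you can call it directly with the table type. This is just a sketch for testing; the natural key values here (ProductID 9 and so on) are made up and don't exist in DimProduct, so the first call should insert an inferred member and return its new key, and the second call should find that row and return the same key without inserting:

```sql
declare @keys NaturalKeys;
insert into @keys (NaturalKeyName, NaturalKeyValue)
values ('ProductID', '9'),
       ('ProductTypeID', '9'),
       ('ProductColor', 'Black');

declare @sk int;
-->First call: no match, so an inferred row gets inserted and its key returned
exec @sk = LoadInferredMember
    @DimensionName = 'DimProduct',
    @SurrogateKey  = 'ProductKey',
    @NaturalKeys   = @keys;
select @sk as FirstCall;
-->Second call: the row now exists, so the same key should come back with no insert
exec @sk = LoadInferredMember
    @DimensionName = 'DimProduct',
    @SurrogateKey  = 'ProductKey',
    @NaturalKeys   = @keys;
select @sk as SecondCall;
```

One thing to keep in mind: RETURN from a stored procedure is always an int, so even though @ReturnSurrogateKey is declared bigint, very large identity values would overflow the return path.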
The data flow for our example is going to look like this:
Figure 3. Data Flow Task |
This data flow needs 2 variables to work:
Figure 4. SSIS Variables |
InferredColumnPrefix is the column prefix we're using for our inferred member metadata (explained later in this post). InferredSurrogateKeyValue is the temporary value we assign to a surrogate key column in the data flow for records that do not yet exist in the dimension (-1 in this example, matching the coalesce in the source query below).
The package makes use of 2 connection managers: an OLE DB one and an ADO.NET one:
Figure 5. Connection Managers |
I'm using the OLE DB one for my source and destination components, but of course you can use any one you want. The only reason I'm showing this is to stress the creation of the ADO.NET one, which points to the database your dimensions are located in. This data source is required by the script component.
In our OLE DB source, our source query is going to look like this:
-->SSIS Query for FctPurchase
select
coalesce(dper.PersonKey, -1) as PurchasePersonKey
,coalesce(ddep.DepartmentKey, -1) as PurchaseDepartmentKey
,coalesce(pr.ProductKey, -1) as ProductKey
,dday.DayKey
,pu.PurchaseID
,pu.PurchaseValue
,pu.DepartmentID as PurchaseDepartmentID -->Natural Key (Department)
,pu.DepartmentLocID as PurchaseDepartmentLocID -->Natural Key (Department)
,pu.PersonID as PurchasePersonID -->Natural Key (Person)
,pu.ProductID -->Natural Key (Product)
,pu.ProductTypeID -->Natural Key (Product)
,pu.ProductColor -->Natural Key (Product)
from Purchase pu
join DimDay dday on pu.PurchaseDate=dday.DayDate
left join DimPerson dper on dper.PersonID=pu.PersonID
left join DimDepartment ddep on ddep.DepartmentID=pu.DepartmentID
and ddep.DepartmentLocID=pu.DepartmentLocID
left join DimProduct pr on pu.ProductID=pr.ProductID
and pu.ProductTypeID=pr.ProductTypeID
and pu.ProductColor=pr.ProductColor
In our derived column component we are going to enter some metadata that aids the script component in creating inferred members. We need to tell the script component which dimension the keys belong to, what the keys are named in the data flow, and what they are named in the dimension. We enter them in this fashion:
Figure 6. Derived Component |
You'll notice that each column we add ties to a surrogate key in the data flow, prefixed with IFX. This prefix has to match the value in our InferredColumnPrefix variable. The expression has 3 main sections delimited by pipes. The first section is the name of the dimension. The second section is a comma-delimited mapping from the name of the surrogate key in the data flow to its name in the dimension. The third section is a comma-delimited mapping from the name of each natural key in the data flow to its name in the dimension. If you have composite keys, separate each pair with a ^. This image may help visualize this:
Figure 7. Metadata Mapping |
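As a concrete illustration of the format (the column names follow the source query above; treat the exact strings as a sketch), the expression values for the IFX columns covering the product and person surrogate keys would look something like this:

```
DimProduct|ProductKey,ProductKey|ProductID,ProductID^ProductTypeID,ProductTypeID^ProductColor,ProductColor
DimPerson|PurchasePersonKey,PersonKey|PurchasePersonID,PersonID
```

In the second string you can see the mapping at work: PurchasePersonKey is the surrogate key's name in the data flow while PersonKey is its name in DimPerson, and likewise PurchasePersonID maps to the PersonID natural key column.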
With this done, we can now start building our script component. Drag a script component onto the design surface. When prompted to pick a type, choose transformation:
Figure 8. Script Component Type |
On the Script screen make sure to add our 2 variables as ReadOnlyVariables:
Figure 9. Script Component Script Screen |
On the Input Columns screen, make sure to check all of the surrogate key, natural key and metadata columns in the data flow:
Figure 10. Script Component Input Columns Screen |
On the connection manager screen make sure to add our ado.net connection and name it DbConnection:
Figure 11. Script Component Connection Managers Screen |
With the configurations out of the way, we're ready to start coding. Go back to the Script screen, click on the Edit Script button and paste the following code into main.cs:
#region Help: Introduction to the Script Component
/* The Script Component allows you to perform virtually any operation that can be accomplished in
* a .Net application within the context of an Integration Services data flow.
*
* Expand the other regions which have "Help" prefixes for examples of specific ways to use
* Integration Services features within this script component. */
#endregion
#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Runtime;
using System.Reflection;
using System.Data.SqlClient;
using System.Collections.Generic;
#endregion
/// <summary>
/// This is the class to which to add your code. Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
private PipelineBuffer _inputBuffer;
IDTSConnectionManager100 _connMgr;
IList<string> _propertyList;
int[] _BufferColumnIndexes;
/// <summary>
/// Called at run time when a PipelineBuffer from an upstream component
/// is available to the component to let the component process the incoming rows.
/// </summary>
/// <param name="InputID">The ID of the input of the component.</param>
/// <param name="Buffer">The PipelineBuffer object.</param>
public override void ProcessInput(int InputID, Microsoft.SqlServer.Dts.Pipeline.PipelineBuffer Buffer)
{
_inputBuffer = Buffer;
_BufferColumnIndexes = GetColumnIndexes(InputID);
base.ProcessInput(InputID, Buffer);
}
/// <summary>
///
/// </summary>
/// <param name="Buffer">The script buffer</param>
public override void Input0_ProcessInput(Input0Buffer Buffer)
{
try
{
//Gets a list of index and name of columns in the script buffer
if (_propertyList == null)
{
_propertyList = new List<string>();
string inputBufferName = Buffer.GetType().Name;
var properties = (typeof(Input0Buffer)).GetProperties();
foreach (var property in properties)
{
if (!property.Name.EndsWith("_IsNull"))
_propertyList.Add(property.Name);
}
}
base.Input0_ProcessInput(Buffer);
}
catch (Exception e)
{
//Send error message to the UI
FailComponent(e.ToString());
//Comment out throw to not fail the package but still report to ui
throw;
}
}
/// <summary>
/// This method is called once for every row that passes through the component from Input0.
///
/// Example of reading a value from a column in the the row:
/// string zipCode = Row.ZipCode
///
/// Example of writing a value to a column in the row:
/// Row.ZipCode = zipCode
/// </summary>
/// <param name="inputBufferRow">The row that is currently passing through the component</param>
public override void Input0_ProcessInputRow(Input0Buffer inputBufferRow)
{
//Loops through each column
foreach (IDTSInputColumn100 column in this.ComponentMetaData.InputCollection[0].InputColumnCollection)
{
//Determine if the column is a meta data column for the surrogate keys
if (column.Name.StartsWith(Variables.InferredColumnPrefix))
{
//Get info about inferred column
PropertyInfo columnValue = inputBufferRow.GetType().GetProperty(column.Name);
//Parse out each element of the column
string[] mainComponent = columnValue.GetValue(inputBufferRow, null).ToString().Split('|');
//Parse out the name of the keys
string[] keyName = mainComponent[1].Split(',');
//Parse out the natural keys
string[] naturalKeys = mainComponent[2].ToString().Trim().Split('^');
//Parse out the name of the dimension
string dimName = mainComponent[0].ToString();
//Get column info through reflection
PropertyInfo keyColumnValue = inputBufferRow.GetType().GetProperty(keyName[0]);
//If the surrogate key = the InferredSurrogateKeyValue ssis variable
if ((int)keyColumnValue.GetValue(inputBufferRow, null) == Variables.InferredSurrogateKeyValue)
{
//Create data table that will be sent to the stored proc
DataTable dt = GetNaturalKeyTable(naturalKeys, inputBufferRow);
//Set index variable to index of the surrogate key in the input pipeline buffer
int index = (_propertyList.IndexOf(keyColumnValue.Name));
//Get surrogate key from database
int sk = GetSurrogateKey(dimName, keyName[1], dt);
//Set surrogate key to returned value, replacing InferredSurrogateKeyValue
_inputBuffer.SetInt32(_BufferColumnIndexes[index], sk);
}
}
}
}
/// <summary>
/// Returns a surrogate key from the database to replace the inferred surrogate key in the buffer
/// </summary>
/// <param name="dimName">The name of the dimension the inferred surrogate key references</param>
/// <param name="keyName">The name of the surrogate key in the dimension</param>
/// <param name="dt">The data table containing the natural key names and values to be inserted into the dimension</param>
/// <returns>The surrogate key to replace the inferred key</returns>
private int GetSurrogateKey(string dimName, string keyName, DataTable dt)
{
//Set up connections for the database call
_connMgr = this.Connections.DbConnection;
SqlConnection sqlConn = (SqlConnection)_connMgr.AcquireConnection(null);
//Call stored procedure to insert our inferred member and return the surrogate key
SqlCommand cmd = new SqlCommand("LoadInferredMember", sqlConn);
cmd.CommandType = CommandType.StoredProcedure;
SqlParameter paramDimName = cmd.Parameters.AddWithValue("@DimensionName", dimName);
paramDimName.SqlDbType = SqlDbType.VarChar;
SqlParameter paramSurrogateKey = cmd.Parameters.AddWithValue("@SurrogateKey", keyName);
paramSurrogateKey.SqlDbType = SqlDbType.VarChar;
SqlParameter paramNaturalKeys = cmd.Parameters.AddWithValue("@NaturalKeys", dt);
paramNaturalKeys.SqlDbType = SqlDbType.Structured;
var returnParameter = cmd.Parameters.Add("@ReturnSurrogateKey", SqlDbType.BigInt);
returnParameter.Direction = ParameterDirection.ReturnValue;
//Execute query
cmd.ExecuteNonQuery();
//Close database connection
sqlConn.Close();
return (int)returnParameter.Value;
}
/// <summary>
/// Returns a data table containing natural keys parsed from a string array
/// and values pulled from the input buffer row
/// </summary>
/// <param name="naturalKeys">An array that contains the natural key mappings from the input buffer to the dimension table</param>
/// <param name="inputBufferRow">The input row buffer that contains the natural key values</param>
/// <returns>A data table containing the parsed out natural key names with their values</returns>
private DataTable GetNaturalKeyTable(string[] naturalKeys, Input0Buffer inputBufferRow)
{
//Create data table that will be sent to the stored proc
DataTable dt = new DataTable();
//Data column to hold the natural key names
DataColumn NaturalKeyName = new DataColumn();
NaturalKeyName.ColumnName = "NaturalKeyName";
NaturalKeyName.DataType = System.Type.GetType("System.String");
//Data column to hold the natural key values
DataColumn NaturalKeyValue = new DataColumn();
NaturalKeyValue.ColumnName = "NaturalKeyValue";
NaturalKeyValue.DataType = System.Type.GetType("System.String");
//Add the columns to the data table
dt.Columns.Add(NaturalKeyName);
dt.Columns.Add(NaturalKeyValue);
//Row object to hold the columns
DataRow row;
//Insert natural keys into data table
foreach (string nat in naturalKeys)
{
row = dt.NewRow();
//Parse out each natural key individually
string[] naturalKey = nat.Trim().Split(',');
//Get natural key values
PropertyInfo naturalColumnValue = inputBufferRow.GetType().GetProperty(naturalKey[0]);
//Load natural key name into data table
row["NaturalKeyName"] = naturalKey[1].ToString();
//If natural key value is null then send null to stored proc
if (_inputBuffer.IsNull(_BufferColumnIndexes[(_propertyList.IndexOf(naturalColumnValue.Name))]) == true)
{
row["NaturalKeyValue"] = null;
}
//Else send the real value
else
{
row["NaturalKeyValue"] = naturalColumnValue.GetValue(inputBufferRow, null).ToString();
}
//Add row to the data table
dt.Rows.Add(row);
}
return dt;
}
/// <summary>
/// Outputs an error message
/// </summary>
/// <param name="errorMsg">The exception message</param>
private void FailComponent(string errorMsg)
{
bool fail = false;
IDTSComponentMetaData100 compMetadata = this.ComponentMetaData;
compMetadata.FireError(1, "Error handling inferred members!", errorMsg, "", 0, out fail);
}
}
If you don't want to use the -1 value (or any numerical value) and instead want the surrogate keys to come in null in the data flow, to be replaced, just replace this line of code:
//If the surrogate key = the InferredSurrogateKeyValue ssis variable
if ((int)keyColumnValue.GetValue(inputBufferRow, null) == Variables.InferredSurrogateKeyValue)
with this line of code:
//If the surrogate key is null
if (_inputBuffer.IsNull(_BufferColumnIndexes[(_propertyList.IndexOf(keyColumnValue.Name))])==true)
This might make dealing with the non-existing records easier, especially if you're using lookup transformations to get surrogate keys from dimensions instead of doing it in the OLE DB source query. If you go this route you no longer need the InferredSurrogateKeyValue SSIS variable.
Save, build, and we're ready to run the package. I set up data viewers so we can see what's going on. You can see that before we hit the script component we have -1 values for rows that were not found in the dimension:
Figure 12. Data Before Inferred Members are Added |
After we run this data through our script component:
Figure 13. Data After Inferred Members are Added |
You can see that the -1 values were replaced with surrogate keys returned from the database. If we look at our dimensions we can see that data has been added and matches what we see in the data flow:
select * from DimDepartment
select * from DimPerson
select * from DimProduct
Figure 14. Rows in Dimensions |
The natural keys have been added to each dimension table and the InferredMemberYN column has been set to Y. The name columns are all null; these could have been set to default values like "Unknown," but it's up to each individual implementation how to handle this. What's good about this implementation is that you get an actual surrogate key to insert into your fact rather than a pointer to a dummy record. When the actual record for the dimension comes into staging, it will simply update the existing inferred member we created with actual data and set InferredMemberYN to N. Since we used a real record instead of a dummy one, we won't need to go back and update the fact record once this occurs.
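That later dimension load can be sketched like this. StgProduct is a hypothetical staging table for the product feed and the column list is illustrative; the point is just that the match is on the natural keys, so the inferred member's surrogate key survives:

```sql
-->Sketch only: overwrite an inferred member once the real record arrives
-->StgProduct is a hypothetical staging table; adjust names to your feed
update dp
set dp.ProductName = s.ProductName,
    dp.InferredMemberYN = 'N'
from DimProduct dp
join StgProduct s
    on s.ProductID = dp.ProductID
   and s.ProductTypeID = dp.ProductTypeID
   and s.ProductColor = dp.ProductColor
where dp.InferredMemberYN = 'Y';
```

Because ProductKey never changes, every fact row that already references the inferred member automatically points at the completed record.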