Saturday, April 5, 2014

A Script Component Solution for Inferred Members in SSIS 2012

     One issue, that seems pretty pervasive in the data warehousing community, deals with what Kimball refers to as early arriving facts. These are transaction records from your source system that have arrived in your data warehouse that contain natural/business key pointers to dimension records that have not yet arrived. When processing your fact table you would want to join the natural keys in the transaction to the dimension that relates to that natural key i.e. PersonID exists in the Person dimension, etc. However; when you don't have that particular identifier, yet in the dimension, you have to create what is known as an inferred member. An inferred member is basically either a dummy record pointer used as a place holder until the real dimension record arrives, or a record in your dimension that is created on the fly that contains the business key(s) that came in from the transaction and possibly some default values. In this post I'm going to demonstrate how you can generate these values on the fly and return them to your data flow so they can be placed in your fact table rather than a pointer to some dummy record.

     This solution combines the use of a script component and a stored procedure to handle the inferred member generation. Want to thank my good buddy Ron for his help on this one ;). For this example were going to be loading purchase transactions from a source system with a schema that looks like this: 


Figure 1. Source System Schema

This represents the action of purchasing a product. We have entities that describe the person purchasing, the product bought and the department the product was purchased from. You'll also notice that some of these entities have composite keys. This is to demonstrate that this solution can handle these as well. 

     Now in our data warehouse we can represent this source schema in a star schema like this:


Figure 2. Dimensional Schema
     In our staging area our purchase data is going to contain this:

-->Staging Data
 create table Purchase
 (
 PurchaseID int not null,
 PurchaseDate datetime,
 PersonID int,
 DepartmentID int,
 DepartmentLocID int,
 ProductID int,
 ProductTypeID int,
 ProductColor varchar(10),
 PurchaseValue money,
 constraint pk_Purchase primary key (PurchaseID)
 )
 go

 insert into Purchase(PurchaseID, PurchaseDate, PersonID, DepartmentID, DepartmentLocID, ProductID, ProductTypeID, ProductColor, PurchaseValue)
 values (1, '1/1/2014', 2, 1, 1, 1, 2, 'Green', 22.00),
 (2, '1/2/2014', 1, 2, 1, 4, 4, 'Black', 23.00), -->Product Inferred Member
 (3, '1/3/2014', 3, 2, 2, 3, 4, 'Blue', 101.00),  -->Person Inferred Member
 (4, '1/4/2014', 2, 3, 1, 1, 2, 'Red', 123.00),  -->Department Inferred Member
 (5, '1/6/2014', 4, 3, 1, 1, 2, 'Orange', 33.00),    -->Person, Product and Department Inferred Members
 (6, '1/6/2014', null, 1, 1, 1, 2, 'Green', 80.00)  -->No person associated with purchase

go

As you can see, I already marked the records that contain our inferred members, as well as an example of one record with a null natural key in our transaction. In our dimensions we'll create N/A records for when this occurs. Our dimensional schema can be created using this script:


-->Dimensional Data
create table DimPerson
(
PersonKey int identity(0,1) not null,
PersonID int,
PersonName varchar(10),
InferredMemberYN char(1),
constraint pk_DimPerson primary key (PersonKey)
)
go

insert into DimPerson (PersonID, PersonName, InferredMemberYN)
values
(0, 'N/A', 'Y'),
(1, 'Person1','N'),
(2, 'Person2', 'N')
go

create table DimDepartment
(
DepartmentKey int identity(0,1) not null,
DepartmentID int,
DepartmentLocID int,
DepartmentName varchar(25),
InferredMemberYN char(1),
constraint pk_DimDepartment primary Key (DepartmentKey)
)
 go

 insert into DimDepartment(DepartmentID, DepartmentLocID, DepartmentName, InferredMemberYN)
 values
 (0, 0, 'N/A', 'Y'),
 (1, 1, 'Department11','N'),
 (1, 2,'Department12','N'),
 (2, 1,'Department21','N'),
 (2, 2,'Department22','N')
 go


 create table DimProduct
 (
 ProductKey int identity(0,1) not null,
 ProductID int,
 ProductTypeID int,
 ProductColor varchar(10),
 ProductName varchar(20),
 InferredMemberYN char(1),
constraint pk_DimProduct primary Key (ProductKey)
)
go


insert into DimProduct(ProductID, ProductTypeID, ProductColor, ProductName, InferredMemberYN)
values
(0, 0, 'N/A', 'N/A', 'Y'),
(1, 2, 'Red', 'ProductRed', 'N'),
(1, 2, 'Green', 'ProductGreen', 'N'),
(3, 4, 'Blue', 'ProductBlue', 'N')
go

 Create table DimDay
 (
 DayKey int not null,
 DayDate datetime,
 constraint pk_DimDay primary key (DayKey)
 )
 go

 insert into DimDay (DayKey, DayDate)
 values 
 (0, '1/1/1900')
 (1, '1/1/2014'),
 (2, '1/2/2014'),
 (3, '1/3/2014'),
 (4, '1/4/2014'),
 (5, '1/5/2014'),
 (6, '1/6/2014')
 go

 create table FctPurchase
 (
 PurchasePersonKey int,
 PurchaseDepartmentKey int,
 DayKey int,
 PurchaseProductKey int,
 PurchaseID int,
 PurchaseValue money,
 constraint pk_FctPurchase primary key (PurchaseID)
 )
 go

     One part of this solution is a stored procedure that utilizes a type in our database. This type will be used to hold natural key values and names passed from SSIS in the data flow:

-->Type to hold our natural key values
 create type NaturalKeys
as table
(
  NaturalKeyName varchar(100),
  NaturalKeyValue varchar(100)
);
go


-->Stored proc to handle inferred members
create procedure [dbo].[LoadInferredMember]
@DimensionName varchar(100), @SurrogateKey varchar(100), @NaturalKeys as NaturalKeys READONLY

as
begin try
  begin transaction
  set xact_abort on
declare
@ReturnSurrogateKey bigint = null,
@SqlStatement nvarchar(max),
@NaturalKeysStatement varchar(max) =' ',
@InsertStatement nvarchar(max)=' ',
@Values varchar(max)=' ';

-->If any of our natural keys are null we select the N/A record to return
if exists(select 1 from @NaturalKeys where NaturalKeyValue is null)
       begin
       set @ReturnSurrogateKey= 0
       end
else
       begin

       -->Generates Select statement
       select @NaturalKeysStatement = @NaturalKeysStatement+' '+NaturalKeyName+'='''+NaturalKeyValue +''' and'
       from @NaturalKeys
       set @NaturalKeysStatement=SUBSTRING( @NaturalKeysStatement,0,len(@NaturalKeysStatement)-2)

       -->Generates the insert into statement
       select @InsertStatement = @InsertStatement+NaturalKeyName+','
       from @NaturalKeys

       set @InsertStatement = 'insert into '+@DimensionName +' ('+@InsertStatement+'InferredMemberYN) values'

       -->Generates the Values statement
       select @Values = @Values+' '''+NaturalKeyValue+''' ,'
       from @NaturalKeys

       set @Values= '('+@Values+'''Y''),'
       set @Values=SUBSTRING( @Values,0,len(@Values))

       set @InsertStatement=@InsertStatement+' '+@Values+' Select @ReturnSurrogateKey = SCOPE_IDENTITY()'

       -->Check to see if record actually exists
       set @SqlStatement = 'Select @ReturnSurrogateKey='+ @SurrogateKey+' from '+@DimensionName +' where '+@NaturalKeysStatement

       exec sp_executesql @SqlStatement,N'@ReturnSurrogateKey bigint OUTPUT', @ReturnSurrogateKey OUTPUT;


       if @ReturnSurrogateKey is null
       begin
       exec sp_executesql @InsertStatement,N'@ReturnSurrogateKey bigint OUTPUT', @ReturnSurrogateKey OUTPUT;
       end

end 
commit
return @ReturnSurrogateKey
end try
begin catch
  rollback transaction
  raiserror (N'Error has occurred %s %d.', -- Message text.
           10, -- Severity,
           1, -- State,
           N'number', -- First argument.
           5); -- Second argument.
throw
end catch;

go

This stored procedure will first check to see if a record already exists for the natural key passed it in the dimension and return the surrogate key. If it doesn't it will insert the natural key(s), set the InferredMemberYN column to Y, and return the generated surrogate key. In your dimension tables make sure that any other columns that are set to not null have a default value set for them or the insert will naturally fail. 

     The data flow for our example is going to look like this:  

Figure 3. Data Flow Task
This data flow needs 2 variables to work:

Figure 4. SSIS Variables
InferredColumnPrefix is going to be the column prefix were using for our inferred member metadata (explained later in this post). InferredSurrogateKeyValue is the temporary value we are going to assign to a surrogate key column in the data flow for records that do not yet exist in the dimension. 

The package makes use of 2 connection managers. An ole/db and an ado.net:

Figure 5. Connection Managers
I'm using the ole/db one for my source and destination components. But of course you can use any one you want. The only reason why I'm showing this is to stress the creation of the ado.net one that points to the database your dimensions are located. This data source is required by the script component. 

     In our ole/db source, our source query is going to look like this:

-->SSIS Query for FctPurchase
 select
   coalesce(dper.PersonKey, -1) as PurchasePersonKey
  ,coalesce(ddep.DepartmentKey, -1) as PurchaseDepartmentKey
  ,coalesce(pr.ProductKey, -1) as ProductKey
  ,dday.DayKey
  ,pu.PurchaseID
  ,pu.PurchaseValue
  ,pu.DepartmentID as PurchaseDepartmentID  -->Natural Key (Department)
  ,pu.DepartmentLocID as PurchaseDepartmentLocID -->Natural Key (Department)
  ,pu.PersonID as PurchasePersonID -->Natural Key (Person)
  ,pu.ProductID  -->Natural Key (Product)
  ,pu.ProductTypeID -->Natural Key (Product)
  ,pu.ProductColor -->Natural Key (Product)
  from Purchase pu
         join DimDay dday on pu.PurchaseDate=dday.DayDate
         left join DimPerson dper on dper.PersonID=pu.PersonID
         left join DimDepartment ddep on ddep.DepartmentID=pu.DepartmentID
                           and ddep.DepartmentLocID=pu.DepartmentLocID
         left join DimProduct pr on pu.ProductID=pr.ProductID
                   and pu.ProductTypeID=pr.ProductTypeID
                           and pu.ProductColor=pr.ProductColor

 If our query does not detect a record for our dimension(s), we replace it with -1. Make sure this number matches what our InferredSurrogateKeyValue variable is set to. 

     In our derived column component we are going to enter some metadata that will aid our script component in its task to create inferred members. We need to tell our script component the dimension the keys belong to, what name the keys are in the data flow and what names they are in the dimension. We enter them in this fashion:

Figure 6. Derived Component

You'll notice that each column we add ties to a surrogate key in the data flow prefixed with IFX. This has to match the value in our InferredColumnPrefix variable. The expression has 3 main sections delimited by pipes. The first section has the name of the dimension. The second section has a comma delimited mapping between the name of the surrogate key in the data flow to the name of the surrogate key in the dimension. The third section has a comma delimited mapping between the name of the natural key in the data flow to the name of the natural key in the dimension. If you have composite keys you can separate each set of them using a ^. This image may help to visualize this more:

Figure 7. Metadata Mapping
     With this done, we can now start building our script component. Drag a script component onto the design surface. When prompted to pick a type, choose transformation:

Figure 8. Script Component Type
     
     On the Script screen make sure to add our 2 variables as ReadOnlyVariables:

Figure 9. Script Component Script Screen

     On the Input Columns screen, make sure to check all of the surrogate key, natural key and metadata columns in the data flow:

Figure 10. Script Component Input Columns Screen

     On the connection manager screen make sure to add our ado.net connection and name it DbConnection:

Figure 11. Script Component Connection Managers Screen
    
      With the configurations out of the way, we're ready to start coding. Go back to the Script screen, click on the Edit Script button and paste the following code into main.cs:

#region Help:  Introduction to the Script Component
/* The Script Component allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services data flow.
 *
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script component. */
#endregion

#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Runtime;
using System.Reflection;
using System.Data.SqlClient;
using System.Collections.Generic;

#endregion

/// <summary>
/// This is the class to which to add your code.  Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{
    private PipelineBuffer _inputBuffer;
    IDTSConnectionManager100 _connMgr;
    IList<string> _propertyList;
    int[] _BufferColumnIndexes;

    /// <summary>
    /// Called at run time when a PipelineBuffer from an upstream component
    /// is available to the component to let the component process the incoming rows.
    /// </summary>
    /// <param name="InputID">The ID of the input of the component.</param>
    /// <param name="Buffer">The PipelineBuffer object.</param>
    public override void ProcessInput(int InputID, Microsoft.SqlServer.Dts.Pipeline.PipelineBuffer Buffer)
    {
        _inputBuffer = Buffer;
        _BufferColumnIndexes = GetColumnIndexes(InputID);
        base.ProcessInput(InputID, Buffer);
    }

    /// <summary>
    ///
    /// </summary>
    /// <param name="Buffer">The script buffer</param>
    public override void Input0_ProcessInput(Input0Buffer Buffer)
    {
        try
        {
            //Gets a list of index and name of columns in the script buffer
            if (_propertyList == null)
            {
                _propertyList = new List<string>();

                string inputBufferName = Buffer.GetType().Name;
                var properties = (typeof(Input0Buffer)).GetProperties();
                foreach (var property in properties)
                {
                    if (!property.Name.EndsWith("_IsNull"))
                        _propertyList.Add(property.Name);
                }
            }

            base.Input0_ProcessInput(Buffer);
        }
        catch (Exception e)
        {
            //Send error message to the UI
            FailComponent(e.ToString());
            //Comment out throw to not fail the package but still report to ui
            throw;
        }
    }


    /// <summary>
    /// This method is called once for every row that passes through the component from Input0.
    ///
    /// Example of reading a value from a column in the the row:
    ///  string zipCode = Row.ZipCode
    ///
    /// Example of writing a value to a column in the row:
    ///  Row.ZipCode = zipCode
    /// </summary>
    /// <param name="inputBufferRow">The row that is currently passing through the component</param>
    public override void Input0_ProcessInputRow(Input0Buffer inputBufferRow)
    {

        //Loops through each column
        foreach (IDTSInputColumn100 column in this.ComponentMetaData.InputCollection[0].InputColumnCollection)
        {
            //Determine if the column is a meta data column for the surrogate keys
            if (column.Name.StartsWith(Variables.InferredColumnPrefix))
            {
                //Get info about inferred column
                PropertyInfo columnValue = inputBufferRow.GetType().GetProperty(column.Name);

                //Parse out each element of the column
                string[] mainComponent = columnValue.GetValue(inputBufferRow, null).ToString().Split('|');

                //Parse out the name of the keys
                string[] keyName = mainComponent[1].Split(',');

                //Parse out the natural keys
                string[] naturalKeys = mainComponent[2].ToString().Trim().Split('^');

                //Parse out the name of the dimension
                string dimName = mainComponent[0].ToString();

                //Get column info through reflection
                PropertyInfo keyColumnValue = inputBufferRow.GetType().GetProperty(keyName[0]);


                ///If the surrogate key = the InferredSurrogateKeyValue ssis variable
                if ((int)keyColumnValue.GetValue(inputBufferRow, null) == Variables.InferredSurrogateKeyValue)
                {

                    //Create data table that will be sent to the stored proc
                    DataTable dt = GetNaturalKeyTable(naturalKeys, inputBufferRow);

                    //Set index variable to index of the surrogate key in the input pipeline buffer5
                    int index = (_propertyList.IndexOf(keyColumnValue.Name));

                    //Get surrogate key from database
                    int sk = GetSurrogateKey(dimName, keyName[1], dt);

                    //Set surrogate key to returned value, replacing InferredSurrogateKeyValue
                    _inputBuffer.SetInt32(_BufferColumnIndexes[index], sk);
                }
            }
        }
    }

    /// <summary>
    /// Returns a surrogate key from the database to replace the inferred surrogate key in the buffer
    /// </summary>
    /// <param name="dimName">The name of the dimension the inferred surrogate key references</param>
    /// <param name="keyName">The name of the surrogate key in the dimension</param>
    /// <param name="dt">The data table containing the natural key names and values to be inserted into the dimension</param>
    /// <returns>The surrogate key to replace the inferred key</returns>
    private int GetSurrogateKey(string dimName, string keyName, DataTable dt)
    {
        //Set up connections for the database call
        _connMgr = this.Connections.DbConnection;
        SqlConnection sqlConn = (SqlConnection)_connMgr.AcquireConnection(null);

        //Call stored procedure to insert our inferred member and return the surrogate key
        SqlCommand cmd = new SqlCommand("LoadInferredMember", sqlConn);
        cmd.CommandType = CommandType.StoredProcedure;
        SqlParameter paramDimName = cmd.Parameters.AddWithValue("@DimensionName", dimName);
        paramDimName.SqlDbType = SqlDbType.VarChar;
        SqlParameter paramSurrogateKey = cmd.Parameters.AddWithValue("@SurrogateKey", keyName);
        paramSurrogateKey.SqlDbType = SqlDbType.VarChar;
        SqlParameter paramNaturalKeys = cmd.Parameters.AddWithValue("@NaturalKeys", dt);
        paramNaturalKeys.SqlDbType = SqlDbType.Structured;

        var returnParameter = cmd.Parameters.Add("@ReturnSurrogateKey"SqlDbType.BigInt);
        returnParameter.Direction = ParameterDirection.ReturnValue;
        //Execute query
        cmd.ExecuteNonQuery();
        //Close database connection
        sqlConn.Close();

        return (int)returnParameter.Value;

    }

    /// <summary>
    /// Returns a data table containing natural keys parsed from a string array
    /// and values pulled from the input buffer row
    /// </summary>
    /// <param name="naturalKeys">An array that contains the natural key mappings from the input buffer to the dimension table</param>
    /// <param name="inputBufferRow">The input row buffer that contains the natural key values</param>
    /// <returns>A data table containing the parsed out natural key names with their values</returns>
    private DataTable GetNaturalKeyTable(string[] naturalKeys, Input0Buffer inputBufferRow)
    {
        //Create data table that will be sent to the stored proc
        DataTable dt = new DataTable();

        //Data column to hold the natural key names 
        DataColumn NaturalKeyName = new DataColumn();
        NaturalKeyName.ColumnName = "NaturalKeyName";
        NaturalKeyName.DataType = System.Type.GetType("System.String");

        //Data column to hold the natural key values 
        DataColumn NaturalKeyValue = new DataColumn();
        NaturalKeyValue.ColumnName = "NaturalKeyValue";
        NaturalKeyValue.DataType = System.Type.GetType("System.String");

        //Add the columns to the data table
        dt.Columns.Add(NaturalKeyName);
        dt.Columns.Add(NaturalKeyValue);


        //Row object to hold the columns
        DataRow row;

        //Insert natural keys into data table
        foreach (string nat in naturalKeys)
        {
            row = dt.NewRow();

            //Parse out each natural key individually
            string[] naturalKey = nat.Trim().Split(',');

            //Get natural key values
            PropertyInfo naturalColumnValue = inputBufferRow.GetType().GetProperty(naturalKey[0]);

            //Load natural key name into data table
            row["NaturalKeyName"] = naturalKey[1].ToString();

            //If natural key value is null then send null to stored proc
            if (_inputBuffer.IsNull(_BufferColumnIndexes[(_propertyList.IndexOf(naturalColumnValue.Name))]) == true)
            {
                row["NaturalKeyValue"] = null;
            }
            //Else send the real value
            else
            {
                row["NaturalKeyValue"] = naturalColumnValue.GetValue(inputBufferRow, null).ToString();
            }

            //Add row to the data table
            dt.Rows.Add(row);

        }

        return dt;
    }

    /// <summary>
    /// Outputs an error message
    /// </summary>
    /// <param name="errorMsg">The exception message</param>
    private void FailComponent(string errorMsg)
    {
        bool fail = false;
        IDTSComponentMetaData100 compMetadata = this.ComponentMetaData;
        compMetadata.FireError(1, "Error handling inferred members!", errorMsg, "", 0, out fail);
    }

}

     If you don't want to use the -1 value (or any numerical value) and just want to have the surrogate keys come in null in the data flow, to be replaced,  just replace this line of code:


 //If the surrogate key = the InferredSurrogateKeyValue ssis variable
 if ((int)keyColumnValue.GetValue(inputBufferRow, null) == Variables.InferredSurrogateKeyValue)

with this line of code:


//If the surrogate key is null
if (_inputBuffer.IsNull(_BufferColumnIndexes[(_propertyList.IndexOf(keyColumnValue.Name))])==true)

     This might make dealing with the non existing records easier, especially if you're using lookup transformations to get surrogate keys from dimensions instead of in the ole/db source query. If you go this route you no longer need to use the InferredSurrogateKeyValue SSIS variable.

     Save, build and were ready to run the package. I set up data viewers so we can see what's going on. You can see before we hit the script component we have -1 values for rows that were not found in the dimension:

Figure 12. Data Before Inferred Members are Added
After we run this data through our script component:

Figure 13. Data After Inferred Members are Added
      You can see that the -1 values were replaced with surrogate keys returned from the database. If we look at our dimensions we can see that data has been added and matches what we see in the data flow:

select * from DimDepartment
select * from DimPerson
select * from DimProduct

     
Figure 14. Rows in Dimensions
     
     The natural keys have been added to each dimension table and the InferredMemberYN column has been set to Y. The name columns have all null values. These could have been set with default values like "Unknown" or something. But its up to each individual implementation on how to handle this. What's good about this implementation is that you will get an actual surrogate key to insert into your fact rather than a pointer to a dummy record. When the actual record for the dimension comes in to staging, it will simply update the existing inferred member we created with actual data and set InferredMemberYN to N. Since we used a real record instead of a dummy one we won't need to go back and update the fact record once this occurs.

2 comments:

  1. Hi,

    I have few questions on SSIS Performance tunning.

    Suppose My system has 8GB RAM and I have installed SQL Server 2012 along with DataTools.
    SQL Server uses 7GB memory. Will SSIS uses the 7GB memory or remaining 1 GB memory for buffers?

    ReplyDelete
    Replies
    1. The 1 GB, SSIS does not share memory space with SQL Server.

      Delete