Tuesday, May 7, 2013

Creating a Script Component in SSIS that can Generate a Hash Value for a Row

    Generating hash values for your data can be extremely useful in data warehousing. It is an excellent way to tell whether the data coming through staging, from your source systems, has changed. This is accomplished by pushing individual rows of data through some type of hashing algorithm and adding the output as a value on your data stream. When that particular row comes through staging again, you can compare the hash value generated in staging to the hash stored with your data in the data warehouse. This tells you whether a change has occurred in the row, and whether your data warehouse row needs to be updated (or whether you need to insert a new record if you are change tracking).

     Without utilizing something like a hash, you would have to compare every column in your incoming data stream to its associated column in your data warehouse, typically in a conditional split transformation with an expression like (COLUMN_A_NEW != COLUMN_A_DW) || (COLUMN_B_NEW != COLUMN_B_DW) ...etc. For records with many columns, this is a daunting task. It also has to be custom-built for every package, since the data sources for each package will be different. This provides little to no re-usability and is very inflexible to change. Thus, we use a hash.
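     To make this concrete, here is a minimal, hypothetical sketch (not part of the package) of the single comparison a hash buys you; the CreateHash method and the column concatenation are shown in full later in this post, and LookupStoredHash is just a made-up stand-in for whatever returns the hash already stored with the warehouse row:

//Hypothetical sketch: one equality check on a hash replaces comparing every column
string stagingRowValues = "RMA-1001" + "This is a customer note"; //concatenated column values
string stagingHash = CreateHash(stagingRowValues);                //CreateHash is shown later in this post
string warehouseHash = LookupStoredHash("RMA-1001");              //hypothetical helper returning the stored hash

if (stagingHash != warehouseHash)
{
    //The row changed: stage it for an update (or an insert if you are change tracking)
}
//If the hashes match, do nothing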

     I've seen a couple of ways this has been done in different systems, some good, some not so good. One way, if your source system developers have blessed you with this feature, is to simply get a hash value for your record from the source system. You can store this hash value with the data and do a simple comparison in your conditional split transformation, HASH_VALUE_SOURCE != HASH_VALUE_DW. If they are different we process the update; if not, we do nothing.  Another way I've seen is to create a custom scripting component that maps out each individual column coming through and generates a hash. While this is still better than doing the comparison in a conditional split transformation, it is still too customized to the package and inflexible to change: every time you add or remove a column in the transformation, the code in the scripting component needs to be updated. There is no re-usability.  A third way I've seen is simply to not use a hash at all. Just use some kind of add/change dating mechanism from the source to determine which records to stage, and blindly update the data warehouse whether records have changed or not. Now that is not very efficient at all, is it?

     The best way, in my opinion, is to create a scripting component that is flexible to change, reusable, ASCII/Unicode capable, and customizable (to a certain degree). I'll take you step by step through a simple implementation of this:

     First here is what our sample package looks like (in the data flow tab):


Figure 1. Data Flow 
Our OLE DB Source contains 2 columns: a transaction identifier, varchar(50), and a note associated with the transaction, varchar(max) (this could also be TEXT, NTEXT, or nvarchar(max) depending on your particular needs).


Figure 2. OLE-DB Data Source
         Some of your projects may need to develop hash values that are dependent on case-sensitive or case-insensitive data, i.e. "this is a test = This is a Test" or "this is a test != This is a Test". For that, let's create a variable that our scripting component can later use to determine case sensitivity (by the way, when dealing with variables in SSIS I find BIDS Helper invaluable). In this example I'll make it case sensitive by defaulting my string variable to Y.


Figure 3. Variable for Case Sensitivity
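
     To illustrate what this flag will do inside the script, here is a small, hypothetical sketch using the CreateHash method shown later in this post; uppercasing the input before hashing is what makes two strings that differ only in case produce the same hash:

//Illustration only, using the CreateHash method defined later in this post
string a = "this is a test";
string b = "This is a Test";

//Case sensitive (variable = "Y"): the hashes differ
bool hashesDiffer = CreateHash(a) != CreateHash(b);                    //true

//Case insensitive (variable = "N"): uppercase first, then hash; the hashes match
bool hashesMatch = CreateHash(a.ToUpper()) == CreateHash(b.ToUpper()); //true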

  In the toolbox, drag a Script Component onto the data flow designer surface. You will first be asked to choose a scripting component type:


Figure 4. Script Component Type Selection
      Select Transformation, as we want data to pass through the component rather than have it be a source or destination. In this example I named this component "Script Component - Populate Hash Value Field with BLOBS". Now open the component. The default screen you should see is:
Figure 5. Scripting Component
     Leave the ScriptLanguage at the default of C# because Visual Basic is.......yuck. Now we want our scripting component to make use of the case-sensitivity variable we created, so we need to add it to the ReadOnlyVariables list for this scripting component.


Figure 6. Select Case Sensitive Variable
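
     Once the variable is selected in ReadOnlyVariables, the script exposes it as a strongly typed property on the Variables object. Inside the script it is read like this (strCaseSensitive is the variable name used in the full code below):

//Fragment from inside the script: read the package variable added to ReadOnlyVariables
string caseSensitive = Variables.strCaseSensitive;   //"Y" = case sensitive, "N" = not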


     Now that we have selected our variable, let's select our input columns.


Figure 7. Input Columns
In our data flow we should have 3 columns: RMA_ITEM_ID, ITEM_COMMENT, and LAST_MOD_DATE_df. Since LAST_MOD_DATE_df is different every time we run the package, we don't want this value considered when generating the hash value. If we did, the hash would be different every time, which would defeat the purpose of tracking changes to our row. Keep this in mind when we create our scripting component.
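
     In the script further down, the loop over the buffer stops at ColumnCount - 1, so the last column in the buffer never contributes to the hash. The sketch below shows that idea, under the assumption that the column you want to exclude (here LAST_MOD_DATE_df) sits in the last buffer position:

//Sketch of the exclusion used later: stop one column short of ColumnCount,
//assuming the column to exclude ends up last in the buffer
for (int counter = 0; counter < inputBuffer.ColumnCount - 1; counter++)
{
    //...concatenate inputBuffer[counter] into the StringBuilder...
}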

     Now that we have selected the inputs for our scripting component, we need to create an output column for the hash value we want the scripting component to generate. Click on Inputs and Outputs in the menu. In that screen expand Output 0 and click on Output Columns. Then click the Add Column button and create a new output column for our hash value. Be sure to change the new column's data type to a string type; if it is left at the default integer type, assigning the hash string in the script will produce a build error ("Cannot implicitly convert type 'string' to 'int'").


Figure 8. Output Columns
     We're not using any Connection Managers for this example, so let's jump back to the Script menu and click the Edit Script button. Here is where we code our method for creating a hash; see the code below:

#region Help:  Introduction to the Script Component
/* The Script Component allows you to perform virtually any operation that can be accomplished in
 * a .Net application within the context of an Integration Services data flow.
 *
 * Expand the other regions which have "Help" prefixes for examples of specific ways to use
 * Integration Services features within this script component. */
#endregion

#region Namespaces
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
using Microsoft.SqlServer.Dts.Pipeline;
using System.Text;
using System.Windows.Forms;
using System.Security.Cryptography;
using Microsoft.SqlServer.Dts.Runtime;
#endregion

#region Class
/// <summary>
/// This is the class to which to add your code.  Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute()]
public class ScriptMain : UserComponent
{

#region Methods
    private PipelineBuffer inputBuffer;

    /// <summary>
    /// The ProcessInput method is called to provide the component a full PipelineBuffer object that contains rows from the upstream component.
    /// The columns contained in buffer include those columns defined in the IDTSInputColumnCollection100 of the component.
    /// If the component has synchronous outputs, the buffer will also include the columns added to the output column collection by the component,
    /// and all the columns in the output column collection of the components upstream from the component.
    /// </summary>
    /// <param name="InputID">The ID of the input of the component.</param>
    /// <param name="Buffer">The PipelineBuffer object.</param>
    public override void ProcessInput(int InputID, Microsoft.SqlServer.Dts.Pipeline.PipelineBuffer Buffer)
    {
        inputBuffer = Buffer;
        base.ProcessInput(InputID, Buffer);
    }
    /// <summary>
    /// This method is called once for every row that passes through the component from Input0.
    ///
    /// Example of reading a value from a column in the row:
    ///  string zipCode = Row.ZipCode
    ///
    /// Example of writing a value to a column in the row:
    ///  Row.ZipCode = zipCode
    /// </summary>
    /// <param name="Row">The row that is currently passing through the component</param>
    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        //Holds the number of columns in the row
        int counter = 0;
        //Used to concatenate columns in our row
        StringBuilder values = new StringBuilder();
        //Will be used to hold the value of the column in the loop
        object value = null;
        //Will be used to hold the string converted from a BLOB column
        string probStr = null;
        //Will be used to hold the length of the blob
        int probBlobLen = 0;
        //Will be used to hold the bytes of the blob
        byte[] probbytBlob = null;
        //Will be used to test the type of blob(ASCII or Unicode)
        object tester = null;
        //Will be used to determine case sensitivity
        string caseSensitive = Variables.strCaseSensitive;

        //Loops through the columns in the current record in the buffer (stops one short of ColumnCount so the last buffer column is not included in the hash)
        for (counter = 0; counter < inputBuffer.ColumnCount - 1; counter++)
        {
            //Checks for null values in the column, if not we process the value, else we submit a blank to the string builder
            if (inputBuffer.IsNull(counter) == false)
            {
                //Sets tester to the object type of the column being evaluated in the loop
                tester = inputBuffer[counter].GetType();
                //Gets SSIS specific data type
                BufferColumn bc = inputBuffer.GetColumnInfo(counter);

                //Checks to see if the data is a BLOB
                if (object.ReferenceEquals(tester, typeof(BlobColumn)))
                {

                    //Convert BLOB data to string
                    probBlobLen = (int)inputBuffer.GetBlobLength(counter);
                    probbytBlob = inputBuffer.GetBlobData(counter, 0, probBlobLen);

                    //If input is Unicode
                    if (bc.DataType == DataType.DT_NTEXT)
                    {
                        probStr = System.Text.Encoding.Unicode.GetString(probbytBlob);

                    }
                    //If input is ASCII
                    else
                    {
                        probStr = System.Text.Encoding.ASCII.GetString(probbytBlob);

                    }
                    //Sets the value object to the current column in the loop if BLOB
                    value = probStr;
                }
                else
                {
                    //Sets the value object to the current column in the loop
                    value = inputBuffer[counter].ToString();

                }
                //Appends the StringBuilder with the value from the current column in the loop
                values.Append(value);
            }
            else
            {
                //Appends the String Builder with a blank if the incoming column has a null value
                values.Append("");
            }
        }

        //Sets the output of the component to the SHA1 hash of the String Builder value

        //Not case sensitve
        if (caseSensitive == "N")
        {
            Row.HASHVALUE = CreateHash(values.ToString().ToUpper());
        }
        //Case sensitive
        else
        {
            Row.HASHVALUE = CreateHash(values.ToString());

        }

    }

    /// <summary>
    /// Generates a SHA1 hash value for a string passed
    /// </summary>
    /// <param name="data">Data string that is to be hashed</param>
    /// <returns>SHA1 hash value string</returns>
    public static string CreateHash(string data)
    {
        //Convert the input string to bytes using Unicode (UTF-16) encoding
        byte[] dataToHash = (new UnicodeEncoding()).GetBytes(data);
        //Compute the SHA1 digest of those bytes
        SHA1CryptoServiceProvider sha = new SHA1CryptoServiceProvider();
        byte[] hashedData = sha.ComputeHash(dataToHash);
        //Return the 20-byte digest as a Base64 string
        string s = Convert.ToBase64String(hashedData, Base64FormattingOptions.None);
        return s;
    }
#endregion
}
#endregion

     Let's step through and explain some of this code. The first thing you'll want to do is override the Input0_ProcessInputRow method. This is where we customize what happens on our data flow. Our main for-loop iterates over the columns of our row in the input buffer. In the loop we concatenate the values of our columns into a StringBuilder object, which means converting every value that comes through the loop to a string, BLOBs included (the if (object.ReferenceEquals(tester, typeof(BlobColumn))) check).

     When converting a BLOB to a string, encoding is an important factor, so we test the encoding of the BLOB to know which one to use for the conversion: if (bc.DataType == DataType.DT_NTEXT). Once we've added our column values to the StringBuilder, we need to generate our hash value for the record and bind that value to the HASH_VALUE output we defined. We do this by calling CreateHash and deciding whether our hash should be case sensitive, Row.HASHVALUE = CreateHash(values.ToString());, or case insensitive, Row.HASHVALUE = CreateHash(values.ToString().ToUpper());. The CreateHash method takes a string parameter, runs it through a SHA1 hash, and returns our hash value.
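
     As a quick illustration outside the data flow, CreateHash always returns the Base64 encoding of a 20-byte SHA1 digest, so the output is a 28-character string no matter how wide the input row is (the concatenated value below is just a made-up example):

//Illustration only: what CreateHash returns for a hypothetical concatenated row
string concatenated = "RMA-1001" + "This is a customer note";
string hash = CreateHash(concatenated);
Console.WriteLine(hash);          //a Base64 string, 27 characters plus one "=" of padding
Console.WriteLine(hash.Length);   //always 28 for a SHA1 digest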

     By putting a data viewer on the data flow you can now see this column added:


Figure 9. Data Viewer


 
Unfortunately, the data viewer only displays BLOBs as <Long Text>. Here is a message box (MessageBox.Show("Concatenated String: " + values.ToString() + "\r\n" + "Hash Value: " + CreateHash(values.ToString()));) to show how this will ultimately look:

Figure 10. Sample Output

    We have now successfully added our hash value to our data stream. Next we pass the data through a lookup to determine whether the natural key already exists in our destination. In this transformation we also want to add the hash value that is already stored in our warehouse for this record:

Figure 11. Adding the existing hash value to the data stream



 If it already exists we need to send the data through a conditional split transformation to determine if a change has occurred:

Figure 12.  Comparing new hash with existing hash

     If the two values are different we know the data has changed, and we can move it off to be staged for updating the destination. If the values are the same we do nothing.

     With this solution we achieve:
  • Flexibility to change (you can add or remove columns simply by checking/unchecking them on the Input Columns menu of the scripting component)
  • Re-usability (you can add this as a custom component to your project or simply copy and paste it from one package to another)
  • ASCII/Unicode capability (no matter what kind of data you throw at it, whether ASCII/Unicode, BLOB, string, int, datetime, etc., you will be able to generate a hash)
  • The ability to customize features (making the hash case sensitive or case insensitive by passing variables; this can be further extended with more variables that change behavior to accommodate your specific project)

12 comments:

  1. Nice job explaining this. I have a question: all I want to do is combine different columns and rows into a single variable and use this variable to send an email.

    Thanks
    Kumar

    ReplyDelete
    Replies
    1. Can you elaborate? You want to concatenate the values of columns?

      Delete
  2. I attempted your method but I am getting an error on the CreateHash call. For line "Row.HASHVALUE = CreateHash(values.ToString());" I get an error "Cannot implicitly convert type 'string' to 'int'". Any ideas? I am not sure why I get that error as I want it to be a string. There are varchars, decimals and ints in the inputs.
    Thanks

    ReplyDelete
    Replies
    1. Have you tried going into the code editor and building the solution after you selected the fields to add to the component?

      Delete
    2. Sometimes it's best to walk away and look at everything again. The issue was I did not set the output data type for 'HASH_VALUE' to string; it was still the default integer data type. (When I built it I got the error I listed at first.) So I started all over and that's when I caught that. Thank you for your response.

      Delete
    3. It is working but unfortunately I am finding the hashes are not consistent. I am processing 43k records and hashing about 60 columns (most are integers) and I consistently get about 2000 hashes different every time I run it; those 2000 records are random each time, not repeats. When I use HASHBYTES in SQL on the same data I get consistent results. I guess I will need to switch over to that. Thanks

      Delete
    4. Hmm..interesting. I usually catch something like that when I either forget to build the code after adding fields or I add a field that changes all the time, like a sysdatetime() or something. If you have something else that works then by all means use that solution.

      Delete
  3. Hi, this is a great blog post, thanks for posting! One question, how do you control the ordering of columns so that they are in a deterministic order in the InputBuffer? I can't find a mechanism in SSIS to control ordering of columns so I have to trust what's going on under the hood (which I dislike doing!) How can I be sure that the columns are in the same order each time I run this so that the hashes will be the same?

    ReplyDelete
    Replies
    1. You are welcome. To answer your question about the order in the buffer, I think the order is set by the order in which columns are added to the buffer before the script component. To get a better grasp as to what is going on column-wise behind the scenes, take a look at this post I made on Stack Overflow that may give you some insight into the order of columns in the buffer: http://stackoverflow.com/questions/22746994/getting-column-name-from-pipelinebuffer-in-script-component-in-ssis-2012/22770468#22770468

      Delete
    2. Hi Jim, that's great - thanks. I have played about with this by changing rows in the source database then changing them back and the hash is working as expected. Thanks for sharing this. One thing I want to do is to create a custom component in SSIS with this pattern so I can reuse the logic in multiple packages with dynamic data flows without copying and pasting the script all over the place. I will send a link to this once I have it completed, for the good of the community :)

      Delete
  4. Hi, nice article. One question: how do you track deleted rows using this approach? Regards

    ReplyDelete
    Replies
    1. Hey Alex, this can only handle records that come through the data flow. If a record is deleted you will need another way of determining that, unless you have a deleted indicator you can add to the hash; that will cause a new hash to be generated, triggering an update at the data destination.

      Delete