
Monday, December 2, 2013

Building Multiple Asynchronous Outputs for a Script Component Source

     When generating or consuming data in a script component source, you may need to create more than one output for the data. This comes up when dealing with nested arrays in objects, with a custom data source component that pulls different partitions of data from a database, or with objects that contain nested objects whose data needs separate outputs. When writing the code for these outputs in the overridden CreateNewOutputRows method, you would usually call the code for each of them synchronously (one at a time). To show how this works, let's create a script component source that has 3 outputs that we will call synchronously. The data flow for this will look like this:

Figure 1. Data Flow

     I changed the DefaultBufferMaxRows property for the data flow to 3,000,000 rows for this example, which allows each output buffer to hold up to 3 million rows (the actual number of rows per buffer is also capped by DefaultBufferSize). This will be our baseline for performance measurements. The outputs for this script component source will each contain 1 integer field:


Figure 2. Inputs and Outputs

     In the code for our script component, we're going to create a class that contains 3 arrays of 65 million integers each, one array per output, and send them to the outputs synchronously.

#region Namespaces
using System;
using System.Data;
using System.Threading.Tasks;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
#endregion

/// <summary>
/// This is the class to which to add your code.  Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{

    public override void CreateNewOutputRows()
    {
        IDTSComponentMetaData100 compMetadata = this.ComponentMetaData;
        ArrayA AClass = new ArrayA();

        //Outputs data to output A
        OutputA(AClass.A, compMetadata);
        //Outputs data to output B
        OutputB(AClass.B, compMetadata);
        //Outputs data to output C
        OutputC(AClass.C, compMetadata);
    }

    private void OutputA(int[] iAarray, IDTSComponentMetaData100 compMetadata)
    {
        OutputMessage(String.Format("Sending data to output A [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
        foreach (int i in iAarray)
        {
            ABuffer.AddRow();
            ABuffer.OUTA = i;
        }
        OutputMessage(String.Format("Completed sending data to output A [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
    }

    private void OutputB(int[] iBarray, IDTSComponentMetaData100 compMetadata)
    {
        OutputMessage(String.Format("Sending data to output B [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
        foreach (int i in iBarray)
        {
            BBuffer.AddRow();
            BBuffer.OUTB = i;
        }
        OutputMessage(String.Format("Completed sending data to output B [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
    }

    private void OutputC(int[] iCarray, IDTSComponentMetaData100 compMetadata)
    {
        OutputMessage(String.Format("Sending data to output C [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
        foreach (int i in iCarray)
        {
            CBuffer.AddRow();
            CBuffer.OUTC = i;
        }
        OutputMessage(String.Format("Completed sending data to output C [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
    }

    public class ArrayA
    {
        public int[] A = new int[65000000];
        public int[] B = new int[65000000];
        public int[] C = new int[65000000];
        public ArrayA()
        {
            for (int i = 0; i < A.Length; i++)
            {
                A[i] = i;
                B[i] = i;
                C[i] = i;
            }
        }
    }

    private void OutputMessage(string message, IDTSComponentMetaData100 compMetadata)
    {
        bool fireAgain = false;
        compMetadata.FireInformation(1, compMetadata.Name, message, "", 0, ref fireAgain);
    }


}
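To try the synchronous pattern outside of SSIS, here is a minimal sketch. It assumes plain `List<int>` instances stand in for the ABuffer/BBuffer/CBuffer output buffers (those only exist inside a script component), and the class name `SequentialFanOut` is hypothetical. It fills three arrays, sends them to three sinks one after another, and logs start/finish times in the same format as the OutputMessage calls above.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Hypothetical stand-in for the script component: each List<int> plays the role of one output buffer.
public static class SequentialFanOut
{
    // Copies every value from source into sink, logging start/finish times like OutputMessage does.
    static void Output(string name, int[] source, List<int> sink)
    {
        Console.WriteLine($"Sending data to output {name} [{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}]");
        foreach (int i in source)
        {
            sink.Add(i); // in SSIS this would be Buffer.AddRow() followed by a column assignment
        }
        Console.WriteLine($"Completed sending data to output {name} [{DateTime.Now:yyyy-MM-dd HH:mm:ss.fff}]");
    }

    // Fills three arrays with 0..rows-1 and sends them to three sinks, one after another.
    public static int[] Run(int rows)
    {
        int[] a = new int[rows], b = new int[rows], c = new int[rows];
        for (int i = 0; i < rows; i++) { a[i] = i; b[i] = i; c[i] = i; }

        var outA = new List<int>(rows);
        var outB = new List<int>(rows);
        var outC = new List<int>(rows);

        var watch = Stopwatch.StartNew();
        Output("A", a, outA); // B does not start until A has finished
        Output("B", b, outB); // C does not start until B has finished
        Output("C", c, outC);
        watch.Stop();
        Console.WriteLine($"Total: {watch.ElapsedMilliseconds} ms");

        return new[] { outA.Count, outB.Count, outC.Count };
    }
}
```

Calling `SequentialFanOut.Run(1000000)` prints the same kind of start/finish trail you see in the Progress tab, with each output starting only after the previous one completes.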

Let's run the package and watch the outputs run synchronously, one after the other. First, output A runs:
Figure 3. Output A Running

After A has completed, B kicks off:

Figure 4. Output B Running

Then C finishes up the outputs:


Figure 5. Output C Running

When viewing the execution results, we can see each output ran right after the other, and the whole run took about 41 seconds:

Figure 6. Synchronous Results


 Now, while 41 seconds is good, we can do better. We can change this code so that all 3 outputs run asynchronously (at the same time) on separate threads. To do this, we take advantage of the task factory (Task.Factory) from the .NET Task Parallel Library. We create a task to run each of the methods that send data to the 3 outputs, and Task.Factory.StartNew schedules these tasks to run concurrently on the thread pool. So we change the code that calls our methods to:

Task[] tasks = new Task[3];

tasks[0] = Task.Factory.StartNew(() => OutputA(AClass.A, compMetadata));
tasks[1] = Task.Factory.StartNew(() => OutputB(AClass.B, compMetadata));
tasks[2] = Task.Factory.StartNew(() => OutputC(AClass.C, compMetadata));
//Wait for all tasks to finish before going to PostExecute
Task.WaitAll(tasks);
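One thing worth knowing about Task.WaitAll: if any of the output methods throws, the exception does not surface directly but arrives wrapped in an AggregateException. The sketch below shows that behavior outside of SSIS. It assumes `List<int>` instances stand in for the output buffers, and `ConcurrentFanOut` and its `failOutputB` switch are hypothetical names used only to trigger a failure on demand. As in the script, each task writes only to its own sink, so no sink is shared between threads.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in: each List<int> plays the role of one SSIS output buffer.
// Each task writes only to its own list; the shared source array is only read.
public static class ConcurrentFanOut
{
    static void Output(int[] source, List<int> sink)
    {
        foreach (int i in source)
        {
            sink.Add(i);
        }
    }

    // Starts one task per output and blocks until all of them finish.
    // Returns the row count per output, or an empty array if any output failed.
    public static int[] Run(int rows, bool failOutputB = false)
    {
        int[] data = Enumerable.Range(0, rows).ToArray();
        List<int>[] sinks = { new List<int>(rows), new List<int>(rows), new List<int>(rows) };

        Task[] tasks = new Task[3];
        tasks[0] = Task.Factory.StartNew(() => Output(data, sinks[0]));
        tasks[1] = Task.Factory.StartNew(() =>
        {
            if (failOutputB) throw new InvalidOperationException("output B failed");
            Output(data, sinks[1]);
        });
        tasks[2] = Task.Factory.StartNew(() => Output(data, sinks[2]));

        try
        {
            // Rethrows any task failures wrapped in a single AggregateException.
            Task.WaitAll(tasks);
        }
        catch (AggregateException ex)
        {
            // In a script component you might report each inner message with FireError here.
            foreach (Exception inner in ex.Flatten().InnerExceptions)
            {
                Console.WriteLine("Output failed: " + inner.Message);
            }
            return Array.Empty<int>();
        }

        return sinks.Select(s => s.Count).ToArray();
    }
}
```

`ConcurrentFanOut.Run(500)` returns three counts of 500, while `ConcurrentFanOut.Run(500, failOutputB: true)` logs the failure and returns an empty array instead of letting the exception escape unreported.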

Let's run the package again and see how it performs with asynchronous outputs. At first glance, we can see all outputs pushing out data at the same time:


Figure 7. Outputs Running Asynchronously
From the execution results, we can verify that they all started pushing out data at the same time and finished in about 30 seconds:


Figure 8. Asynchronous Results
This means we cut the run time by about 27%. If you have packages that pump out a lot of data and take a long time to finish, a 27% improvement can mean a lot. Keep in mind that your mileage may vary based on available memory, the number of processors and cores, and so on. 
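The 27% figure is the reduction in elapsed time relative to the synchronous baseline, using the approximate timings from Figures 6 and 8; expressed as a speedup, the same numbers give roughly 1.37x:

```latex
\text{reduction} = \frac{41\,\text{s} - 30\,\text{s}}{41\,\text{s}} = \frac{11}{41} \approx 0.268 \approx 27\%,
\qquad
\text{speedup} = \frac{41\,\text{s}}{30\,\text{s}} \approx 1.37
```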

Tuesday, November 26, 2013

Handling Late Arriving/Open For Write Flat Files in SSIS

     If you have to source flat files for a data warehouse, then you may find yourself at the mercy of the source system for when these files are produced. If the times that these jobs complete vary, it can be quite a pain when trying to schedule your staging loads.  If you miss the window, or get a failure because the file is being written to when the job kicks off, then you have to start the job again. If there are data dependencies on these flat files in later parts of the job, then getting that data loaded into the data marts may have to be delayed as well. 

     What if you had a general idea of the window of time in which these flat files are most likely to arrive? Say the flat files are supposed to be there every day at 5:00 pm, but never show up later than 5:30 pm. Wouldn't it be nice to have your load job be more flexible, giving the file some time to show up before running the rest of the job, instead of having to bug a DBA later to kick it off again for the late-arriving files? Luckily, it's pretty easy to do, and in this post I'll show you how.

     The control flow for this example is going to look like this:


Figure 1. Control Flow

    We're going to take advantage of a Script Task to do this. If we succeed, we process the file; if not, we log it and/or send an email. The first thing we need to do is create a flat file connection manager for our test file. For this example, I created a directory called test in the root of C:, and in it a file called test.txt (C:\test\test.txt) that the flat file connection manager can reference:


Figure 2. Flat File Connection Manager

    Next, we need to specify some configurations for our script task by using SSIS variables. The first variable we're going to create will be called minutesToCancel. This variable tells the script task how many minutes to wait for the file to arrive in the directory before timing out. The second variable will be called secondsToTryAgain. This variable tells the script task how many seconds to wait after not finding the file before checking again. Both variables should be of type Int32, because the script casts their values to int. In my example, I set minutesToCancel to 1 minute and secondsToTryAgain to 15 seconds. These can be changed to whatever you want based on your particular setup.
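The script converts these two settings to milliseconds before handing them to Task.Wait and the retry wait. With the values above, that is a 60,000 ms timeout and a 15,000 ms retry interval, so the file is checked roughly four times before the task gives up. Here is a minimal sketch of that conversion with a sanity check added; `PollSettings` and `ToMilliseconds` are hypothetical names, not part of SSIS.

```csharp
using System;

// Hypothetical helper mirroring the conversion at the top of Main():
// the SSIS variables hold minutes and seconds, while the waits expect milliseconds.
public static class PollSettings
{
    public const int MillisecondsPerMinute = 60000;
    public const int MillisecondsPerSecond = 1000;

    public static (int timeoutMs, int retryMs) ToMilliseconds(int minutesToCancel, int secondsToTryAgain)
    {
        if (minutesToCancel <= 0) throw new ArgumentOutOfRangeException(nameof(minutesToCancel));
        if (secondsToTryAgain <= 0) throw new ArgumentOutOfRangeException(nameof(secondsToTryAgain));

        int timeoutMs = minutesToCancel * MillisecondsPerMinute;
        int retryMs = secondsToTryAgain * MillisecondsPerSecond;

        // A retry interval longer than the timeout means the file would be checked only once.
        if (retryMs > timeoutMs)
            throw new ArgumentException("secondsToTryAgain should not exceed the timeout");

        return (timeoutMs, retryMs);
    }
}
```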


Figure 3. SSIS Variables
   
     With our flat file connection manager set up and SSIS variables created, we can start coding. Drag a Script Task onto the control flow design surface. On the Script screen, make sure to select our variables minutesToCancel and secondsToTryAgain as ReadOnlyVariables, so the script task can access them in code. Click the Edit Script button so we can start coding.


Figure 4. Script Task Script Screen


Paste the following code into the script task:


#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Threading.Tasks;
using System.Threading;
using System.IO;
#endregion

#region Class
/// <summary>
/// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
/// or parent of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{

    //Connection manager for the flat file
    ConnectionManager cm;
    //The cancellation token for cancelling the task
    CancellationTokenSource tokenSource;
    //Boolean to determine if file was found
    bool connected = false;

    #region Methods
    /// <summary>
    /// This method is called when this script task executes in the control flow.
    /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
    /// To open Help, press F1.
    /// </summary>
    public void Main()
    {
        try
        {

            tokenSource = new CancellationTokenSource();
            const int Minutes = 60000;
            const int Seconds = 1000;
            //Set minutes before the package stops looking for the file
            int minutesToCancel = (int)Dts.Variables["minutesToCancel"].Value * Minutes;
            //Seconds for thread to wait before trying to see if the file arrived again
            int secondsToTryAgain = (int)Dts.Variables["secondsToTryAgain"].Value * Seconds;
            //Set connection manager to our flat file connection
            cm = Dts.Connections["Flat File Connection Manager"];
            //Set up method the task will call
            Action act = () => CheckforFile(tokenSource.Token, cm, connected, secondsToTryAgain);
            System.Threading.Tasks.Task task = new System.Threading.Tasks.Task(act);
            task.Start();
            //Timeout after specified minutes
            if (!task.Wait(minutesToCancel, tokenSource.Token))
            {
                Failure("Checking for file timed out");//change to Success() with a message if you don't want the package to fail on timeout
            }
        }
        catch (OperationCanceledException)
        {
            //do nothing

        }

        catch (Exception e)
        {
            FailComponent(e.ToString());
            Dts.TaskResult = (int)ScriptResults.Failure;
        }

    }
    /// <summary>
    /// Loops until file arrives or timeout
    /// </summary>
    /// <param name="cancelToken">The token used to trigger task cancellation</param>
    /// <param name="cm">Connection manager for the flat file</param>
    /// <param name="connected">Boolean to set if file arrives</param>
    /// <param name="secondsToTryAgain">The number of milliseconds to wait before trying to see if the file arrived again</param>
    public void CheckforFile(CancellationToken cancelToken, ConnectionManager cm, bool connected, int secondsToTryAgain)
    {
        //Determine if file locked message needs to be resent
        bool resendMessage = true;

        //loop while we haven't triggered a cancellation and file hasn't arrived yet
        while (!cancelToken.IsCancellationRequested)
        {

            //If file arrives trigger success
            if (File.Exists(cm.ConnectionString))
            {
             
                try
                {
                    //Attempt to read first character of file
                    using (TextReader reader = File.OpenText(cm.ConnectionString))
                    {
                        char[] block = new char[1];
                        reader.ReadBlock(block, 0, 1);
                    }
                    //If file is available and not being written to mark success
                    connected = true;
                    Success("File found! Proceeding to process");
                }
                //If file is being written to catch exception
                catch (IOException)
                {
                    if (resendMessage)
                    {
                        InfoComponent("File arrived, but is still being written to");
                        resendMessage = false;
                    }
                }
            }

            //Wait for the specified interval; WaitOne returns early if the token is cancelled
            cancelToken.WaitHandle.WaitOne(secondsToTryAgain);
        }
    }

    /// <summary>
    /// Stops the while loop and reports success
    /// </summary>
    /// <param name="msg">Success message</param>
    public void Success(string msg)
    {
        if (!tokenSource.IsCancellationRequested)
        {
            InfoComponent(msg);
            //Set the result before cancelling so it is in place when Main() stops waiting
            Dts.TaskResult = (int)ScriptResults.Success;
            tokenSource.Cancel();

        }
    }

    /// <summary>
    /// Stops the while loop and reports failure
    /// </summary>
    /// <param name="msg">Failure Message</param>
    public void Failure(string msg)
    {
        if (!tokenSource.IsCancellationRequested)
        {
            FailComponent(msg);
            //Set the result before cancelling so it is in place when Main() returns
            Dts.TaskResult = (int)ScriptResults.Failure;
            tokenSource.Cancel();

        }
    }

    /// <summary>Outputs an Error</summary>
    /// <param name="errorMsg">The error message to send to the UI</param>
    private void FailComponent(string errorMsg)
    {
        Dts.Events.FireError(0, "Error:", errorMsg, String.Empty, 0);
    }
    /// <summary>Outputs a Message</summary>
    /// <param name="infoMsg">The information message to send to the UI</param>
    private void InfoComponent(string infoMsg)
    {
        bool fail = false;
        Dts.Events.FireInformation(1, "Info:", infoMsg, "", 0, ref fail);
    }
    #endregion

    #region ScriptResults declaration
    /// <summary>
    /// This enum provides a convenient shorthand within the scope of this class for setting the
    /// result of the script.
    ///
    /// This code was generated automatically.
    /// </summary>
    enum ScriptResults
    {
        Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
        Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
    };
    #endregion

}
#endregion
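If you want to test the file check on its own, outside of a script task, here is a minimal standalone sketch of the same idea used in CheckforFile(): the file counts as ready when it exists and can be opened for reading. The names `FileReadiness` and `IsFileReady` are hypothetical, and it opens a FileStream directly instead of File.OpenText.

```csharp
using System;
using System.IO;

// Hypothetical standalone version of the check inside CheckforFile():
// a file counts as "ready" when it exists and can be opened for reading.
public static class FileReadiness
{
    public static bool IsFileReady(string path)
    {
        if (!File.Exists(path))
            return false;

        try
        {
            // With FileShare.Read, Windows refuses the open (IOException) while
            // another process still holds the file open for writing.
            using (var stream = new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read))
            {
                stream.ReadByte(); // touch the file, like ReadBlock in the script above
            }
            return true;
        }
        catch (IOException)
        {
            // Also covers FileNotFoundException if the file disappears after File.Exists.
            return false;
        }
    }
}
```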


     Let's step through and explain some of this code. To keep a thread running that repeatedly checks for the file, we use Tasks, part of the Task Parallel Library introduced in .NET Framework 4.0. Tasks make working with threads much easier than in previous releases of .NET. We declare a Task that kicks off a method, CheckforFile(), which keeps checking for the arrival of the file in the directory. In addition, we create a CancellationTokenSource and pass its CancellationToken to CheckforFile(). This token lets the while loop know when to stop: calling Cancel() on the CancellationTokenSource sets the token's IsCancellationRequested property to true. While IsCancellationRequested is false, we keep looping: checking for the file, seeing if the file is still being written to, then waiting for the secondsToTryAgain interval (15 seconds in this example). Once we time out, or the file arrives and is not being written to, we call Cancel(), which ends the while loop. Because Main() waits on the task with the same token, cancelling it can also make task.Wait() throw an OperationCanceledException, which is why Main() catches that exception and does nothing.
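The wait/cancel control flow in Main() can be sketched without SSIS as below. This is a minimal sketch, assuming a simulated "file" that arrives after a given number of milliseconds; `WaitOrCancel` and its parameters are hypothetical. Note that once the poller cancels the token, `Wait(timeout, token)` may either throw OperationCanceledException or return true if the polling task has already finished; both outcomes mean the file was found.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo of the control flow in Main(): a background task polls, and the
// caller waits on it with both a timeout and the same CancellationToken.
public static class WaitOrCancel
{
    // fileArrivesAfterMs < 0 means the simulated file never arrives.
    public static string Run(int timeoutMs, int fileArrivesAfterMs, int pollMs)
    {
        var tokenSource = new CancellationTokenSource();
        CancellationToken token = tokenSource.Token;
        DateTime started = DateTime.UtcNow;

        Task poller = Task.Run(() =>
        {
            while (!token.IsCancellationRequested)
            {
                bool arrived = fileArrivesAfterMs >= 0 &&
                               (DateTime.UtcNow - started).TotalMilliseconds >= fileArrivesAfterMs;
                if (arrived)
                {
                    tokenSource.Cancel(); // plays the role of Success()
                }
                // Unlike Thread.Sleep, this returns early as soon as the token is cancelled.
                token.WaitHandle.WaitOne(pollMs);
            }
        });

        try
        {
            // False means the timeout elapsed; true or an exception means the poller cancelled.
            if (!poller.Wait(timeoutMs, token))
            {
                tokenSource.Cancel(); // plays the role of Failure(): stop the poller
                return "timed out";
            }
            return "found";
        }
        catch (OperationCanceledException)
        {
            return "found";
        }
    }
}
```

For example, `WaitOrCancel.Run(2000, 100, 20)` returns "found", while `WaitOrCancel.Run(200, -1, 20)` returns "timed out" and stops the poller.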


     If the file is found and not being written to, we output a "File found" message to the Progress tab, then proceed to the data flow that will process this file:

Figure 5. Success Message
     If the file has not arrived before the timeout threshold, we output an error message and fail the script task (if you don't want the script task to fail on timeout, call Success() instead of Failure()):

Figure 6. Failure Message
     If the file has arrived, but is not done being written to, we continue to loop and we output that as well:


Figure 7. File Written to Message
     When the file is done being written to, we output success:


Figure 8. Success After File Finished Being Written

     This can save you a lot of headaches if you have jobs that miss the files arriving by a short time. It can also help you with SSIS packages that contain data flows with flat file sources that blow up when trying to read files that are still being written to from the source systems.
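To reproduce the "still being written to" case locally, you can hold the test file open while the package runs. Here is a minimal sketch of such a helper, assuming you run it from a small console app; `SlowWriter` is a hypothetical name, and the exclusive lock behaves as described on Windows.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical test helper: simulates a source system that keeps the flat file
// open while it writes, so the package sees a file that is not finished yet.
public static class SlowWriter
{
    public static void Write(string path, int lineCount, int delayMs)
    {
        // FileShare.None keeps other processes from opening the file until the writer is done.
        using (var stream = new FileStream(path, FileMode.Create, FileAccess.Write, FileShare.None))
        using (var writer = new StreamWriter(stream))
        {
            for (int i = 1; i <= lineCount; i++)
            {
                writer.WriteLine(i);
                writer.Flush();        // push partial content to disk
                Thread.Sleep(delayMs); // pretend the source system is slow
            }
        }
    }
}
```

For example, `SlowWriter.Write(@"C:\test\test.txt", 30, 1000)` keeps the file locked for about 30 seconds, which fits inside the 1 minute timeout used above.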