Monday, December 2, 2013

Building Multiple Asynchronous Outputs for a Script Component Source

     When generating or consuming data in a script component source, you may need to create more than one output for the data. This comes up especially with nested arrays in objects, with a custom data source component that pulls different partitions of data from a database, or with objects whose nested objects contain data that needs separate outputs. When writing the code for these outputs in the overridden CreateNewOutputRows method, you would usually call the code for each output synchronously (one at a time). To show how this works, let's create a script component source with 3 outputs that we will call synchronously. The data flow looks like this:

Figure 1. Data Flow

     I changed the DefaultBufferMaxRows property of the data flow to 3,000,000 for this example, so each output buffer may hold up to 3 million rows (the buffer is still limited by DefaultBufferSize). This will be our baseline for performance measurements. Each output of this script component source contains 1 integer field:


Figure 2. Inputs and Outputs

     In the code for our script component, we're going to create a class that contains 3 arrays of 65 million integers each, one for each of the 3 outputs, and call these outputs synchronously.

#region Namespaces
using System;
using System.Data;
using System.Threading.Tasks;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;
#endregion

/// <summary>
/// This is the class to which to add your code.  Do not change the name, attributes, or parent
/// of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Pipeline.SSISScriptComponentEntryPointAttribute]
public class ScriptMain : UserComponent
{

    public override void CreateNewOutputRows()
    {
        IDTSComponentMetaData100 compMetadata = this.ComponentMetaData;
        ArrayA AClass = new ArrayA();

        // Outputs data to output A
        OutputA(AClass.A, compMetadata);
        // Outputs data to output B
        OutputB(AClass.B, compMetadata);
        // Outputs data to output C
        OutputC(AClass.C, compMetadata);
           
    }

    private void OutputA(int[] iAarray, IDTSComponentMetaData100 compMetadata)
    {
        OutputMessage(String.Format("Sending data to output A [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
        foreach (int i in iAarray)
        {
            ABuffer.AddRow();
            ABuffer.OUTA = i;
        }
        OutputMessage(String.Format("Completed sending data to output A [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
    }

    private void OutputB(int[] iBarray, IDTSComponentMetaData100 compMetadata)
    {
        OutputMessage(String.Format("Sending data to output B [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
        foreach (int i in iBarray)
        {
            BBuffer.AddRow();
            BBuffer.OUTB = i;
        }
        OutputMessage(String.Format("Completed sending data to output B [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
    }

    private void OutputC(int[] iCarray, IDTSComponentMetaData100 compMetadata)
    {
        OutputMessage(String.Format("Sending data to output C [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
        foreach (int i in iCarray)
        {
            CBuffer.AddRow();
            CBuffer.OUTC = i;
        }
        OutputMessage(String.Format("Completed sending data to output C [{0}]", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")), compMetadata);
    }

    public class ArrayA
    {
        public int[] A = new int[65000000];
        public int[] B = new int[65000000];
        public int[] C = new int[65000000];
        public ArrayA()
        {
            for (int i = 0; i < A.Length; i++)
            {
                A[i] = i;
                B[i] = i;
                C[i] = i;
            }
        }
    }

    private void OutputMessage(string message, IDTSComponentMetaData100 compMetadata)
    {
        // Passed by reference as the fireAgain argument of FireInformation
        bool mbool = false;
        compMetadata.FireInformation(1, compMetadata.Name, message, "", 0, ref mbool);
    }


}

Let's run the package and watch the outputs run synchronously, one after the other. First, A sends its data:
Figure 3. Output A Running

After A has completed, B kicks off:

Figure 4. Output B Running

Then C finishes up the outputs:


Figure 5. Output C Running

In the execution results we can see that each output ran right after the previous one and that the run took about 41 seconds in total:

Figure 6. Synchronous Results


 Now while 41 seconds is good, we can do better. We can change this code so that all 3 outputs run asynchronously (at the same time) on their own threads. To do this we take advantage of a task factory: we create a task for each of the methods that send data to the 3 outputs, and the task factory starts these tasks asynchronously. So we change the code that calls our methods to:

Task[] tasks = new Task[3];

tasks[0] = Task.Factory.StartNew(() => OutputA(AClass.A, compMetadata));
tasks[1] = Task.Factory.StartNew(() => OutputB(AClass.B, compMetadata));
tasks[2] = Task.Factory.StartNew(() => OutputC(AClass.C, compMetadata));
//Wait for all tasks to finish before going to post execute
Task.WaitAll(tasks);
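
To try the StartNew/WaitAll pattern outside of SSIS, here is a minimal console sketch. The class ParallelOutputsSketch and its Drain and RunAll methods are hypothetical names made up for this illustration; Drain only stands in for OutputA/OutputB/OutputC and sums an array instead of writing to an output buffer. The sketch also assumes you want to see any exception thrown inside a task, which Task.WaitAll reports as an AggregateException.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelOutputsSketch
{
    // Hypothetical stand-in for OutputA/OutputB/OutputC: it only reads its
    // own array and returns a sum, so the workers share no mutable state.
    public static long Drain(int[] source)
    {
        long total = 0;
        foreach (int value in source)
        {
            total += value;
        }
        return total;
    }

    // Starts one task per source array, waits for all of them, and returns
    // the per-source totals in the same order as the input.
    public static long[] RunAll(int[][] sources)
    {
        Task[] tasks = new Task[sources.Length];
        long[] totals = new long[sources.Length];

        for (int i = 0; i < sources.Length; i++)
        {
            int slot = i; // copy the loop variable so each lambda captures its own index
            tasks[slot] = Task.Factory.StartNew(() => { totals[slot] = Drain(sources[slot]); });
        }

        try
        {
            // Blocks until every task has finished.
            Task.WaitAll(tasks);
        }
        catch (AggregateException ex)
        {
            // Exceptions thrown inside the tasks surface here, wrapped together.
            foreach (Exception inner in ex.InnerExceptions)
            {
                Console.Error.WriteLine(inner.Message);
            }
            throw;
        }

        return totals;
    }

    public static void Main()
    {
        int[] data = new int[1000];
        for (int i = 0; i < data.Length; i++)
        {
            data[i] = i;
        }

        long[] totals = RunAll(new[] { data, data, data });
        foreach (long total in totals)
        {
            Console.WriteLine(total); // each total is 499500 (0 + 1 + ... + 999)
        }
    }
}
```

Each task writes only to its own slot of the totals array, much like each output method in the script component writes only to its own buffer.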

Let's run the package again and see how it performs with asynchronous outputs. At first glance we can see all outputs pushing out data at the same time:


Figure 7. Outputs Running Asynchronously
From viewing the execution results we can verify that they all started pushing out data at the same time, and finished in about 30 seconds:


Figure 8. Asynchronous Results
This means we cut the run time by about 27% (from about 41 seconds to about 30). If you have packages that pump out a lot of data and take a long time to finish, a 27% reduction can mean a lot. Keep in mind that your mileage may vary based on memory, number of processors/cores, etc.
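
As a quick check of the 27% figure, using the approximate timings reported above (41 seconds and 30 seconds). The symbols t_sync and t_async are just labels introduced here for the two run times:

```latex
\[
\frac{t_{\text{sync}} - t_{\text{async}}}{t_{\text{sync}}}
  = \frac{41 - 30}{41} \approx 0.27,
\qquad
\frac{t_{\text{sync}}}{t_{\text{async}}} = \frac{41}{30} \approx 1.37
\]
```

In other words, the run took about 27% less time, which is the same as running roughly 1.37 times as fast.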
