Tuesday, November 26, 2013

Handling Late Arriving/Open For Write Flat Files in SSIS

     If you have to source flat files for a data warehouse, you may find yourself at the mercy of the source system as to when those files are produced. If the times at which the source jobs complete vary, scheduling your staging loads can be quite a pain. If your job kicks off before the file arrives, or fails because the file is still being written to, you have to start the job again. And if later parts of the job depend on the data in these flat files, loading that data into the data marts may be delayed as well.

     What if you had a general idea of the window of time in which these flat files were most likely to arrive? Say the flat files are supposed to be there every day at 5:00 pm, but never show up later than 5:30 pm. Wouldn't it be nice if your load job were flexible enough to give the file some time to show up before running the rest of the job, rather than having to bug a DBA later to kick it off again for the late arriving files? Luckily, it's pretty easy to do, and I'll show you how in this post.

     The control flow for this example is going to look like this:


Figure 1. Control Flow

    We're going to take advantage of a Script Task to do this. If the file arrives in time, we process it; if not, we log it and/or shoot off an email. The first thing we need to do is create a flat file connection manager for our test file. For this example I created a directory called test in the root of C:, and in it a file called test.txt (C:\test\test.txt) that the connection manager can reference. The script looks the connection manager up by its default name, Flat File Connection Manager, so if you rename it, update the name in the script to match:


Figure 2. Flat File Connection Manager

    Next, we need to configure the script task through SSIS variables. The first variable, minutesToCancel, tells the script task how many minutes to wait for the file to arrive in the directory before timing out. The second, secondsToTryAgain, tells the script task how many seconds to wait after not finding the file (or finding it still being written to) before checking again. Create both as Int32 variables, since the script casts their values to int. In my example I set minutesToCancel to 1 minute and secondsToTryAgain to 15 seconds; change these to suit your particular setup.
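The script converts these two settings to milliseconds before using them (minutes × 60000, seconds × 1000). As a quick sanity check on the numbers, here's a small standalone C# sketch that is not part of the SSIS package; the PollingSettings class and its methods are hypothetical names for illustration. It mirrors that conversion with TimeSpan and counts how many full retry intervals fit in the timeout window:

```csharp
using System;

// Hypothetical helper, not part of the SSIS package: mirrors the script's
// conversion of minutesToCancel and secondsToTryAgain into milliseconds.
static class PollingSettings
{
    // Same result as minutes * 60000 and seconds * 1000 in the script task.
    public static void ToMilliseconds(int minutesToCancel, int secondsToTryAgain,
                                      out int timeoutMs, out int retryMs)
    {
        timeoutMs = (int)TimeSpan.FromMinutes(minutesToCancel).TotalMilliseconds;
        retryMs = (int)TimeSpan.FromSeconds(secondsToTryAgain).TotalMilliseconds;
    }

    // How many whole retry intervals fit inside the timeout window.
    public static int FullIntervals(int minutesToCancel, int secondsToTryAgain)
    {
        int timeoutMs, retryMs;
        ToMilliseconds(minutesToCancel, secondsToTryAgain, out timeoutMs, out retryMs);
        if (retryMs <= 0)
            throw new ArgumentOutOfRangeException("secondsToTryAgain", "Must be positive.");
        return timeoutMs / retryMs;
    }
}
```

With the example settings of 1 minute and 15 seconds, that works out to 60000 ms and 15000 ms, so four full intervals fit in the window and the file gets checked about four times before the timeout fires.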


Figure 3. SSIS Variables
   
     With the flat file connection manager set up and the SSIS variables created, we can start coding. Drag a Script Task onto the control flow design surface. On the Script screen, select minutesToCancel and secondsToTryAgain as ReadOnlyVariables so the script can access them in code. Then click the Edit Script button to start coding.


Figure 4. Script Task Script Screen


Paste the following code into the script task:


#region Namespaces
using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.Threading.Tasks;
using System.Threading;
using System.IO;
#endregion

#region Class
/// <summary>
/// ScriptMain is the entry point class of the script.  Do not change the name, attributes,
/// or parent of this class.
/// </summary>
[Microsoft.SqlServer.Dts.Tasks.ScriptTask.SSISScriptTaskEntryPointAttribute]
public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
{

    //Connection manager for the flat file
    ConnectionManager cm;
    //The cancellation token for cancelling the task
    CancellationTokenSource tokenSource;
    //Boolean to determine if file was found
    bool connected = false;

    #region Methods
    /// <summary>
    /// This method is called when this script task executes in the control flow.
    /// Before returning from this method, set the value of Dts.TaskResult to indicate success or failure.
    /// To open Help, press F1.
    /// </summary>
    public void Main()
    {
        try
        {

            tokenSource = new CancellationTokenSource();
            const int Minutes = 60000;
            const int Seconds = 1000;
            //Set minutes before the package stops looking for the file
            int minutesToCancel = (int)Dts.Variables["minutesToCancel"].Value * Minutes;
            //Seconds for thread to wait before trying to see if the file arrived again
            int secondsToTryAgain = (int)Dts.Variables["secondsToTryAgain"].Value * Seconds;
            //Set connection manager to our flat file connection
            cm = Dts.Connections["Flat File Connection Manager"];
            //Set up method the task will call
            Action act = () => CheckforFile(tokenSource.Token, cm, connected, secondsToTryAgain);
            System.Threading.Tasks.Task task = new System.Threading.Tasks.Task(act);
            task.Start();
            //Timeout after specified minutes
            if (!task.Wait(minutesToCancel, tokenSource.Token))
            {
                Failure("Checking for file timed out");//change to Success() with a message if you don't want the package to fail on a timeout
            }
        }
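        catch (OperationCanceledException)
        {
            //task.Wait() throws this when Success() cancels the token after the file is found;
            //Success() sets the task result, so there is nothing to do here
        }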

        catch (Exception e)
        {
            FailComponent(e.ToString());
            Dts.TaskResult = (int)ScriptResults.Failure;
        }

    }
    /// <summary>
    /// Loops until file arrives or timeout
    /// </summary>
    /// <param name="cancelToken">The token used to trigger task cancellation</param>
    /// <param name="cm">Connection manager for the flat file</param>
    /// <param name="connected">Found flag (passed by value, so setting it here does not update the caller's field)</param>
    /// <param name="secondsToTryAgain">The number of milliseconds to wait before trying to see if the file arrived again</param>
    public void CheckforFile(CancellationToken cancelToken, ConnectionManager cm, bool connected, int secondsToTryAgain)
    {
        //Determine if file locked message needs to be resent
        bool resendMessage = true;

        //loop while we haven't triggered a cancellation and file hasn't arrived yet
        while (!cancelToken.IsCancellationRequested)
        {

            //If file arrives trigger success
            if (File.Exists(cm.ConnectionString))
            {
             
                try
                {
                    //Attempt to open the file and read its first character; this throws an IOException if another process still has the file open for writing
                    using (TextReader reader = File.OpenText(cm.ConnectionString))
                    {
                        char[] block = new char[1];
                        reader.ReadBlock(block, 0, 1);
                    }
                    //If file is available and not being written to mark success
                    connected = true;
                    Success("File found! Proceeding to process");
                }
                //If file is being written to catch exception
                catch (IOException)
                {
                    if (resendMessage)
                    {
                        InfoComponent("File arrived, but is still being written to");
                        resendMessage = false;
                    }
                }
            }

            //Wait the specified interval before checking again; wakes immediately if the token is cancelled
            cancelToken.WaitHandle.WaitOne(secondsToTryAgain);
        }
    }

    /// <summary>
    /// Stops the while loop and reports success
    /// </summary>
    /// <param name="msg">Success message</param>
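    public void Success(string msg)
    {
        if (!tokenSource.IsCancellationRequested)
        {
            InfoComponent(msg);
            //Set the result before cancelling: Cancel() makes task.Wait() in Main() throw,
            //so Main() can return before this thread gets any further
            Dts.TaskResult = (int)ScriptResults.Success;
            tokenSource.Cancel();
        }
    }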

    /// <summary>
    /// Stops the while loop and reports failure
    /// </summary>
    /// <param name="msg">Failure Message</param>
    public void Failure(string msg)
    {
        if (!tokenSource.IsCancellationRequested)
        {
            FailComponent(msg);
            tokenSource.Cancel();
            Dts.TaskResult = (int)ScriptResults.Failure;

        }
    }

    /// <summary>Outputs an Error</summary>
    /// <param name="errorMsg">The error message to send to the UI</param>
    private void FailComponent(string errorMsg)
    {
        Dts.Events.FireError(0, "Error:", errorMsg, String.Empty, 0);
    }
    /// <summary>Outputs a Message</summary>
    /// <param name="infoMsg">The information message to send to the UI</param>
    private void InfoComponent(string infoMsg)
    {
        bool fail = false;
        Dts.Events.FireInformation(1, "Info:", infoMsg, "", 0, ref fail);
    }
    #endregion

    #region ScriptResults declaration
    /// <summary>
    /// This enum provides a convenient shorthand within the scope of this class for setting the
    /// result of the script.
    ///
    /// This code was generated automatically.
    /// </summary>
    enum ScriptResults
    {
        Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
        Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
    };
    #endregion

}
#endregion


     Let's step through some of this code. To keep checking for the file on a separate thread, we use the Task Parallel Library (System.Threading.Tasks), which was introduced in .NET 4.0 and makes working with threads much simpler than in earlier versions of .NET. We create a Task that runs CheckforFile(), which keeps checking the directory for the file. We also create a CancellationTokenSource and pass its CancellationToken to CheckforFile(); the token tells the while loop when to stop. Calling Cancel() on the CancellationTokenSource sets the token's IsCancellationRequested property to true. Until that happens, the loop keeps checking whether the file exists, checks whether it is still being written to, and then waits for secondsToTryAgain (15 seconds in this example) before trying again.

     Main() waits on the Task with task.Wait(), passing the timeout in milliseconds and the same token. If the file arrives and is not being written to, Success() calls Cancel(), which ends the loop and makes task.Wait() throw an OperationCanceledException; Main() catches and ignores it, because the file was found. If the timeout passes first, task.Wait() returns false, and Failure() calls Cancel() to stop the loop and fails the task.
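If you want to try the Task/CancellationToken pattern outside of SSIS, here's a minimal standalone sketch, assuming .NET 4.0 or later and no SSIS assemblies. Poller.WaitUntil is a hypothetical helper, not part of the package above: it runs a readiness check on a background Task, waits on the token's WaitHandle between checks instead of calling Thread.Sleep so that cancellation wakes it immediately, and uses Task.Wait(timeout) for the overall time limit. Unlike the script task, it returns a bool rather than using cancellation to signal success, so there's no OperationCanceledException to catch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the SSIS package.
static class Poller
{
    // Calls isReady() on a background Task every pollInterval until it returns
    // true or timeout elapses. Returns true if the condition was met before
    // the polling loop was stopped.
    public static bool WaitUntil(Func<bool> isReady, TimeSpan timeout, TimeSpan pollInterval)
    {
        using (var tokenSource = new CancellationTokenSource())
        {
            CancellationToken token = tokenSource.Token;

            Task<bool> poller = Task.Factory.StartNew(() =>
            {
                while (!token.IsCancellationRequested)
                {
                    if (isReady())
                        return true;
                    // Like Thread.Sleep(pollInterval), but returns early once Cancel() is called.
                    token.WaitHandle.WaitOne(pollInterval);
                }
                return false;
            });

            // Wait(timeout) returns false if the Task is still running when time is up.
            if (!poller.Wait(timeout))
                tokenSource.Cancel();

            // After Cancel() the loop exits promptly, so reading Result does not block for long.
            return poller.Result;
        }
    }
}
```

For example, Poller.WaitUntil(() => File.Exists(@"C:\test\test.txt"), TimeSpan.FromMinutes(1), TimeSpan.FromSeconds(15)) would approximate the settings used in this post, minus the still-being-written check.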


     If the file is found and is not being written to, we write a "File found!" message to the Progress tab, then proceed to the data flow that processes the file:

Figure 5. Success Message

     If the file has not arrived before the timeout threshold, we output an error message and fail the script task (if you don't want the script task to fail on a timeout, call Success() instead of Failure()):

Figure 6. Failure Message

     If the file has arrived but is still being written to, we keep looping and report that as well (only once, so the log isn't flooded on every retry):


Figure 7. File Written to Message
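The "still being written to" check relies on the fact that, on Windows, opening a file for reading while sharing only read access fails with an IOException if another process already has the file open for writing. That's what File.OpenText does in CheckforFile(). Here's a standalone sketch of the same check as a reusable method; FileReadiness.IsReady is a hypothetical name, and it opens a FileStream with FileShare.Read explicitly instead of reading a character:

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of the SSIS package.
static class FileReadiness
{
    // Returns true if the file exists and can be opened for reading right now.
    // Requesting FileShare.Read means the open is refused (IOException) while
    // another process holds the file open for writing.
    public static bool IsReady(string path)
    {
        if (!File.Exists(path))
            return false;
        try
        {
            using (new FileStream(path, FileMode.Open, FileAccess.Read, FileShare.Read))
            {
                return true;
            }
        }
        catch (IOException)
        {
            // Sharing violation (still being written to) or the file vanished
            // between the Exists check and the open.
            return false;
        }
    }
}
```

Keep in mind this only tells you that nobody has the file open for writing at that moment; a source system that closes and reopens the file between writes could still be caught partway through.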
     When the file is done being written to, we output success:


Figure 8. Success After File Finished Being Written

     This can save you a lot of headaches if your jobs tend to miss the files by only a short time. It can also help with SSIS packages whose data flows use flat file sources that blow up when they try to read files the source systems are still writing.
