Thursday, May 16, 2013

Archive Flat Files Processed During ETL for your Data Warehouse Using SSIS

     Your data warehouse will usually draw from multiple sources within your environment: other databases, web services, flat files, etc. While connecting to a source such as a database is pretty straightforward (the data providers come standard with SSIS), dealing with flat files is an entirely different animal. When the SSIS package that loads your data from flat files scans the file directory, it needs some mechanism to determine whether or not each file has already been loaded into your data warehouse. You would probably log this in a database table somewhere after processing. The more files that stay in this directory, the longer this process will take. So your goal should be twofold: (1) archive files that have been loaded and (2) remove files from the archive location after a certain retention period has passed. We can do this using a script task in SSIS with little effort.

     What we want to do is make sure that we have some kind of controls/configurations in place for this, so that once we migrate this package up through QA-->Production we can alter its behavior without having to re-migrate. The configurable items are the file directories to scan, the file extensions to look for and the retention period of the files. We want to store the file directories in a table, so that we can always remove ones that we no longer need and add new ones. For this example we will load them into a configuration table called CFG_FILE_DIRECTORY and insert the file directory locations into a column called FILE_DIR.

INSERT INTO CFG_FILE_DIRECTORY VALUES
('C:\FILE_LOADS\FINANCE'),
('C:\FILE_LOADS\MARKETING'),
('C:\FILE_LOADS\SERVICE')
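
The INSERT above relies on CFG_FILE_DIRECTORY already existing with a single column. The table definition is not shown in this post, so the following is only a minimal sketch: the table and column names come from the text, while the NVARCHAR(260) type and the primary key are assumptions.

```sql
-- Hypothetical definition; only the table and column names come from the post.
CREATE TABLE CFG_FILE_DIRECTORY
(
    FILE_DIR NVARCHAR(260) NOT NULL PRIMARY KEY  -- parent directory to scan
);
```

A primary key on FILE_DIR is one way to keep the same directory from being configured twice, which would otherwise make the loop process it twice.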

These rows represent the parent directories for our flat files, which can come from different systems and different departments in your organization. In this example, we have 3 departments: Finance, Marketing and Service. The full file hierarchy, with archive locations, would look like:

Figure 1. Flat File Directories

     Now we need to set up the database table that will log which files have been processed during ETL. This table would probably have a number of columns that capture data such as time loaded, load fail/success, error message, etc. For the sake of this example the table, which we'll call CFG_LOG_FLAT_FILE, will have just 2 columns: FLAT_FILE_NAME, which will contain the name of the file that was loaded, and FILE_DIR, which will contain the directory of the file. With this done, we need to put some files in these directories that we'll archive, and stage some records in CFG_LOG_FLAT_FILE that represent these files.

Figure 2. Files to Archive
We then insert these values into CFG_LOG_FLAT_FILE, as if these files were processed during our ETL job:

INSERT INTO CFG_LOG_FLAT_FILE VALUES
('finance_extract_20130325_061930.csv','C:\FILE_LOADS\FINANCE' ),
('marketing_extract_delta_20130425_044147.csv','C:\FILE_LOADS\MARKETING'),
('service_extract_20130515_044532.csv','C:\FILE_LOADS\SERVICE')
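
Because the INSERT above does not name its columns, the values must line up with the column order of the table. The post does not show the definition, so this is a minimal sketch with assumed column types; a real log table would carry the extra columns mentioned earlier (load time, status, error message).

```sql
-- Hypothetical definition; column order matches the INSERT above.
CREATE TABLE CFG_LOG_FLAT_FILE
(
    FLAT_FILE_NAME NVARCHAR(260) NOT NULL,  -- name of the file that was loaded
    FILE_DIR       NVARCHAR(260) NOT NULL   -- parent directory the file was loaded from
);
```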

     We also need to set up a file to be deleted. This represents a file that was archived a while ago and is beyond the set retention period. Here is one that we will place in our Finance\Archive directory:

Figure 3. File That will Test Retention Period

You can see from this image that the last write date of this file was 10/30/2012 (198 days ago at the time of this entry):

Figure 4. File Properties for Archived File
     We now have the files and database table entries set up to test whatever solution we create to deal with archiving/deleting file sources. Next, we need to create the variables that will hold the list of file directories that contain our flat files, the list of file extensions to look for and the retention period for these files. Let's create these 4 variables: FILE_DIR (which will hold the file directory we are dealing with at the time), FILE_DIR_ARRAY (which will collect all the parent directories we need to look in from CFG_FILE_DIRECTORY), FILE_EXTENSIONS (which will contain a semicolon-separated list of file extensions of files we wish to archive; because the script passes each entry straight to DirectoryInfo.GetFiles, each entry must be a search pattern such as *.csv) and PERIOD (which will contain the retention period of a file in days):

Figure 5. Variables for Archiving Files
     With our variables and files all set up, we can start building our package. Our control flow for this package will look like:

Figure 6. Package Control Flow
     The Execute SQL Task will be used to get the list of file directories from CFG_FILE_DIRECTORY and load them into the FILE_DIR_ARRAY variable. On the General screen make sure to pick the connection for your database; in this example it's Jim_Test. It is also important to set the Result Set to Full result set, since we are returning a record set and not just one value. Our SQL Statement will be:

SELECT FILE_DIR FROM CFG_FILE_DIRECTORY


Figure 7. Execute SQL Task General Screen

     Next, we need to set up our variable to capture the file directory names returned from this SQL statement. On the Result Set screen click the Add button, find the FILE_DIR_ARRAY variable we created and set its Result Name to 0, which is what a full result set requires:

Figure 8. Execute SQL Task Result Set Screen

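     On success we call the foreach loop container, which will loop through each directory stored in our variable FILE_DIR_ARRAY. To configure this we need to look at the Collection screen, where we select the type of enumerator, the source variable and the enumeration mode. Since FILE_DIR_ARRAY holds a full result set, the enumerator needs to be the Foreach ADO Enumerator with FILE_DIR_ARRAY as its source variable: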

Figure 9. Foreach Loop Collection Screen

     This foreach loop will iterate over the values in the FILE_DIR_ARRAY variable. For each iteration we need to set a variable to the value of the file directory in focus in FILE_DIR_ARRAY. For this we use our FILE_DIR variable on the Variable Mappings screen:

Figure 10. Foreach Loop Variable Mappings Screen


     Inside our foreach loop, we want to place the script task that will do our file archiving and deleting. On the Script screen we select the variables that the script task will need to perform its work. This includes the current file directory in focus in the loop (FILE_DIR), the file extensions to look for (FILE_EXTENSIONS) and the retention period of the files (PERIOD):

Figure 11. Script Task  Script Screen

     With our variables selected, let's click the Edit Script button and set up our coding environment. Since we're going to be using database tables and collections, it is a good idea to take advantage of LINQ (introduced in .NET 3.5). With LINQ we can query a database table and a collection in the same statement, which is very handy if you're trying to look up a file that was processed in a directory against values stored in a database. In order to do this, we need to use LINQ to SQL, which allows us to do some object-relational mapping in our project. In our scripting environment menu click View-->Database Explorer. This will display a window where we can select the database that contains the tables we wish to use, i.e. CFG_FILE_DIRECTORY and CFG_LOG_FLAT_FILE. In this window, click Add Connection and navigate to the location of the .mdf file that contains the tables we need (note: this is so much easier in .NET 4.0, where you can use a data provider instead of pointing to a physical file):

Figure 12. Add Connection Window

     These tables are now accessible to our project:

Figure 13. Database Explorer Window
     We're not done yet though! In order to use these in our project we need to set up a .dbml (Database Markup Language) file. This handles loading the metadata of the tables in our database, and lets us use them as objects in our project. To do this, in our scripting environment menu, click Project-->Add New Item. This will bring up a new menu which lets us choose which item to add. On the Data node, select LINQ to SQL Classes:



Figure 14. Add New Item Window
     You will now see this file added to our project in the project explorer. We now need to add our database tables to the .dbml file. To do this, double-click fileArchive.dbml in the project explorer. This will bring up a design surface. From the database explorer window, drag and drop the 2 tables onto this surface:

Figure 15. DBML Design Surface
     
     Finally! We're ready to do some coding. Paste the following code into your ScriptMain.cs (assuming you named your project archive_files):

using System;
using System.Data;
using Microsoft.SqlServer.Dts.Runtime;
using System.Windows.Forms;
using System.IO;
using System.Collections;
using Wrap = Microsoft.SqlServer.Dts.Runtime.Wrapper;
using System.Data.Common;
using System.Text;
using System.Data.SqlClient;
using System.Linq;
using System.Collections.Generic;
using System.Data.Linq;
using System.Configuration;
using System.Data.OleDb;



namespace archive_files.csproj
{
    [System.AddIn.AddIn("ScriptMain", Version = "1.0", Publisher = "Me", Description = "Used to Delete Old Files and Archive Loaded Files")]
    public partial class ScriptMain : Microsoft.SqlServer.Dts.Tasks.ScriptTask.VSTARTScriptObjectModelBase
    {
        //Enum for script results
        enum ScriptResults
        {
            Success = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Success,
            Failure = Microsoft.SqlServer.Dts.Runtime.DTSExecResult.Failure
        };

        public void Main()
        {
            //Get retention period
            int RetentionPeriod = Convert.ToInt32(Dts.Variables["User::PERIOD"].Value.ToString());
            //Set Directory to archive path
            string directoryArchivePath = Dts.Variables["User::FILE_DIR"].Value.ToString() + "\\archive";
            //Set Directory to parent path
            string directoryParentPath = Dts.Variables["User::FILE_DIR"].Value.ToString();
            //Get file extensions to look for
            string[] extensions = Dts.Variables["User::FILE_EXTENSIONS"].Value.ToString().Split(new char[] { ';' });
            //Array List to hold delete file names
            ArrayList deletefileinfos = new ArrayList();
            //File directory
            DirectoryInfo archiveDi = new DirectoryInfo(directoryArchivePath);
            //Array List to hold archive file names
            ArrayList movefileinfos = new ArrayList();
            //File directory
            DirectoryInfo moveDi = new DirectoryInfo(directoryParentPath);

            try
            {
                //Adds files, that are candidates to be deleted, to the array list
                //if they have the file extensions we care about
                foreach (string ext in extensions)
                {
                    deletefileinfos.AddRange(archiveDi.GetFiles(ext));
                }

                //Adds files, that are candidates to be archived, to the array list
                //if they have the file extensions we care about
                foreach (string ext in extensions)
                {
                    movefileinfos.AddRange(moveDi.GetFiles(ext));
                }

                //Calls the method to delete old files from archive that are beyond the retention period
                deleteFiles(deletefileinfos, directoryArchivePath, RetentionPeriod);

                //Calls the method to archive processed files
                archiveFiles(movefileinfos, directoryParentPath, directoryArchivePath);

                //Return Success
                Dts.TaskResult = (int)ScriptResults.Success;
            }
            catch (Exception e)
            {
                //Fail the task and raise the exception as an error event
                Dts.Events.FireError(0, "Error Archiving Files!", e.ToString(), String.Empty, 0);
                Dts.TaskResult = (int)ScriptResults.Failure;
            }

        }
        //Method that deletes files older than the specified retention period
        private void deleteFiles(ArrayList deletefileinfos, string directoryArchivePath, int RetentionPeriod)
        {

            //File directory
            DirectoryInfo archiveDi = new DirectoryInfo(directoryArchivePath);

            //Gets all the files whose last write time is at or before the retention cutoff
            var query = from FileInfo file in deletefileinfos
                        where file.LastWriteTime <= DateTime.Now.AddDays(-RetentionPeriod)
                        select file;

            //Deletes files older than the retention period
            foreach (FileInfo currFile in query)
            {
                currFile.Delete();
            }
        }
        //Method that archives files processed
        private void archiveFiles(ArrayList myfileinfos, string directoryParentPath, string directoryArchivePath )
        {
            //Gets the database connection
            string gcsConn = (string)Dts.Connections["Jim_Test"].ConnectionString;
            IDbConnection connection = new OleDbConnection(gcsConn);
            DataContext context = new DataContext(connection);
            //Finds the sub folder
            string subjectArea = directoryParentPath.Substring(directoryParentPath.LastIndexOf("\\") + 1);
            //Set our table objects
            Table<CFG_LOG_FLAT_FILE> tblLog = context.GetTable<CFG_LOG_FLAT_FILE>();
            Table<CFG_FILE_DIRECTORY> tblDir = context.GetTable<CFG_FILE_DIRECTORY>();
            //Gets all the file names that exist in the parent directory
            //that have been loaded into the warehouse already
            var query = from  
                             FileInfo file in  myfileinfos join
                            stg in tblLog on file.Name equals stg.FLAT_FILE_NAME join
                              dir in tblDir on stg.FILE_DIR equals dir.FILE_DIR
                        where (dir.FILE_DIR.Contains(subjectArea))
                      && myfileinfos != null
                        select  file;

            //Archive each file that matches what has been logged
            foreach (FileInfo currFileInfo in query)
            {
                //If the file already exists we delete
                if (File.Exists(Path.Combine(directoryArchivePath, currFileInfo.Name)))
                {
                    File.Delete(currFileInfo.FullName);
                }
                else
                {
                    //If file condition returns true, and the file doesnt exist, move it to the archive folder
                    currFileInfo.MoveTo(Path.Combine(directoryArchivePath, currFileInfo.Name));
                }
            }
        }
    }
}

     Let's go through and explain some of this code. First, we copy our SSIS variables into local variables so we can use them in the code, using syntax such as Dts.Variables["User::PERIOD"] (for our SSIS variable PERIOD). Next we set up 2 ArrayLists, which the foreach loops over the file extensions then fill with the files found in each directory. The first holds the files in the parent path that are candidates to be moved to the archive:
//Array List to hold archive file names
ArrayList movefileinfos = new ArrayList();
//File directory
DirectoryInfo moveDi = new DirectoryInfo(directoryParentPath);
The second holds the files in the archive path that are candidates to be deleted:
//Array List to hold delete file names
ArrayList deletefileinfos = new ArrayList();
//File directory
DirectoryInfo archiveDi = new DirectoryInfo(directoryArchivePath);

     We now call our 2 methods. The first deletes archived files whose last write date is older than the retention period:
//Calls the method to delete old files from archive that are beyond the retention period
deleteFiles(deletefileinfos, directoryArchivePath, RetentionPeriod);
The second archives the files we've already processed into our data warehouse:
//Calls the method to archive processed files
archiveFiles(movefileinfos, directoryParentPath, directoryArchivePath);

     In our deleteFiles method we use this LINQ query to find all the files whose last write time falls on or before the cutoff date, i.e. today minus the retention period in days:
//Gets all the files whose last write time is at or before the retention cutoff
var query = from FileInfo file in deletefileinfos
            where file.LastWriteTime <= DateTime.Now.AddDays(-RetentionPeriod)
            select file;
The results are then deleted from the archive directory.

     In our archiveFiles method we use this LINQ query to find all the files in the parent path that have been logged in the CFG_LOG_FLAT_FILE database table:
//Gets all the file names that exist in the parent directory
//that have been loaded into the warehouse already
var query = from FileInfo file in myfileinfos
            join stg in tblLog on file.Name equals stg.FLAT_FILE_NAME
            join dir in tblDir on stg.FILE_DIR equals dir.FILE_DIR
            where (dir.FILE_DIR.Contains(subjectArea))
               && myfileinfos != null
            select file;
These files are then moved to the archive directory within the parent directory.
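
As a sanity check before running the package, the database side of that join can be previewed directly in SQL. This is only a sketch: the literal FINANCE stands in for the subjectArea value the script derives from the current directory, and the query covers just the two tables, not the match against the files actually on disk.

```sql
-- Logged files whose directory is still configured and belongs to one subject area.
-- 'FINANCE' is a stand-in for the subjectArea value computed in the script.
SELECT lg.FLAT_FILE_NAME,
       lg.FILE_DIR
FROM CFG_LOG_FLAT_FILE AS lg
JOIN CFG_FILE_DIRECTORY AS dir
    ON lg.FILE_DIR = dir.FILE_DIR
WHERE dir.FILE_DIR LIKE '%FINANCE%';
```

With the sample rows inserted earlier, this should list only the finance extract. Note that both this filter and the Contains call in the script match on a substring, so a directory whose name contains another subject area's name would also match.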

     Build, save and let's run the package. When it's done, let's go back and look at our file directories to see if everything went as planned:

Figure 16. Package Results
     From what we see here, all the files moved from their parent directories to their archive directories, and the file in the Finance\Archive directory whose last write date was older than the retention period has been deleted.
