SSIS中数据流任务列表的程序化创建

本文关键字:程序化 创建 列表 任务 数据流 SSIS | 更新日期: 2023-09-27 18:20:13

我正在编写一些代码来生成自定义SSIS包。我使用了以下代码来生成PipelineTask组件:

    /// <summary>
    /// Lists the user tables of the Access database at <c>AccessConnestionString</c>,
    /// sorted by name, and also writes the list to <c>C:\TableList</c>.
    /// </summary>
    /// <returns>Table names, excluding names that start with "MSys" or contain '~'.</returns>
    private static List<string> getAccessTables()
    {
        List<string> tables;
        // The connection is disposed (and therefore closed) even if GetSchema throws.
        using (OleDbConnection con = new OleDbConnection(AccessConnestionString))
        {
            con.Open();
            DataTable schema = con.GetSchema("tables");
            // Column 2 of the OLE DB "Tables" schema collection is TABLE_NAME.
            // "MSys*" are Access system tables; names containing '~' are skipped as well
            // (presumably temporary objects - confirm).
            tables = schema.AsEnumerable()
                .Select(row => row[2].ToString())
                .Where(name => !name.StartsWith("MSys", StringComparison.Ordinal) && !name.Contains('~'))
                .OrderBy(name => name)
                .ToList();
        }
        // The original literal read "C:'TableList", a mis-encoded "C:\TableList".
        File.WriteAllLines(@"C:\TableList", tables);
        return tables;
    }


    /// <summary>
    /// Builds an SSIS package that copies each Access table returned by
    /// <c>getAccessTables()</c> into its own delimited flat file. The package is saved as
    /// <c>AccessToFlatFile{tableCount}.dtsx</c> in the destination folder.
    /// </summary>
    /// <remarks>
    /// Tables are grouped into batches of <c>cnt</c>. Each batch becomes one Data Flow task
    /// containing one OLE DB Source -> Flat File Destination pair per table. Each task is
    /// linked to the previous one by a precedence constraint, so the batches run one after
    /// another. The package is saved once after all batches are built, and it is disposed
    /// even if building fails.
    /// </remarks>
    /// <param name="Dts">Script Task object model; not used by this method.</param>
    public static void BuildPackage(Microsoft.SqlServer.Dts.Tasks.ScriptTask.ScriptObjectModel Dts)
    {
        // Folder for the generated .txt files and the .dtsx. The original literal read
        // "G:'Dest'", a mis-encoded "G:\Dest\".
        const string destinationFolder = @"G:\Dest\";
        // Maximum number of table copies placed in a single Data Flow task.
        const int cnt = 64;

        var tables = getAccessTables();
        var app = new Application();
        Package package = new Package();
        try
        {
            package.Name = "AccessToFlatFile";

            // Add the OLE DB connection shared by every source component.
            ConnectionManager connectionManagerOleDb = package.Connections.Add("OLEDB");
            connectionManagerOleDb.Name = "OLEDB";
            connectionManagerOleDb.ConnectionString = AccessConnestionString;

            // Integer ceiling of tables.Count / cnt, computed once.
            int batchCount = (tables.Count + cnt - 1) / cnt;
            Executable prevTsk = null;
            for (int i = 0; i < batchCount; i++)
            {
                // Keep the reference returned by Add. Re-reading the task through
                // package.Executables[i] has been observed to return a different executable,
                // which breaks the precedence chain.
                Executable tsk = package.Executables.Add("STOCK:PipelineTask");
                TaskHost taskHost = (TaskHost)tsk;
                MainPipe dataFlowTask = (MainPipe)taskHost.InnerObject;

                // Run this batch only after the previous batch's task.
                if (prevTsk != null)
                {
                    package.PrecedenceConstraints.Add(prevTsk, tsk);
                }
                prevTsk = tsk;

                foreach (var table in tables.Skip(cnt * i).Take(cnt))
                {
                    // Add the Flat File connection. Its columns are defined further down,
                    // once the source metadata is known.
                    ConnectionManager connectionManagerFlatFile = package.Connections.Add("FLATFILE");
                    connectionManagerFlatFile.ConnectionString = destinationFolder + table + ".txt";
                    connectionManagerFlatFile.Name = "FlatFile_" + table;
                    connectionManagerFlatFile.Properties["Format"].SetValue(connectionManagerFlatFile, "Delimited");
                    connectionManagerFlatFile.Properties["ColumnNamesInFirstDataRow"].SetValue(connectionManagerFlatFile, true);
                    connectionManagerFlatFile.Properties["Unicode"].SetValue(connectionManagerFlatFile, true);

                    // Add the OLE DB source component and initialise its design-time instance.
                    IDTSComponentMetaData100 componentSource = dataFlowTask.ComponentMetaDataCollection.New();
                    componentSource.Name = "OLEDBSource_" + table;
                    componentSource.ComponentClassID = app.PipelineComponentInfos["OLE DB Source"].CreationName;
                    CManagedComponentWrapper instanceSource = componentSource.Instantiate();
                    instanceSource.ProvideComponentProperties();
                    componentSource.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerOleDb.ID;
                    componentSource.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connectionManagerOleDb);
                    // AccessMode 2 = SQL command: the source reads through SqlCommand.
                    instanceSource.SetComponentProperty("AccessMode", 2);
                    instanceSource.SetComponentProperty("SqlCommand", "SELECT * FROM [" + table + "]");
                    // Refresh the output columns from the query, and always release the connection.
                    instanceSource.AcquireConnections(null);
                    try
                    {
                        instanceSource.ReinitializeMetaData();
                    }
                    finally
                    {
                        instanceSource.ReleaseConnections();
                    }
                    // Re-apply the name after initialisation, as the original did.
                    // NOTE(review): presumably because ProvideComponentProperties may reset it - confirm.
                    componentSource.Name = "OLEDBSource_" + table;

                    // Add the Flat File destination and initialise its design-time instance.
                    IDTSComponentMetaData100 componentDestination = dataFlowTask.ComponentMetaDataCollection.New();
                    componentDestination.Name = "FlatFileDestination_" + table;
                    componentDestination.ComponentClassID = app.PipelineComponentInfos["Flat File Destination"].CreationName;
                    CManagedComponentWrapper instanceDestination = componentDestination.Instantiate();
                    instanceDestination.ProvideComponentProperties();
                    componentDestination.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerFlatFile.ID;
                    componentDestination.RuntimeConnectionCollection[0].ConnectionManager =
                        DtsConvert.GetExtendedInterface(connectionManagerFlatFile);

                    // Connect the source output to the destination input.
                    IDTSPath100 path = dataFlowTask.PathCollection.New();
                    path.AttachPathAndPropagateNotifications(componentSource.OutputCollection[0],
                        componentDestination.InputCollection[0]);

                    // The virtual input exposes the upstream columns available to the destination.
                    IDTSInput100 destinationInput = componentDestination.InputCollection[0];
                    IDTSVirtualInput100 destinationVirtualInput = destinationInput.GetVirtualInput();
                    IDTSVirtualInputColumnCollection100 destinationVirtualInputColumns =
                        destinationVirtualInput.VirtualInputColumnCollection;

                    // Create one flat file column per upstream column. Columns are separated by
                    // "||"; the last column's delimiter "~~" serves as the row delimiter.
                    var connectionFlatFile =
                        (RuntimeWrapper.IDTSConnectionManagerFlatFile100)connectionManagerFlatFile.InnerObject;
                    int indexMax = destinationVirtualInputColumns.Count - 1;
                    for (int index = 0; index <= indexMax; index++)
                    {
                        IDTSVirtualInputColumn100 virtualInputColumn = destinationVirtualInputColumns[index];
                        var flatFileColumn =
                            (RuntimeWrapper.IDTSConnectionManagerFlatFileColumn100)connectionFlatFile.Columns.Add();
                        flatFileColumn.ColumnType = "Delimited";
                        flatFileColumn.ColumnWidth = virtualInputColumn.Length;
                        flatFileColumn.DataPrecision = virtualInputColumn.Precision;
                        flatFileColumn.DataScale = virtualInputColumn.Scale;
                        flatFileColumn.DataType = virtualInputColumn.DataType;
                        ((RuntimeWrapper.IDTSName100)flatFileColumn).Name = virtualInputColumn.Name;
                        flatFileColumn.ColumnDelimiter = index < indexMax ? "||" : "~~";
                    }

                    // Generate external metadata columns from the flat file columns just added.
                    instanceDestination.AcquireConnections(null);
                    try
                    {
                        instanceDestination.ReinitializeMetaData();
                    }
                    finally
                    {
                        instanceDestination.ReleaseConnections();
                    }

                    // Select every upstream column and map it to the external column of the same name.
                    foreach (IDTSVirtualInputColumn100 virtualInputColumn in destinationVirtualInputColumns)
                    {
                        IDTSInputColumn100 inputColumn = instanceDestination.SetUsageType(destinationInput.ID,
                            destinationVirtualInput, virtualInputColumn.LineageID, DTSUsageType.UT_READONLY);
                        IDTSExternalMetadataColumn100 externalColumn =
                            destinationInput.ExternalMetadataColumnCollection[inputColumn.Name];
                        instanceDestination.MapInputColumn(destinationInput.ID, inputColumn.ID, externalColumn.ID);
                    }
                }
            }

            // Save only after the loop. Saving and disposing inside it would leave later
            // batches working on a disposed package.
            app.SaveToXml(destinationFolder + package.Name + tables.Count.ToString() + ".dtsx", package, null);
        }
        finally
        {
            package.Dispose();
        }
    }
}

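上面的代码可以正常创建多个数据流(Data Flow)任务。但如果取消那两行被注释掉的代码(`tsk = package.Executables[i];` 和 `prevTsk = package.Executables[i-1];`)的注释,应用程序的行为就会变得非常奇怪,而我认为这两种写法之间不应该有任何区别。有人知道为什么使用 `package.Executables[i]` 和 `package.Executables[i-1]` 得不到预期的结果吗?这与 COM 对象及其初始化有关吗?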

提前谢谢。

SSIS中数据流任务列表的程序化创建

从逻辑上讲,这段代码是要在先前创建的任务与当前任务之间添加一个优先约束(precedence constraint),使当前任务在前一个任务之后运行。

可行的代码通过保留对当前对象和前一个对象的引用来实现这一点,而失败的代码则试图按它们在集合中的序号位置来获取这两个对象。按序号引用前一个对象本身没有问题;问题似乎出在创建任务后立即对其重新赋值上。按理说,它应该被重新赋值为同一个对象,但据我观察,如果是首次这样赋值,它就不起作用。

出问题的语句是 `tsk = package.Executables[i];`。

    /// <summary>
    /// Stand-in for the Access schema query, used to reproduce the issue without a database.
    /// </summary>
    /// <returns>A new list containing the table names "A" through "G".</returns>
    private static List<string> getAccessTables()
    {
        string[] fixtureNames = { "A", "B", "C", "D", "E", "F", "G" };
        return new List<string>(fixtureNames);
    }
    /// <summary>
    /// Working variant from the answer. It builds a package with one Data Flow task per
    /// table name and links each task to the previous one. Both tasks are taken from
    /// references kept in local variables, not re-read from <c>package.Executables</c>.
    /// It then saves the package, disposes it, and runs <c>AltBuildPackage()</c> for comparison.
    /// </summary>
    public static void BuildPackage(/*Microsoft.SqlServer.Dts.Tasks.ScriptTask.ScriptObjectModel Dts*/)
    {
        // One table per Data Flow task, so tables[i] below names batch i's task.
        const int cnt = 1;

        var tables = getAccessTables();
        var app = new Application();
        Package package = new Package();
        try
        {
            package.Name = "AccessToFlatFile";
            // Add the Data Flow Tasks: integer ceiling of tables.Count / cnt, computed once.
            int batchCount = (tables.Count + cnt - 1) / cnt;
            Executable prevTsk = null;
            for (int i = 0; i < batchCount; i++)
            {
                // Keep the reference returned by Add; this is what makes the chaining work.
                Executable tsk = package.Executables.Add("STOCK:PipelineTask");
                TaskHost taskHost = (TaskHost)tsk;
                taskHost.Name = string.Format("DFT {0}", tables[i]);
                if (prevTsk != null)
                {
                    // NOTE: ":36" is a format specifier, which string arguments ignore;
                    // use "{0,-36}" if padded output is wanted.
                    Console.WriteLine(string.Format("Linking {0:36} to {1:36}", taskHost.Name, ((TaskHost)prevTsk).Name));
                    package.PrecedenceConstraints.Add(prevTsk, tsk);
                }
                prevTsk = tsk;
            }
            // The original path used "'" as the separator, a mis-encoded "\".
            app.SaveToXml(String.Format(@"C:\Users\bfellows\documents\visual studio 2013\Projects\WTF\WTF\{0}.dtsx", package.Name + tables.Count.ToString()), package, null);
        }
        finally
        {
            package.Dispose();
        }
        Console.WriteLine();
        AltBuildPackage();
    }
    /// <summary>
    /// Failing variant from the answer, kept to reproduce the problem. It builds the same
    /// chain of Data Flow tasks as the working BuildPackage, but re-reads each task through
    /// <c>package.Executables[i]</c> and <c>package.Executables[i - 1]</c> instead of keeping
    /// the references returned by <c>Executables.Add</c>. It prints the hash code before and
    /// after the first re-read.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the sample output below this code points to Executables[i] not returning
    /// the task just added:
    /// - on later iterations, "Hash after" equals the first task's hash;
    /// - each "Linking" line names a predecessor with a GUID-style default name.
    /// The index order of the collection therefore appears not to follow insertion order.
    /// Confirm against the SSIS documentation.
    /// NOTE(review): "'t" in the hash literals looks like a mis-encoded "\t" (the sample output
    /// is tab-indented), and the "'" separators in the save path like a mis-encoded "\".
    /// Both are left unchanged here.
    /// </remarks>
    public static void AltBuildPackage(/*Microsoft.SqlServer.Dts.Tasks.ScriptTask.ScriptObjectModel Dts*/)
    {
        var tables = getAccessTables();
        var app = new Application();
        Package package = new Package();
        package.Name = "AccessToFlatFile";
        // Add the Data Flow Task 
        // cnt = 1 gives one task per table, so tables[i] names the task of iteration i.
        int cnt = 1;
        Executable tsk = null, prevTsk = null;
        for (int i = 0; i < Math.Ceiling(tables.Count * 1.0 / cnt * 1.0); i++)
        {
            tsk = package.Executables.Add("STOCK:PipelineTask");
            // This line is what is causing you pain. I don't know why
            // Theory is that you're losing your reference
            // (the hash codes printed around it show whether the same object came back).
            Console.WriteLine(string.Format("'tHash before {0}", tsk.GetHashCode()));
            tsk = package.Executables[i];
            Console.WriteLine(string.Format("'tHash after  {0}", tsk.GetHashCode()));

            // Get the task host wrapper, and the Data Flow task
            // (this renames whichever executable Executables[i] returned above).
            TaskHost taskHost = tsk as TaskHost;
            taskHost.Name = string.Format("DFT {0}", tables[i]);
            // if the above tsk = assignment is delayed to this point
            // the reassignment works fine.
            // (the answer author's observation)
            tsk = package.Executables[i];
            if (i > 0)
            {                    
                // The predecessor is also looked up by position rather than by a kept reference.
                prevTsk = package.Executables[i - 1];
                Console.WriteLine(string.Format("Linking {0:36} to {1:36}", (tsk as TaskHost).Name, (prevTsk as TaskHost).Name));
                PrecedenceConstraint pcPipelineTask =
                       package.PrecedenceConstraints.Add((Executable)prevTsk, (Executable)tsk);
            }
        }
        app.SaveToXml(String.Format(@"C:'Users'bfellows'documents'visual studio 2013'Projects'WTF'WTF'Alt{0}.dtsx", package.Name + tables.Count.ToString()), package, null);
        package.Dispose();
    }

输出

Linking DFT B to DFT A
Linking DFT C to DFT B
Linking DFT D to DFT C
Linking DFT E to DFT D
Linking DFT F to DFT E
Linking DFT G to DFT F
        Hash before 94299808
        Hash after  94299808
        Hash before 94320648
        Hash after  94299808
Linking DFT B to {7E0B0C2B-7C69-4EDF-9EDE-8B2382B8221D}
        Hash before 94346952
        Hash after  94299808
Linking DFT C to {7E0B0C2B-7C69-4EDF-9EDE-8B2382B8221D}