SSIS中数据流任务列表的程序化创建
本文关键字:程序化 创建 列表 任务 数据流 SSIS | 更新日期: 2023-09-27 18:20:13
我正在编写一些代码来生成自定义SSIS包。我使用了以下代码来生成PipelineTask组件:
/// <summary>
/// Reads the table list of the Access database behind <c>AccessConnestionString</c>,
/// writes the names to a text file and returns them.
/// </summary>
/// <returns>
/// Table names sorted ascending, excluding names that start with "MSys" and
/// names that contain '~'.
/// </returns>
private static List<string> getAccessTables()
{
List<string> tables;
// Dispose the connection even when opening it or reading the schema throws.
using (OleDbConnection con = new OleDbConnection(AccessConnestionString))
{
con.Open();
// Column index 2 of the "tables" schema rowset is used as the table name
// (presumably TABLE_NAME - confirm against the provider's schema).
tables = con.GetSchema("tables").AsEnumerable()
.Select(row => row[2].ToString())
.Where(name => !name.StartsWith("MSys") && !name.Contains('~'))
.OrderBy(name => name)
.ToList();
}
// Path separator restored: the literal had been mangled to C:'TableList.
File.WriteAllLines(@"C:\TableList", tables);
return tables;
}
/// <summary>
/// Builds an SSIS package that copies every table returned by <c>getAccessTables()</c>
/// from the OLE DB connection into its own delimited flat file, then saves the package
/// as a .dtsx file.
/// </summary>
/// <remarks>
/// Tables are processed in batches of 64. Each batch gets its own Data Flow task, and
/// each task is linked to the previously created one with a precedence constraint.
/// The package is saved once, after all batches have been added, and is always disposed.
/// </remarks>
/// <param name="Dts">Script Task object model supplied by the caller; not used by this method.</param>
public static void BuildPackage(Microsoft.SqlServer.Dts.Tasks.ScriptTask.ScriptObjectModel Dts)
{
var tables = getAccessTables();
var app = new Application();
Package package = new Package();
try
{
package.Name = "AccessToFlatFile";
// Add the OLE-DB connection shared by every source component
ConnectionManager connectionManagerOleDb = package.Connections.Add("OLEDB");
connectionManagerOleDb.Name = "OLEDB";
connectionManagerOleDb.ConnectionString = AccessConnestionString;
// Number of tables handled by a single Data Flow task
int cnt = 64;
// Integer ceiling division: one extra batch for a partial remainder
int batchCount = (tables.Count + cnt - 1) / cnt;
Executable tsk = null, prevTsk = null;
for (int i = 0; i < batchCount; i++)
{
// Keep a reference to the task created in the previous iteration (null on the first pass)
prevTsk = tsk;
// Add the Data Flow Task
tsk = package.Executables.Add("STOCK:PipelineTask");
//tsk = package.Executables[i];
// Get the task host wrapper, and the Data Flow task
TaskHost taskHost = (TaskHost)tsk;
MainPipe dataFlowTask = (MainPipe)taskHost.InnerObject;
if (i > 0)
{
//prevTsk = package.Executables[i-1];
// Chain this task after the previous one
package.PrecedenceConstraints.Add(prevTsk, tsk);
}
foreach (var table in tables.Skip(cnt * i).Take(cnt))
{
// Add the Flat File connection, basic info only; columns are added further down
ConnectionManager connectionManagerFlatFile = package.Connections.Add("FLATFILE");
// Path separators restored: the literal had been mangled to G:'Dest'
connectionManagerFlatFile.ConnectionString = @"G:\Dest\" + table + ".txt";
connectionManagerFlatFile.Name = "FlatFile_" + table;
connectionManagerFlatFile.Properties["Format"].SetValue(connectionManagerFlatFile, "Delimited");
connectionManagerFlatFile.Properties["ColumnNamesInFirstDataRow"].SetValue(connectionManagerFlatFile, true);
connectionManagerFlatFile.Properties["Unicode"].SetValue(connectionManagerFlatFile, true);

// Add OLE-DB source component
IDTSComponentMetaData100 componentSource = dataFlowTask.ComponentMetaDataCollection.New();
componentSource.Name = "OLEDBSource_" + table;
componentSource.ComponentClassID = app.PipelineComponentInfos["OLE DB Source"].CreationName;
// componentSource.ComponentClassID = "DTSAdapter.OleDbSource.3";
// Get OLE-DB source design-time instance, and initialise component
CManagedComponentWrapper instanceSource = componentSource.Instantiate();
instanceSource.ProvideComponentProperties();
// Set source connection
componentSource.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerOleDb.ID;
componentSource.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connectionManagerOleDb);
// Set the source properties; AccessMode 2 is paired with the SqlCommand text below
// (NOTE(review): presumably "SQL command" mode - confirm against the OLE DB Source docs)
instanceSource.SetComponentProperty("AccessMode", 2);
instanceSource.SetComponentProperty("SqlCommand", "SELECT * FROM [" + table + "]");
// Reinitialize the metadata, refresh columns
instanceSource.AcquireConnections(null);
instanceSource.ReinitializeMetaData();
instanceSource.ReleaseConnections();
// The name is assigned again after initialisation, as in the original code
// (NOTE(review): presumably ProvideComponentProperties resets it - confirm)
componentSource.Name = "OLEDBSource_" + table;

// Add Flat File destination
IDTSComponentMetaData100 componentDestination = dataFlowTask.ComponentMetaDataCollection.New();
componentDestination.Name = "FlatFileDestination_" + table;
componentDestination.ComponentClassID = app.PipelineComponentInfos["Flat File Destination"].CreationName;
// Get Flat File destination design-time instance, and initialise component
CManagedComponentWrapper instanceDestination = componentDestination.Instantiate();
instanceDestination.ProvideComponentProperties();
// Set destination connection
componentDestination.RuntimeConnectionCollection[0].ConnectionManagerID = connectionManagerFlatFile.ID;
componentDestination.RuntimeConnectionCollection[0].ConnectionManager =
DtsConvert.GetExtendedInterface(connectionManagerFlatFile);
// Connect the source output to the destination input
IDTSPath100 path = dataFlowTask.PathCollection.New();
path.AttachPathAndPropagateNotifications(componentSource.OutputCollection[0],
componentDestination.InputCollection[0]);
// Get input and virtual input for destination to select and map columns
IDTSInput100 destinationInput = componentDestination.InputCollection[0];
IDTSVirtualInput100 destinationVirtualInput = destinationInput.GetVirtualInput();
IDTSVirtualInputColumnCollection100 destinationVirtualInputColumns =
destinationVirtualInput.VirtualInputColumnCollection;
// Get native flat file connection
RuntimeWrapper.IDTSConnectionManagerFlatFile100 connectionFlatFile =
connectionManagerFlatFile.InnerObject as RuntimeWrapper.IDTSConnectionManagerFlatFile100;
// Create flat file connection columns to match pipeline
int indexMax = destinationVirtualInputColumns.Count - 1;
for (int index = 0; index <= indexMax; index++)
{
// Get input column to replicate in flat file
IDTSVirtualInputColumn100 virtualInputColumn = destinationVirtualInputColumns[index];
// Add column to Flat File connection manager
RuntimeWrapper.IDTSConnectionManagerFlatFileColumn100 flatFileColumn =
connectionFlatFile.Columns.Add() as RuntimeWrapper.IDTSConnectionManagerFlatFileColumn100;
flatFileColumn.ColumnType = "Delimited";
flatFileColumn.ColumnWidth = virtualInputColumn.Length;
flatFileColumn.DataPrecision = virtualInputColumn.Precision;
flatFileColumn.DataScale = virtualInputColumn.Scale;
flatFileColumn.DataType = virtualInputColumn.DataType;
RuntimeWrapper.IDTSName100 columnName = flatFileColumn as RuntimeWrapper.IDTSName100;
columnName.Name = virtualInputColumn.Name;
// "||" follows every column except the last, which is followed by "~~"
flatFileColumn.ColumnDelimiter = index < indexMax ? "||" : "~~";
}
// Reinitialize the metadata, generating external columns from flat file columns
instanceDestination.AcquireConnections(null);
instanceDestination.ReinitializeMetaData();
instanceDestination.ReleaseConnections();
// Select and map destination columns
foreach (IDTSVirtualInputColumn100 virtualInputColumn in destinationVirtualInputColumns)
{
// Select column, and retain new input column
IDTSInputColumn100 inputColumn = instanceDestination.SetUsageType(destinationInput.ID,
destinationVirtualInput, virtualInputColumn.LineageID, DTSUsageType.UT_READONLY);
// Find external column by name
IDTSExternalMetadataColumn100 externalColumn =
destinationInput.ExternalMetadataColumnCollection[inputColumn.Name];
// Map input column to external column
instanceDestination.MapInputColumn(destinationInput.ID, inputColumn.ID, externalColumn.ID);
}
}
}
// Save once, after every batch has been added. Saving and disposing inside the
// batch loop would leave later batches working on a disposed package.
app.SaveToXml(String.Format(@"G:\Dest\{0}.dtsx", package.Name + tables.Count.ToString()), package, null);
}
finally
{
package.Dispose();
}
}
}
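上面的代码可以很好地创建多个 DataFlow 任务。但如果取消那两行被注释掉的代码(tsk = package.Executables[i]; 和 prevTsk = package.Executables[i-1];)的注释,应用程序的行为就会非常奇怪,而我原以为这两种写法不会有任何区别。有人知道为什么使用 package.Executables[i] 和 package.Executables[i-1] 不能生成预期结果吗?这与 COM 对象及其初始化有关吗?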
提前谢谢。
从逻辑上讲,代码试图在先前创建的对象与当前对象之间添加一个优先约束(precedence constraint)。
有效的代码通过保留对当前对象和前一个对象的引用来实现这一点,而失败的代码则试图通过序号位置来实现这一点。通过序号引用前一个对象本身没有问题,问题似乎出在创建任务之后立即对变量重新赋值。按理说这只是把同一个对象重新赋值给自己,但根据我的观察(见下面的输出),只有第一次迭代是这样;之后每次重新赋值得到的都是第一个任务,而不是刚创建的任务。
出问题的语句是:tsk = package.Executables[i];
/// <summary>
/// Stand-in for the real table lookup: returns the seven fixed names "A" through "G".
/// </summary>
/// <returns>A new list containing "A", "B", "C", "D", "E", "F", "G" in that order.</returns>
private static List<string> getAccessTables()
{
string[] placeholderNames = { "A", "B", "C", "D", "E", "F", "G" };
return new List<string>(placeholderNames);
}
/// <summary>
/// Minimal reproduction of the working approach: adds one empty Data Flow task per
/// table name and links each task to the one created just before it, using the
/// references returned by <c>Executables.Add</c>. Saves the package, then runs
/// <c>AltBuildPackage()</c> for comparison.
/// </summary>
public static void BuildPackage(/*Microsoft.SqlServer.Dts.Tasks.ScriptTask.ScriptObjectModel Dts*/)
{
var tables = getAccessTables();
var app = new Application();
Package package = new Package();
try
{
package.Name = "AccessToFlatFile";
// One table per task, so tables[i] can be used to name task i
int cnt = 1;
Executable tsk = null, prevTsk = null;
for (int i = 0; i < Math.Ceiling(tables.Count * 1.0 / cnt * 1.0); i++)
{
// Remember the task created in the previous iteration (null on the first pass)
prevTsk = tsk;
// Add the Data Flow Task
tsk = package.Executables.Add("STOCK:PipelineTask");
//tsk = package.Executables[i];
// Get the task host wrapper so the task can be named
TaskHost taskHost = (TaskHost)tsk;
taskHost.Name = string.Format("DFT {0}", tables[i]);
if (i > 0)
{
Console.WriteLine(string.Format("Linking {0:36} to {1:36}", taskHost.Name, ((TaskHost)prevTsk).Name));
//prevTsk = package.Executables[i-1];
// Chain this task after the previous one
package.PrecedenceConstraints.Add(prevTsk, tsk);
}
}
// Path separators restored: the literal had been mangled to C:'Users'bfellows'...
app.SaveToXml(String.Format(@"C:\Users\bfellows\documents\visual studio 2013\Projects\WTF\WTF\{0}.dtsx", package.Name + tables.Count.ToString()), package, null);
}
finally
{
// Release the package even when building or saving throws
package.Dispose();
}
Console.WriteLine();
AltBuildPackage();
}
/// <summary>
/// Reproduction of the failing approach: after adding each Data Flow task, the task
/// reference is re-read from <c>package.Executables</c> by index, and the hash codes
/// before and after the re-read are printed so the two references can be compared.
/// </summary>
/// <remarks>
/// The statement order inside the loop is deliberate and must not be changed; the
/// method exists to show the effect of re-reading the task immediately after creation.
/// </remarks>
public static void AltBuildPackage(/*Microsoft.SqlServer.Dts.Tasks.ScriptTask.ScriptObjectModel Dts*/)
{
var tables = getAccessTables();
var app = new Application();
Package package = new Package();
try
{
package.Name = "AccessToFlatFile";
// One table per task, so tables[i] can be used to name task i
int cnt = 1;
Executable tsk = null, prevTsk = null;
for (int i = 0; i < Math.Ceiling(tables.Count * 1.0 / cnt * 1.0); i++)
{
// Add the Data Flow Task
tsk = package.Executables.Add("STOCK:PipelineTask");
// This immediate re-read is the statement under investigation.
// Author's theory: the reference to the newly added task is lost here.
// Escape restored: the literal had been mangled to 'tHash.
Console.WriteLine(string.Format("\tHash before {0}", tsk.GetHashCode()));
tsk = package.Executables[i];
Console.WriteLine(string.Format("\tHash after {0}", tsk.GetHashCode()));
// Get the task host wrapper so the task can be named
TaskHost taskHost = tsk as TaskHost;
taskHost.Name = string.Format("DFT {0}", tables[i]);
// Author's observation: if the first re-read above is removed and the
// reassignment only happens here, after naming, it works fine.
tsk = package.Executables[i];
if (i > 0)
{
prevTsk = package.Executables[i - 1];
Console.WriteLine(string.Format("Linking {0:36} to {1:36}", (tsk as TaskHost).Name, (prevTsk as TaskHost).Name));
PrecedenceConstraint pcPipelineTask =
package.PrecedenceConstraints.Add((Executable)prevTsk, (Executable)tsk);
}
}
// Path separators restored: the literal had been mangled to C:'Users'bfellows'...
app.SaveToXml(String.Format(@"C:\Users\bfellows\documents\visual studio 2013\Projects\WTF\WTF\Alt{0}.dtsx", package.Name + tables.Count.ToString()), package, null);
}
finally
{
// Release the package even when building or saving throws
package.Dispose();
}
}
输出
Linking DFT B to DFT A
Linking DFT C to DFT B
Linking DFT D to DFT C
Linking DFT E to DFT D
Linking DFT F to DFT E
Linking DFT G to DFT F
Hash before 94299808
Hash after 94299808
Hash before 94320648
Hash after 94299808
Linking DFT B to {7E0B0C2B-7C69-4EDF-9EDE-8B2382B8221D}
Hash before 94346952
Hash after 94299808
Linking DFT C to {7E0B0C2B-7C69-4EDF-9EDE-8B2382B8221D}