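Limiting the number of parallel threads in C#
Keywords: threads, parallel, limit | Updated: 2023-09-27 18:20:25
I am writing a C# program that generates half a million files and uploads them via FTP. I want to process 4 files in parallel, since the machine has 4 cores and file generation is the step that takes longer. Is it possible to convert the following PowerShell example to C#? Or is there a better framework for this, such as an actor framework for C# (like F#'s MailboxProcessor)?
PowerShell example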
$maxConcurrentJobs = 3;
# Read the input and queue it up
$jobInput = get-content .\input.txt
$queue = [System.Collections.Queue]::Synchronized( (New-Object System.Collections.Queue) )
foreach($item in $jobInput)
{
    $queue.Enqueue($item)
}
# Function that pops input off the queue and starts a job with it
function RunJobFromQueue
{
    if( $queue.Count -gt 0)
    {
        $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue()
        Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null
    }
}
# Start up to the max number of concurrent jobs
# Each job will take care of running the rest
for( $i = 0; $i -lt $maxConcurrentJobs; $i++ )
{
    RunJobFromQueue
}
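Update:
The connection to the remote FTP server may be slow, so I want to throttle the FTP upload processing.
Assuming you are building this with the TPL, you can set ParallelOptions.MaxDegreeOfParallelism to whatever value you want.
See Parallel.For for a code example.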
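A minimal sketch of what that could look like, assuming the files can be addressed by index. The names ParallelForSketch, Run and processFile are hypothetical and stand in for your own generate-and-upload routine.

```csharp
using System;
using System.Threading.Tasks;

public static class ParallelForSketch
{
    // Calls processFile(i) for every i in [0, fileCount),
    // with at most maxParallel calls running at the same time.
    // processFile is a hypothetical placeholder for the real work.
    public static void Run(int fileCount, int maxParallel, Action<int> processFile)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };
        Parallel.For(0, fileCount, options, processFile);
    }
}
```

It might be called as ParallelForSketch.Run(500000, 4, i => GenerateAndUpload(i)), where GenerateAndUpload is again an assumed method of your own. Parallel.For blocks the calling thread until every iteration has finished.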
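The Task Parallel Library is your friend. See this link, which describes what is available to you. Basically, it ships with Framework 4, and by default it tunes what are essentially background thread-pool threads to the number of processors on the machine it runs on.
Perhaps something like this: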
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 4;
Then in your loop, something like:
Parallel.Invoke(options,
    () => new WebClient().UploadFile("http://www.linqpad.net", "lp.html"),
    () => new WebClient().UploadFile("http://www.jaoo.dk", "jaoo.html"));
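If you are using .NET 4.0, you can use the Parallel library.
Assuming you are iterating over half a million files, you can use Parallel.ForEach to iterate "in parallel", or you can look at PLINQ. Here is a comparison between the two.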
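As a rough sketch of the two options just mentioned, both capped at 4 concurrent operations. The class name TwoWays and the upload delegate are hypothetical; only Parallel.ForEach, AsParallel, WithDegreeOfParallelism and ForAll are framework calls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class TwoWays
{
    // Parallel.ForEach: the limit is passed through ParallelOptions.
    public static int WithParallelForEach(IList<string> files, Action<string> upload)
    {
        int done = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
        Parallel.ForEach(files, options, f =>
        {
            upload(f);                        // hypothetical per-file work
            Interlocked.Increment(ref done);  // thread-safe counter
        });
        return done;
    }

    // PLINQ: the limit is set on the query with WithDegreeOfParallelism.
    public static int WithPlinq(IList<string> files, Action<string> upload)
    {
        int done = 0;
        files.AsParallel()
             .WithDegreeOfParallelism(4)
             .ForAll(f =>
             {
                 upload(f);
                 Interlocked.Increment(ref done);
             });
        return done;
    }
}
```

Both methods return the number of files processed. For a plain "do this to every file" loop the two behave similarly; PLINQ tends to be more convenient when the work is already expressed as a query.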
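Essentially, you want to create an Action or Task for each file to upload, put them in a List, and then process that list while limiting how many can run in parallel.
My blog post shows how to do this with both Tasks and Actions, and provides a sample project you can download and run to see both in action.
With Actions
If you use Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 4 threads in parallel.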
var listOfActions = new List<Action>();
foreach (var file in files)
{
    var localFile = file;
    // Note that we only create the Action here; it does not run until Parallel.Invoke is called.
    listOfActions.Add(() => UploadFile(localFile));
}
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.Invoke(options, listOfActions.ToArray());
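This option does not support async, though, and I am assuming your FileUpload function will be async, so you may want to use the Task example below.
With Tasks
With Tasks there is no built-in function. However, you can use the one I provide on my blog.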
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
    await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
    // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
    var tasks = tasksToRun.ToList();
    using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
    {
        var postTaskTasks = new List<Task>();
        // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
        tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));
        // Start running each task.
        foreach (var task in tasks)
        {
            // Increment the number of tasks currently running and wait if too many are running.
            await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);
            cancellationToken.ThrowIfCancellationRequested();
            task.Start();
        }
        // Wait for all of the provided tasks to complete.
        // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
        await Task.WhenAll(postTaskTasks.ToArray());
    }
}
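Then create your list of tasks and call the function to run them, with, say, at most 4 running at a time:
var listOfTasks = new List<Task>();
foreach (var file in files)
{
    var localFile = file;
    // Note that we create the Task here, but do not start it.
    // The delegate blocks on the upload. An async lambda passed to new Task(...) would be async void,
    // so the Task would complete at the first await and its slot would be released too early.
    listOfTasks.Add(new Task(() => UploadFile(localFile).Wait()));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4);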
Also, because this method supports async, it will not block the UI thread the way Parallel.Invoke or Parallel.ForEach would.
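If the upload method already returns a Task, one alternative is to throttle the async delegates directly with SemaphoreSlim instead of wrapping them in unstarted Task objects. The following is only a sketch of that idea, not the code from the blog post; AsyncThrottle, ForEachAsync and work are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncThrottle
{
    // Runs work(item) for every item, with at most maxParallel operations in flight.
    public static async Task ForEachAsync<T>(IEnumerable<T> items, int maxParallel, Func<T, Task> work)
    {
        using (var throttler = new SemaphoreSlim(maxParallel))
        {
            var running = new List<Task>();
            foreach (var item in items)
            {
                // Wait (without blocking a thread) for a free slot before starting the next item.
                await throttler.WaitAsync();
                running.Add(RunOneAsync(item, work, throttler));
            }
            // Keep the semaphore alive until every operation has released its slot.
            await Task.WhenAll(running);
        }
    }

    private static async Task RunOneAsync<T>(T item, Func<T, Task> work, SemaphoreSlim throttler)
    {
        try
        {
            await work(item);
        }
        finally
        {
            // Always free the slot, even if the operation failed.
            throttler.Release();
        }
    }
}
```

Usage could look like await AsyncThrottle.ForEachAsync(files, 4, file => UploadFile(file)), assuming UploadFile returns a Task. Note that this sketch keeps one Task per item in memory until the end, which may matter for half a million files.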
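I have written the technique below, in which I use a BlockingCollection as a thread-count manager. It is quite simple to implement and it handles the job. It accepts Task objects and adds an integer value to the blocking collection, which raises the count of running threads by 1. When a thread finishes, it takes an item back out of the collection, which unblocks the pending Add call of the next waiting task.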
public class BlockingTaskQueue
{
    // Each item in the collection represents one running task; the bounded capacity is the limit.
    private readonly BlockingCollection<int> threadManager;

    public bool IsWorking
    {
        get { return threadManager.Count > 0; }
    }

    public BlockingTaskQueue(int maxThread)
    {
        threadManager = new BlockingCollection<int>(maxThread);
    }

    // Queues an unstarted task; the returned Task completes once it has finished running.
    public Task AddTask(Task task)
    {
        return Task.Run(() => Run(task));
    }

    private bool Run(Task task)
    {
        // Blocks while maxThread tasks are already running.
        threadManager.Add(1);
        try
        {
            task.Start();
            task.Wait();
            return true;
        }
        catch (Exception)
        {
            return false;
        }
        finally
        {
            // Free the slot for the next waiting task.
            threadManager.Take();
        }
    }
}
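BlockingCollection can also be used as an actual bounded queue between the two stages from the question, so that file generation and FTP upload each get their own limit. This is a sketch under assumptions: Pipeline, generate and upload are hypothetical names, and error handling is left out (a failing uploader would need extra care so that producers are not left blocked on a full queue).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Pipeline
{
    // Generates files with up to `generators` parallel workers and uploads them
    // with `uploaders` consumer tasks; at most `capacity` generated files wait in between.
    public static void Run(IEnumerable<string> names, int generators, int uploaders, int capacity,
                           Func<string, byte[]> generate, Action<string, byte[]> upload)
    {
        var ready = new BlockingCollection<KeyValuePair<string, byte[]>>(capacity);

        // Consumers: each one uploads files until the queue is marked complete and drained.
        var consumers = Enumerable.Range(0, uploaders)
            .Select(n => Task.Factory.StartNew(() =>
            {
                foreach (var file in ready.GetConsumingEnumerable())
                    upload(file.Key, file.Value);
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        try
        {
            // Producers: Add blocks when the queue is full, which slows generation
            // down to the pace of the uploads.
            var options = new ParallelOptions { MaxDegreeOfParallelism = generators };
            Parallel.ForEach(names, options,
                name => ready.Add(new KeyValuePair<string, byte[]>(name, generate(name))));
        }
        finally
        {
            // Tell the consumers that no more files are coming.
            ready.CompleteAdding();
        }

        Task.WaitAll(consumers);
    }
}
```

With this shape, generators could be 4 to match the cores while uploaders is tuned separately for the slow FTP connection.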