在c#中执行并发任务

本文关键字:并发 任务 执行 | 更新日期: 2023-09-27 18:11:13

我有一个服务,需要尽快从Amazon SQS读取消息。我们预计会有很大的流量,我希望能够读取超过10K条消息/秒。不幸的是,我目前每秒大约有10条消息。很明显,我有工作要做。

这是我使用的(转换为控制台应用程序,使测试更容易):

private static int _concurrentRequests;
private static int _maxConcurrentRequests;
public static void Main(string[] args) {
    _concurrentRequests = 0;
    _maxConcurrentRequests = 100;
    var timer = new Timer();
    timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
    timer.Interval = 10;
    timer.Enabled = true;
    Console.ReadLine();
    timer.Dispose();
}
public static void OnTimedEvent(object s, ElapsedEventArgs e) {
    if (_concurrentRequests < _maxConcurrentRequests) {
        _concurrentRequests++;
        ProcessMessages();
    }
}
public static async Task ProcessMessages() {
    var manager = new MessageManager();
    manager.ProcessMessages();  // this is an async method that reads in the messages from SQS
    _concurrentRequests--;
}

我没有得到任何接近100个并发请求,并且它似乎没有每10毫秒触发OnTimedEvent

我不确定Timer在这里是否是正确的方法。我对这种编码没有太多经验。在这一点上,我愿意尝试任何方法。

感谢calebboyd,我离我的目标更近了一点。下面是一些非常糟糕的代码:

private static SemaphoreSlim _locker;
public static void Main(string[] args) {
    _manager = new MessageManager();
    RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
    _locker = new SemaphoreSlim(10, 10);
    while (true) {
        Thread thread = new Thread(new ParameterizedThreadStart(Process));
        thread.Start();
    }
}
private static async void Process(object args) {
    _locker.WaitAsync();
    try {
        await _manager.ProcessMessages();
    }
    finally {
        _locker.Release();
    }
}

我能够接近每秒读取可观数量的消息,但问题是我的ProcessMessages呼叫永远不会完成(或者可能会在很长一段时间后)。我想我可能需要限制每次运行的线程数。

有什么建议我可以改进这个代码,使ProcessMessages有机会完成?

在c#中执行并发任务

因为你的MessageManager对象上的ProcessMessages方法不是等待的,我将假设它被绑定到执行它的同一个线程。仅仅将函数标记为async不会将工作传递给新线程。有了这个假设,这段代码实际上并不是用多个线程执行的。您可以使用下面的代码在更多的线程池中执行代码。

很可能manager对象不能处理并发使用。所以我在Task中创建它。运行λ。这也可能是昂贵的,因此不切实际的。

async Task RunBatchProcessingForeverAsync () {
    var lock = new SemaphoreSlim(initialCount: 10);
    while (true) {
        await lock.WaitAsync();
        Task.Run(() => {
            try {
                var manager = new MessageManager();
                manager.ProcessMessages();
            } finally {
                lock.Release();
            }
        });
    }
}

我有一段时间没有写c#了,但这应该会同时运行你的方法10次,反复地,永远地。

正如@calebboyd所建议的,你必须首先使你的线程异步。现在,如果你到这里在调用API时使用并发时,您将看到一个异步线程足以快速地池化网络资源。如果您能够在单个请求中从amazon获得多个消息,那么您的生产者线程(对amazon进行异步调用的线程)将会很好-它可以每秒发送数百个请求。这不会成为你的瓶颈。但是,处理接收到的数据的延续任务被交给线程池。这里您有机会遇到瓶颈—假设每秒到达100个响应,每个响应包含100条消息(以达到您的10K消息/秒近似值)。每秒你有100个新任务,每个任务都需要你的线程处理100个消息。现在有两种选择:(1)这些消息的处理不受CPU限制—您只需将它们发送到您的DB,或者(2)执行CPU消耗计算,例如科学计算、序列化或一些繁重的业务逻辑。如果(1)是您的情况,那么瓶颈将向后推到您的数据库。如果是(2),那么你别无选择,只能向上/向外扩展,或者优化计算。但是你的瓶颈可能不是生产线程——如果实现正确的话(参见上面的示例链接)。

我假定异步方法在线程池中排队,该线程池的线程数与可用处理器的线程数相同。您可能会生成100个请求,但它们仍然由8个线程执行。试着创建一个包含N个线程的数组,并使用它们