What is the Python equivalent of TaskCompletionSource<T> (.NET)?

Keywords: .NET, Python, equivalent, TaskCompletionSource | Updated: 2023-09-27 18:33:59

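I am trying to implement the following pseudocode (partly inspired by how it might be written with TaskCompletionSource<T> in .NET). The goal is to wait until a specific event is received before continuing (or to raise a TimeoutError):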

from axel import Event
def _wait_for_provider_up(self, provider: Provider, ms_timeout: int) -> bool:
    if provider.State == ProviderState.Connected: return True
    if provider.State == ProviderState.Faulted: return False
    taskCompletion = TaskCompletionSource<bool>()
    def onStateChanged(sender, e: ProviderStateChangedEventArgs):
        if e.State == ProviderState.Connected:
            taskCompletion.TrySetResult(True)
        elif e.State == ProviderState.Faulted:
            taskCompletion.TrySetResult(False)
    provider.StateChanged += onStateChanged
    try:
        if provider.State == ProviderState.Connected: return True
        elif provider.State == ProviderState.Faulted: return False
        if not taskCompletion.Task.Wait(ms_timeout) or not taskCompletion.Task.Result or provider.State != ProviderState.Connected:
            return False
    finally:
        provider.StateChanged -= onStateChanged
    return True

What is the recommended Pythonic (or .NET-style but Python-compatible) way to achieve this?

The Python equivalent is asyncio.Future.

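import asyncio

async def await_concat(x, source):
    # Suspends here until someone calls source.set_result(...)
    return x + await source

async def set_after_two_seconds(source):
    print("READY")
    await asyncio.sleep(2)
    print("SET")
    source.set_result("GO")

async def main():
    # Create the future on the running loop rather than at import time
    source = asyncio.get_running_loop().create_future()
    return await asyncio.gather(
        await_concat("PASS ", source),
        set_after_two_seconds(source),
    )

print(asyncio.run(main()))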
## prints
# READY
## (note: two second pause here)
# SET
# ['PASS GO', None]
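
One difference from .NET is worth spelling out: asyncio.Future has no TrySetResult, and set_result raises InvalidStateError if the future is already done. Below is a minimal sketch of a wrapper that fills the gap. The class name TaskCompletionSource and its members task and try_set_result are hypothetical helpers written for this example, not part of asyncio.

```python
import asyncio


class TaskCompletionSource:
    """Hypothetical helper (not part of asyncio) mimicking a small part of the .NET API."""

    def __init__(self):
        # Must be constructed while an event loop is running.
        self._future = asyncio.get_running_loop().create_future()

    @property
    def task(self):
        # The awaitable side, comparable to TaskCompletionSource<T>.Task in .NET.
        return self._future

    def try_set_result(self, value):
        # Return False instead of raising when the future is already done or cancelled.
        if self._future.done():
            return False
        self._future.set_result(value)
        return True


async def demo():
    tcs = TaskCompletionSource()
    first = tcs.try_set_result("GO")
    second = tcs.try_set_result("IGNORED")  # rejected: the result is already set
    return first, second, await tcs.task


result = asyncio.run(demo())
print(result)  # (True, False, 'GO')
```

Like asyncio.Future itself, this sketch is not thread-safe; a callback running on another thread would have to hand off through loop.call_soon_threadsafe.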

I tried to rewrite your code in asyncio style, but note that the function _wait_for_provider_up has become a coroutine:

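import asyncio
from axel import Event

async def _wait_for_provider_up(self, provider: Provider, ms_timeout: int) -> bool:
    if provider.State == ProviderState.Connected:
        return True
    if provider.State == ProviderState.Faulted:
        return False
    # The future plays the role of TaskCompletionSource<bool>
    taskCompletion = asyncio.get_running_loop().create_future()
    def onStateChanged(sender, e: ProviderStateChangedEventArgs):
        # Assumes the event is raised on the event loop's thread; from another
        # thread, schedule the call with loop.call_soon_threadsafe instead.
        # Mirror TrySetResult: set_result raises InvalidStateError on a finished future.
        if taskCompletion.done():
            return
        if e.State == ProviderState.Connected:
            taskCompletion.set_result(True)
        elif e.State == ProviderState.Faulted:
            taskCompletion.set_result(False)
    provider.StateChanged += onStateChanged
    try:
        # Re-check after subscribing in case the state changed in between
        if provider.State == ProviderState.Connected:
            return True
        elif provider.State == ProviderState.Faulted:
            return False
        try:
            # wait_for takes seconds, so convert from milliseconds
            result = await asyncio.wait_for(taskCompletion, timeout=ms_timeout / 1000)
        except asyncio.TimeoutError:
            return False
        return result and provider.State == ProviderState.Connected
    finally:
        provider.StateChanged -= onStateChanged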

The code still isn't very Pythonic, but I hope you get the idea.
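
If the caller has to stay synchronous, as in the original pseudocode with its blocking Task.Wait(ms_timeout), concurrent.futures.Future may be a closer fit, because its result(timeout=...) blocks the calling thread and it can be completed from another thread. The following is only a sketch: FakeProvider, its handlers list, and the string states are hypothetical stand-ins for the question's Provider and axel event. The standard library documents creating a Future by hand as intended mainly for executors and tests.

```python
import threading
from concurrent.futures import Future, TimeoutError as FutureTimeoutError


class FakeProvider:
    """Hypothetical stand-in for the question's Provider, for illustration only."""

    def __init__(self):
        self.state = "Connecting"
        self.handlers = []

    def connect_later(self, delay):
        # Simulate a state change arriving from a background thread.
        threading.Timer(delay, self._set_state, args=("Connected",)).start()

    def _set_state(self, state):
        self.state = state
        for handler in list(self.handlers):
            handler(self, state)


def wait_for_provider_up(provider, ms_timeout):
    completion = Future()

    def on_state_changed(sender, state):
        # Rough equivalent of TrySetResult: ignore events after the first result.
        if completion.done():
            return
        if state == "Connected":
            completion.set_result(True)
        elif state == "Faulted":
            completion.set_result(False)

    provider.handlers.append(on_state_changed)
    try:
        # Re-check after subscribing to close the race with an earlier change.
        if provider.state == "Connected":
            return True
        if provider.state == "Faulted":
            return False
        try:
            return completion.result(timeout=ms_timeout / 1000)
        except FutureTimeoutError:
            return False
    finally:
        provider.handlers.remove(on_state_changed)


provider = FakeProvider()
provider.connect_later(0.1)
connected = wait_for_provider_up(provider, 2000)

idle = FakeProvider()  # never changes state, so the wait times out
timed_out = wait_for_provider_up(idle, 100)
print(connected, timed_out)  # True False
```

The done() check is sufficient here because only one thread raises the event; with several concurrent event sources, the check and set_result would need a lock around them.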

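Suppose the provider is a Python thread. You can use the join(timeout) method, which waits for the thread to complete or for the timeout to elapse. join() always returns None, so to check whether the thread is still alive you need to call is_alive(). This is one option for implementing task completion: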

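def taskCompletion(thread, timeout):
    # Wait up to `timeout` seconds; join() itself gives no indication of a timeout
    thread.join(timeout)
    # True if the thread finished within the timeout
    return not thread.is_alive()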
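
As a usage sketch of this join-with-timeout approach, the snippet below waits on one thread that finishes in time and one that does not. The helper name task_completed and the time.sleep targets are assumptions chosen for illustration; real provider threads would do actual work.

```python
import threading
import time


def task_completed(thread, timeout):
    # join() returns None whether or not the timeout expired,
    # so completion has to be read from is_alive() afterwards.
    thread.join(timeout)
    return not thread.is_alive()


fast = threading.Thread(target=time.sleep, args=(0.05,))
# daemon=True so the still-sleeping thread does not keep the interpreter alive
slow = threading.Thread(target=time.sleep, args=(1.0,), daemon=True)
fast.start()
slow.start()

fast_done = task_completed(fast, timeout=0.5)
slow_done = task_completed(slow, timeout=0.1)
print(fast_done, slow_done)  # True False
```

This only reports whether the thread has ended. Unlike a future, it carries no result value, so a Connected versus Faulted outcome would have to be stored somewhere the caller can read it.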