处理AKKA中运行在任务中的服务的事件.净的演员

本文关键字:事件 服务 任务 AKKA 运行 处理 | 更新日期: 2023-09-27 17:50:07

我在WPF应用程序中使用Prism框架。我有一个生产者服务,它在Task中运行,并在找到文件时引发CompositePresentationEvent。我有一个Akka演员订阅了这个事件。actor的处理程序看起来很简单,它看起来像这样:

private void HandleFileReceive(FileEventArgs args)
{
    Self.Tell(new FileReceived(args.File));
}

当事件到达上面的处理程序时,我收到一个System。NotSupportedException伴有此消息:"没有活动的actor上下文,这很可能是由于使用了来自该actor内部的异步操作"。

我认为这是因为服务运行在与actor处理程序不同的线程中。有没有一种方法来处理这种类型的事情在Akka.NET?

我并不完全反对编写新的actor来完成我所需要的服务的工作。问题是,根据文件中的某些设置,服务会有所不同。目前,我正在使用MEF处理此问题,并从IoC容器获取给定接口的正确实现者。我想继续保持从核心代码(演员所在的地方)中抽象出制作人的具体实现。

关于通过这个(感知)线程问题和/或动态生成实现给定接口的ProducerActor的任何建议?

感谢

- g

处理AKKA中运行在任务中的服务的事件.净的演员

我有一个类似的问题,我想运行一个进程,从一个演员运行命令行应用程序。问题是我想通过处理process.OutputDataReceived得到输出。

我最终做的是使用自定义堆栈来放置来自处理程序process.OutputDataReceived += (sender, e) => Output.Push(e.Data);的消息

和自定义堆栈看起来像

class OutputStack<T> : Stack<T>
{
   public event EventHandler OnAdd;
   public void Push(T item)
   {
       base.Push(item);
       if (null != OnAdd) OnAdd(this, null);
   }
} 

然后在构造函数中处理自定义堆栈的OnAdd

OutputStack<string> Output = new OutputStack<string>();
Output.OnAdd += (sender, e) =>
{
    if (Output.Count > 0)
    {
        var message = Output.Pop();
        actor.Tell(new LogActor.LogMessage(message));
    }
};

这是一个小hacky,但它的工作,我得到发送消息一旦它发生(并得到处理)。希望将来我能解决这个问题。

我最终创建了一个参与者来处理响应检索,并更改了生产者接口,这样具体的生产者只需要实现一个简单的方法。旧的生产者在检查响应之前负责等待配置驱动的时间,但是新代码利用AKKA的调度器,以相同的配置驱动间隔告诉检索器何时检查响应。我在下面提供了一个示例,但是我的代码在错误处理等方面有点复杂。

在下面的代码中,您可以看到IProducer的接口和具体实现:LocalProducer和EmailProducer。在实际代码中,这些属性告诉容器一些用于从容器获得正确实现的其他信息。ConsumerActor是此场景的父参与者,并处理Consume消息。它在其构造函数中创建了一个ResponseRetrieverActor,并为ResponseRetrieverActor安排了一个重复的通知,以便检索器在给定的间隔内检查响应。

在ResponseRetrieverActor的retriveresponses处理程序中是IoC魔术发生的地方。我从工厂方法获得传输类型(这里没有显示——它实际上是在各种配置文件中设置的)。传输类型用于使用前面提到的具体生产者的属性从IoC容器中检索正确的生产者。最后,生产者被用来获取响应,并告诉父级(即消费者)列表中的每个文件。

public interface IProducer
{
   List<string> GetResponses();
}
[Export(typeof(IProducer))] // Other attributes needed for MEF Export to differentiate the multiple IProducer implementations
public LocalProducer : IProducer
{
   public List<string> GetResponse()
   {
       // get files from a directory
   }
}
[Export(typeof(IProducer))]
public EmailProducer : IProducer
{
   public List<string> GetResponse()
   {
       // get files from email account
   }
}
public class ConsumerActor
{
    public class Consume
    {
       public Consume(string file) { this.File = file; }
       public string File { get; set; }
    }
    public ConsumerActor() 
    {
       _retriever = Context.ActorOf(Props.Create<ResponseRetrieverActor>(), "retriever");
       var interval = 10000;
       Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(0, interval, _retriever, new ResponseRetrieverActor.RetrieveResponses(), Self);
       Start(); 
    }
    private void Start()
    {
        Receive<Consume>(msg => 
        {
           // do something with the msg.File 
        });
    }
    private IActorRef _retriever;
}
public class ResponseRetrieverActor
{
  public class RetrieveResponses { }
  public ConsumerActor() { Start(); }
  private void Start()
  {
    Receive<RetrieveResponses>(msg => HandleRetrieveResponses());  
  }
  private void HandleRetrieveResponses()
  {
     var transportType = TransportFactory.GetTransportType(); // Gets transport protocol for the producer we need to use (Email, File, ect.)
     var producer = ServiceLocator.Current.GetInstance<IProducer>(transportType); // Gets a producer from the IoC container for the specified transportType
     var responses = producer.GetResponses();
     foreach(var response in responseFiles)
     {
         Context.Parent.Tell(new ConsumerActor.Consume(response));
     }
  }
}

不确定这是否解决了OPs问题,但是当我搜索"没有活动的ActorContext"时,这很可能是由于使用async"我到了这里,所以我把问题的简单解决方案放在这里:

如果你在某个处理程序中等待某个异步调用,该处理程序必须是异步的。如果ReceiveAsync<>发生上述异常,而不是Receive<>。ReceiveAsync<比;在异步处理程序未完成时挂起参与者的邮箱。>