处理AKKA中运行在任务中的服务的事件.净的演员
本文关键字:事件 服务 任务 AKKA 运行 处理 | 更新日期: 2023-09-27 17:50:07
我在WPF应用程序中使用Prism框架。我有一个生产者服务,它在Task中运行,并在找到文件时引发CompositePresentationEvent。我有一个Akka演员订阅了这个事件。actor的处理程序看起来很简单,它看起来像这样:
private void HandleFileReceive(FileEventArgs args)
{
Self.Tell(new FileReceived(args.File));
}
当事件到达上面的处理程序时,我收到一个System。NotSupportedException伴有此消息:"没有活动的actor上下文,这很可能是由于使用了来自该actor内部的异步操作"。
我认为这是因为服务运行在与actor处理程序不同的线程中。有没有一种方法来处理这种类型的事情在Akka.NET?
我并不完全反对编写新的actor来完成我所需要的服务的工作。问题是,根据文件中的某些设置,服务会有所不同。目前,我正在使用MEF处理此问题,并从IoC容器获取给定接口的正确实现者。我想继续保持从核心代码(演员所在的地方)中抽象出制作人的具体实现。
关于通过这个(感知)线程问题和/或动态生成实现给定接口的ProducerActor的任何建议?
感谢- g
我有一个类似的问题,我想运行一个进程,从一个演员运行命令行应用程序。问题是我想通过处理process.OutputDataReceived
得到输出。
我最终做的是使用自定义堆栈来放置来自处理程序process.OutputDataReceived += (sender, e) => Output.Push(e.Data);
的消息
和自定义堆栈看起来像
class OutputStack<T> : Stack<T>
{
public event EventHandler OnAdd;
public void Push(T item)
{
base.Push(item);
if (null != OnAdd) OnAdd(this, null);
}
}
然后在构造函数中处理自定义堆栈的OnAdd
OutputStack<string> Output = new OutputStack<string>();
Output.OnAdd += (sender, e) =>
{
if (Output.Count > 0)
{
var message = Output.Pop();
actor.Tell(new LogActor.LogMessage(message));
}
};
这是一个小hacky,但它的工作,我得到发送消息一旦它发生(并得到处理)。希望将来我能解决这个问题。
我最终创建了一个参与者来处理响应检索,并更改了生产者接口,这样具体的生产者只需要实现一个简单的方法。旧的生产者在检查响应之前负责等待配置驱动的时间,但是新代码利用AKKA的调度器,以相同的配置驱动间隔告诉检索器何时检查响应。我在下面提供了一个示例,但是我的代码在错误处理等方面有点复杂。
在下面的代码中,您可以看到IProducer的接口和具体实现:LocalProducer和EmailProducer。在实际代码中,这些属性告诉容器一些用于从容器获得正确实现的其他信息。ConsumerActor是此场景的父参与者,并处理Consume消息。它在其构造函数中创建了一个ResponseRetrieverActor,并为ResponseRetrieverActor安排了一个重复的通知,以便检索器在给定的间隔内检查响应。
在ResponseRetrieverActor的retriveresponses处理程序中是IoC魔术发生的地方。我从工厂方法获得传输类型(这里没有显示——它实际上是在各种配置文件中设置的)。传输类型用于使用前面提到的具体生产者的属性从IoC容器中检索正确的生产者。最后,生产者被用来获取响应,并告诉父级(即消费者)列表中的每个文件。
public interface IProducer
{
List<string> GetResponses();
}
[Export(typeof(IProducer))] // Other attributes needed for MEF Export to differentiate the multiple IProducer implementations
public LocalProducer : IProducer
{
public List<string> GetResponse()
{
// get files from a directory
}
}
[Export(typeof(IProducer))]
public EmailProducer : IProducer
{
public List<string> GetResponse()
{
// get files from email account
}
}
public class ConsumerActor
{
public class Consume
{
public Consume(string file) { this.File = file; }
public string File { get; set; }
}
public ConsumerActor()
{
_retriever = Context.ActorOf(Props.Create<ResponseRetrieverActor>(), "retriever");
var interval = 10000;
Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(0, interval, _retriever, new ResponseRetrieverActor.RetrieveResponses(), Self);
Start();
}
private void Start()
{
Receive<Consume>(msg =>
{
// do something with the msg.File
});
}
private IActorRef _retriever;
}
public class ResponseRetrieverActor
{
public class RetrieveResponses { }
public ConsumerActor() { Start(); }
private void Start()
{
Receive<RetrieveResponses>(msg => HandleRetrieveResponses());
}
private void HandleRetrieveResponses()
{
var transportType = TransportFactory.GetTransportType(); // Gets transport protocol for the producer we need to use (Email, File, ect.)
var producer = ServiceLocator.Current.GetInstance<IProducer>(transportType); // Gets a producer from the IoC container for the specified transportType
var responses = producer.GetResponses();
foreach(var response in responseFiles)
{
Context.Parent.Tell(new ConsumerActor.Consume(response));
}
}
}
不确定这是否解决了OPs问题,但是当我搜索"没有活动的ActorContext"时,这很可能是由于使用async"我到了这里,所以我把问题的简单解决方案放在这里:
如果你在某个处理程序中等待某个异步调用,该处理程序必须是异步的。如果ReceiveAsync<>发生上述异常,而不是Receive<>。ReceiveAsync<比;在异步处理程序未完成时挂起参与者的邮箱。>