如何从c#中将任务排队到芹菜
本文关键字:任务 排队 芹菜 | 更新日期: 2023-09-27 18:12:50
据我所知,像RabbitMQ这样的消息代理可以促进用不同语言/平台编写的不同应用程序相互通信。因此,由于celery可以使用RabbitMQ作为消息代理,我相信我们可以将来自任何应用程序的任务排队给celery,即使生产者不是用Python编写的。
现在我正试图弄清楚如何通过RabbitMQ从c#编写的应用程序中队列任务到芹菜。但是我还没有找到这样的例子。
我找到的唯一接近这个的信息是这个SO问题
其中接受的答案建议使用芹菜消息格式协议从Java队列消息到RabbitMQ。但是,答案中给出的链接没有任何示例,只有消息格式。
此外,消息格式说明在此协议中通信需要任务id (UUID)。我的c#应用程序应该如何知道芹菜任务的任务id ?据我所知,它只能知道任务名称,而不能知道任务id。
我不知道这个问题是否仍然相关,但希望答案能帮助别人。
下面是我如何成功地将一个任务队列分配给芹菜示例worker。
-
你需要在你的生产者(客户端)和RabbitMQ之间建立连接,如下所述。
ConnectionFactory factory = new ConnectionFactory(); factory.UserName = username; factory.Password = password; factory.VirtualHost = virtualhost; factory.HostName = hostname; factory.Port = port; IConnection connection = factory.CreateConnection(); IModel channel = connection.CreateModel();
在RabbitMQ默认配置中,只有Guest用户,只能用于本地连接(从127.0.0.1)。这个问题的答案解释了如何在RabbitMQ中定义用户。
-
下一步-创建回调来获取结果。本例使用的是Direct reply-to,所以应答侦听器看起来像:
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var ansBody = ea.Body; var ansMessage = Encoding.UTF8.GetString(ansBody); Console.WriteLine(" [x] Received {0}", ansMessage); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "amq.rabbitmq.reply-to", noAck: true, consumer: consumer);
-
创建一个任务消息,芹菜将使用:
IDictionary<string, object> headers = new Dictionary<string, object>(); headers.Add("task", "tasks.add"); Guid id = Guid.NewGuid(); headers.Add("id", id.ToString()); IBasicProperties props = channel.CreateBasicProperties(); props.Headers = headers; props.CorrelationId = (string)headers["id"]; props.ContentEncoding = "utf-8"; props.ContentType = "application/json"; props.ReplyTo = "amq.rabbitmq.reply-to"; object[] taskArgs = new object[] { 1, 200 }; object[] arguments = new object[] { taskArgs, new object(), new object()}; MemoryStream stream = new MemoryStream(); DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(object[])); ser.WriteObject(stream, arguments); stream.Position = 0; StreamReader sr = new StreamReader(stream); string message = sr.ReadToEnd(); var body = Encoding.UTF8.GetBytes(message);
-
最后,将消息发布到RabbitMQ:
channel.BasicPublish(exchange: "", routingKey: "celery", basicProperties: props, body: body);
芹菜配花。Flower提供了一个REST API来管理任务。https://flower.readthedocs.io/en/latest/api.html文章——api-task-async-apply(+)。在大多数情况下,这比手动创建任务并将其插入MQ要简单和健壮得多。
根据这篇文章,芹菜。net客户端使用默认的。net框架自带的TaskScheduler。它知道如何为您的任务生成ID。本文还提供了一些示例。