连接到现有的工作者/任务
本文关键字:工作者 任务 连接 | 更新日期: 2023-09-27 18:03:41
我想就如何解决我遇到的一个问题征求一些意见。
假设我有一个服务器和一个客户端。
客户端A连接并请求服务器生成一个从1到9999的新线程计数。我们把它命名为线程a。
客户端B连接并要求服务器生成另一个线程,从9999倒数到1。我们称它为线程b。
线程A向调用客户端发送当前计数(i)的更新。e线程A更新客户A,线程B更新客户B).
客户端A然后断开连接并重新连接-我的问题是,我如何"重新订阅"当前线程并继续获得更新?
是否有任何现有的库来帮助完成这些任务?
更新:如果有帮助的话,再澄清一点:
这些服务器线程将在后台持续运行。客户端将连接到服务器并启动一个任务,该任务将向客户端发送消息。用户知道任务已经启动,因为他们的客户端正在接收消息。用户断开客户端连接,但任务仍在后台处理。用户决定检查任务,因此他们再次启动客户机并连接到服务器。最后一句话是我想要达到的效果。我很抱歉没有说清楚。
要解决这个问题,试试这个:
断开连接时给服务器一个通知使用GUID进行"身份验证",该GUID在第一次连接和每次重新连接时创建并发送到服务器。
服务器获取GUID,将它们保存到一个列表中,并在每个连接中查看服务器获取GUID,查找现有条目,如果有条目,则知道客户端已经连接。如果没有,那么客户端还没有连接,所以服务器必须创建一个新的线程。
可以使用其他唯一ID(如MAC、硬件校验和等)代替GUID
这是一个抽象的例子。由于问题没有指定具体的API,我将使用虚构的名称和类。
为了识别客户端,我们将使用一些唯一的标识令牌,我们称之为ClientToken
。令牌将在第一个客户端连接上分配。它将被返回给客户端进行进一步的身份验证,它也将被服务器用于处理任务跟踪。
跟踪部分是某种从令牌到任务数据的字典。任务数据是一对任务及其消息队列:
private readonly ConcurrentDictionary<ClientToken, TaskData> m_tasks;
class TaskData
{
public Task ProcessingTask { get; set; }
public BlockingCollection<Message> TaskMessages { get; } =
new BlockingCollection<Message>();
}
假设ClientA以任何方式连接到服务器。处理连接的部分如下所示:
//this is a kind of event handler, we should'n 'await' on it
async void OnClientConnectedAsync(Client client)
{
var token = client.GetToken();
TaskData taskData;
if (token != null && m_tasks.TryGetValue(token, out taskData))
{
await ProcessMessagesAsync(taskData);
}
else
{
taskData = new TaskData();
token = GenerateUniqueTokenBasedOnTheClientParameters(client);
await client.SetTokenAsync(token);
if (m_tasks.TryAdd(token, taskData))
{
taskData.ProcessingTask =
InitProcessingTask(
taskData.Messages, //will be used by the
client);
await ProcessMessagesAsync(taskData);
}
}
}
async Task ProcessMessagesAsync(TaskData taskData)
{
while (client.IsConnected)
{
var message = taskData.Messages.Take();
await client.SendAsync(message);
}
}
你应该决定Messages
容量和它的项目寿命