使用多个线程通过NetworkStream发送数据
本文关键字:NetworkStream 数据 线程 | 更新日期: 2023-09-27 17:59:40
我正在尝试构建一个命令行聊天室,服务器在其中处理连接,并将输入从一个客户端重复到所有其他客户端。目前,服务器能够接收来自多个客户端的输入,但只能单独将信息发送回这些客户端。我认为我的问题是每个连接都是在一个单独的线程上处理的。我将如何允许线程相互通信或能够向每个线程发送数据?
服务器代码:
namespace ConsoleApplication
{
class TcpHelper
{
private static object _lock = new object();
private static List<Task> _connections = new List<Task>();
private static TcpListener listener { get; set; }
private static bool accept { get; set; } = false;
private static Task StartListener()
{
return Task.Run(async () =>
{
IPAddress address = IPAddress.Parse("127.0.0.1");
int port = 5678;
listener = new TcpListener(address, port);
listener.Start();
Console.WriteLine($"Server started. Listening to TCP clients at 127.0.0.1:{port}");
while (true)
{
var tcpClient = await listener.AcceptTcpClientAsync();
Console.WriteLine("Client has connected");
var task = StartHandleConnectionAsync(tcpClient);
if (task.IsFaulted)
task.Wait();
}
});
}
// Register and handle the connection
private static async Task StartHandleConnectionAsync(TcpClient tcpClient)
{
// start the new connection task
var connectionTask = HandleConnectionAsync(tcpClient);
// add it to the list of pending task
lock (_lock)
_connections.Add(connectionTask);
// catch all errors of HandleConnectionAsync
try
{
await connectionTask;
}
catch (Exception ex)
{
// log the error
Console.WriteLine(ex.ToString());
}
finally
{
// remove pending task
lock (_lock)
_connections.Remove(connectionTask);
}
}
private static async Task HandleConnectionAsync(TcpClient client)
{
await Task.Yield();
{
using (var networkStream = client.GetStream())
{
if (client != null)
{
Console.WriteLine("Client connected. Waiting for data.");
StreamReader streamreader = new StreamReader(networkStream);
StreamWriter streamwriter = new StreamWriter(networkStream);
string clientmessage = "";
string servermessage = "";
while (clientmessage != null && clientmessage != "quit")
{
clientmessage = await streamreader.ReadLineAsync();
Console.WriteLine(clientmessage);
servermessage = clientmessage;
streamwriter.WriteLine(servermessage);
streamwriter.Flush();
}
Console.WriteLine("Closing connection.");
networkStream.Dispose();
}
}
}
}
public static void Main(string[] args)
{
// Start the server
Console.WriteLine("Hit Ctrl-C to close the chat server");
TcpHelper.StartListener().Wait();
}
}
}
客户代码:
namespace Client2
{
public class Program
{
private static void clientConnect()
{
TcpClient socketForServer = new TcpClient();
bool status = true;
string userName;
Console.Write("Input Username: ");
userName = Console.ReadLine();
try
{
IPAddress address = IPAddress.Parse("127.0.0.1");
socketForServer.ConnectAsync(address, 5678);
Console.WriteLine("Connected to Server");
}
catch
{
Console.WriteLine("Failed to Connect to server{0}:999", "localhost");
return;
}
NetworkStream networkStream = socketForServer.GetStream();
StreamReader streamreader = new StreamReader(networkStream);
StreamWriter streamwriter = new StreamWriter(networkStream);
try
{
string clientmessage = "";
string servermessage = "";
while (status)
{
Console.Write(userName + ": ");
clientmessage = Console.ReadLine();
if ((clientmessage == "quit") || (clientmessage == "QUIT"))
{
status = false;
streamwriter.WriteLine("quit");
streamwriter.WriteLine(userName + " has left the conversation");
streamwriter.Flush();
}
if ((clientmessage != "quit") && (clientmessage != "quit"))
{
streamwriter.WriteLine(userName + ": " + clientmessage);
streamwriter.Flush();
servermessage = streamreader.ReadLine();
Console.WriteLine("Server:" + servermessage);
}
}
}
catch
{
Console.WriteLine("Exception reading from the server");
}
streamreader.Dispose();
networkStream.Dispose();
streamwriter.Dispose();
}
public static void Main(string[] args)
{
clientConnect();
}
}
}
代码中的主要错误是没有尝试将从一个客户端接收的数据发送到其他连接的客户端。您的服务器中有_connections
列表,但列表中唯一存储的是连接的Task
对象,您甚至不需要对这些对象执行任何操作。
相反,您应该维护连接本身的列表,这样当您从一个客户端接收到消息时,您就可以将该消息重新发送给其他客户端。
至少,这应该是一个List<TcpClient>
,但因为您使用的是StreamReader
和StreamWriter
,所以您还需要初始化这些对象并将其存储在列表中。此外,您应该包括一个客户端标识符。一个明显的选择是客户端的名称(即用户输入的名称),但您的示例在聊天协议中没有提供任何机制来传输该标识作为连接初始化的一部分,因此在我的示例(下面)中,我只使用一个简单的整数值。
你发布的代码中还有一些其他违规行为,例如:
- 在一个全新的线程中启动一个任务,只是为了执行一些语句,使您达到启动异步操作的地步。在我的示例中,我只是省略了代码的
Task.Run()
部分,因为它不是必需的 - 为
IsFaulted
返回特定于连接的任务时进行检查。由于在返回此Task
对象时不太可能实际发生任何I/O,因此此逻辑几乎没有用处。对Wait()
的调用将引发一个异常,该异常将传播到主线程的Wait()
调用,从而终止服务器。但是,如果出现任何其他错误,您不会终止服务器,因此不清楚您为什么要在这里这样做 - 有一个对
Task.Yield()
的虚假调用。我不知道你想在那里完成什么,但不管是什么,那句话都没用。我只是把它取了下来 - 在客户端代码中,只有在发送数据后,才尝试从服务器接收数据。这是非常错误的;您希望客户端能够响应并在数据发送给他们时立即接收数据。在我的版本中,我包含了一个简单的小匿名方法,它会立即被调用,以启动一个单独的消息接收循环,该循环将与主用户输入循环异步并发执行
- 同样在客户端代码中,在导致服务器关闭连接的"退出"消息之后,您发送了"…has left…消息。这意味着服务器将永远不会真正接收到"…已离开…消息。我颠倒了消息的顺序,这样"退出"总是客户端发送的最后一个消息
我的版本是这样的:
服务器:
class TcpHelper
{
class ClientData : IDisposable
{
private static int _nextId;
public int ID { get; private set; }
public TcpClient Client { get; private set; }
public TextReader Reader { get; private set; }
public TextWriter Writer { get; private set; }
public ClientData(TcpClient client)
{
ID = _nextId++;
Client = client;
NetworkStream stream = client.GetStream();
Reader = new StreamReader(stream);
Writer = new StreamWriter(stream);
}
public void Dispose()
{
Writer.Close();
Reader.Close();
Client.Close();
}
}
private static readonly object _lock = new object();
private static readonly List<ClientData> _connections = new List<ClientData>();
private static TcpListener listener { get; set; }
private static bool accept { get; set; }
public static async Task StartListener()
{
IPAddress address = IPAddress.Any;
int port = 5678;
listener = new TcpListener(address, port);
listener.Start();
Console.WriteLine("Server started. Listening to TCP clients on port {0}", port);
while (true)
{
var tcpClient = await listener.AcceptTcpClientAsync();
Console.WriteLine("Client has connected");
var task = StartHandleConnectionAsync(tcpClient);
if (task.IsFaulted)
task.Wait();
}
}
// Register and handle the connection
private static async Task StartHandleConnectionAsync(TcpClient tcpClient)
{
ClientData clientData = new ClientData(tcpClient);
lock (_lock) _connections.Add(clientData);
// catch all errors of HandleConnectionAsync
try
{
await HandleConnectionAsync(clientData);
}
catch (Exception ex)
{
// log the error
Console.WriteLine(ex.ToString());
}
finally
{
lock (_lock) _connections.Remove(clientData);
clientData.Dispose();
}
}
private static async Task HandleConnectionAsync(ClientData clientData)
{
Console.WriteLine("Client connected. Waiting for data.");
string clientmessage;
while ((clientmessage = await clientData.Reader.ReadLineAsync()) != null && clientmessage != "quit")
{
string message = "From " + clientData.ID + ": " + clientmessage;
Console.WriteLine(message);
lock (_lock)
{
// Locking the entire operation ensures that a) none of the client objects
// are disposed before we can write to them, and b) all of the chat messages
// are received in the same order by all clients.
foreach (ClientData recipient in _connections.Where(r => r.ID != clientData.ID))
{
recipient.Writer.WriteLine(message);
recipient.Writer.Flush();
}
}
}
Console.WriteLine("Closing connection.");
}
}
客户:
class Program
{
private const int _kport = 5678;
private static async Task clientConnect()
{
IPAddress address = IPAddress.Loopback;
TcpClient socketForServer = new TcpClient();
string userName;
Console.Write("Input Username: ");
userName = Console.ReadLine();
try
{
await socketForServer.ConnectAsync(address, _kport);
Console.WriteLine("Connected to Server");
}
catch (Exception e)
{
Console.WriteLine("Failed to Connect to server {0}:{1}", address, _kport);
return;
}
using (NetworkStream networkStream = socketForServer.GetStream())
{
var readTask = ((Func<Task>)(async () =>
{
using (StreamReader reader = new StreamReader(networkStream))
{
string receivedText;
while ((receivedText = await reader.ReadLineAsync()) != null)
{
Console.WriteLine("Server:" + receivedText);
}
}
}))();
using (StreamWriter streamwriter = new StreamWriter(networkStream))
{
try
{
while (true)
{
Console.Write(userName + ": ");
string clientmessage = Console.ReadLine();
if ((clientmessage == "quit") || (clientmessage == "QUIT"))
{
streamwriter.WriteLine(userName + " has left the conversation");
streamwriter.WriteLine("quit");
streamwriter.Flush();
break;
}
else
{
streamwriter.WriteLine(userName + ": " + clientmessage);
streamwriter.Flush();
}
}
await readTask;
}
catch (Exception e)
{
Console.WriteLine("Exception writing to server: " + e);
throw;
}
}
}
}
public static void Main(string[] args)
{
clientConnect().Wait();
}
}
你还需要做很多工作。你可能想在服务器端实现聊天用户名的正确初始化。至少,对于真实世界的代码,您需要进行更多的错误检查,并确保可靠地生成客户端ID(如果您只想要正ID值,则在它回滚到0
之前,连接数不能超过2^31-1)。
我还做了一些其他并非绝对必要的小更改,例如使用IPAddress.Any
和IPAddress.Loopback
值而不是解析字符串,并且只是简单地简化和清理代码。此外,我现在没有使用C#6编译器,所以我更改了使用C#6功能的代码,使其使用C#5进行编译。
要做一个成熟的聊天服务器,你还有很多工作要做。但我希望以上内容能让你重回正轨。