Concurrency with the TPL in .NET: Parallel generator blocks and gets stuck after a while
Updated: 2023-09-27 18:16:05
I'm running a game on CodeGolf.stackexchange.com in which players submit bots that compete against each other.
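At this stage there are 70 bots, and with (N*(N+1))/2 matches the tournament runs slowly, so now I want to parallelize it. One of the game rules is that a bot may write to its own data directory, so I want to make sure that no single bot instance ever plays two matches at the same time.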
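To make the match count concrete: (N*(N+1))/2 counts every unordered pair of bots once plus one match of each bot against itself. The sketch below assumes that reading of the formula; `RoundRobin` and `AllPairings` are hypothetical names and not part of the control program, and bots are represented by their index.

```csharp
using System.Collections.Generic;

public static class RoundRobin
{
    // Hypothetical helper: pairs each bot with every bot at the same or a later
    // index, i.e. each unordered pair once plus one self-match per bot.
    public static List<(int P1, int P2)> AllPairings(int botCount)
    {
        var pairings = new List<(int P1, int P2)>();
        for (int i = 0; i < botCount; i++)
            for (int j = i; j < botCount; j++)
                pairings.Add((i, j));
        return pairings;
    }
}
```

For 70 bots this yields 70 * 71 / 2 = 2,485 pairings. If bots never play themselves, starting the inner loop at `i + 1` gives N*(N-1)/2 = 2,415 instead.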
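I've written an IEnumerable<T> generator that returns valid matches (a match is valid if neither player is currently taking part in another match), but I'm running into some kind of concurrency/blocking problem that makes the enumerable loop forever.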
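At some arbitrary point the while (any matches) loop keeps spinning, because the remaining matches only involve players who are in the activePlayers list. That means matches are still in progress. But IsCompleted is never called again, so either the running matches are somehow blocked, or they have finished and I've hit a concurrency bug in the _activePlayers.TryRemove() code.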
public interface IMatchGenerator
{
    void CompleteMatch(Match match);
}

public class MatchGenerator : IMatchGenerator
{
    private static object SYNC_LOCK = new object();
    private ConcurrentDictionary<Player, Player> _activePlayers; //tracks players actively playing
    private ConcurrentQueue<Match> _allMatches;

    public MatchGenerator()
    {
        _activePlayers = new ConcurrentDictionary<Player, Player>();
    }

    public IEnumerable<Match> Generate(IList<Match> matches)
    {
        //take the list of matches passed in and stick them in a queue.
        _allMatches = new ConcurrentQueue<Match>(matches);

        //keep looping while there are matches to play
        while (_allMatches.Any())
        {
            Match nextMatch;
            lock (SYNC_LOCK)
            {
                _allMatches.TryDequeue(out nextMatch); //Grab from front of queue
                if (!_activePlayers.ContainsKey(nextMatch.Player1) &&
                    !_activePlayers.ContainsKey(nextMatch.Player2))
                {
                    //If neither player is in the active player list, then this is a
                    //good match so add both players
                    _activePlayers.TryAdd(nextMatch.Player1, nextMatch.Player1);
                    _activePlayers.TryAdd(nextMatch.Player2, nextMatch.Player2);
                }
                else
                {
                    //Otherwise push this match onto the back of the queue...
                    //FIFO order means the next match gets tried on the next pass
                    _allMatches.Enqueue(nextMatch);
                    nextMatch = null;
                }
            }
            if (nextMatch != null)
                yield return nextMatch;
        }
    }

    public void CompleteMatch(Match match)
    {
        //Matches in progress have the generator attached to them and will call
        //home when they are complete to remove players from the active list
        Player junk1, junk2;
        lock (SYNC_LOCK)
        {
            _activePlayers.TryRemove(match.Player1, out junk1);
            _activePlayers.TryRemove(match.Player2, out junk2);
        }
        if (junk1 == null || junk2 == null)
        {
            Debug.WriteLine("Uhoh! a match came in for completion but one of the players who should have been in the active list didn't get removed");
        }
    }
}
And the code that uses it:
var mg = new MatchGenerator();
//Code to generate IList<Match> of all player combinations and attach mg

Parallel.ForEach(mg.Generate(matches),
    new ParallelOptions() { MaxDegreeOfParallelism = 8 },
    match =>
    {
        var localMatch = match;
        try
        {
            PlayMatch(localMatch, gameLogDirectory, results);
        }
        finally
        {
            //calls home to the attached generator so both players are freed
            localMatch.IsCompleted();
        }
    });
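From here it gets a bit verbose, but there isn't much going on. PlayMatch(...) calls a method Play, which contains some StringBuilder code. Play calls a couple of external processes (ruby, python, etc.) depending on which bots are playing. It also streams writes to each player's log file, but assuming only one instance of a player's bot runs at a time, there shouldn't be any conflicts here.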
The entire control program is available on GitHub at
https://github.com/eoincampbell/big-bang-game/blob/master/BigBang.Orchestrator/Program.cs
public static Result Play(Player p1, Player p2, string gameLogDirectory)
{
    var dir = Player.PlayerDirectory;
    var result = new Result() { P1 = p1, P2 = p2, P1Score = 0, P2Score = 0 };
    string player1ParamList = string.Empty, player2ParamList = string.Empty;
    List<long> p1Times = new List<long>(), p2Times = new List<long>();
    Stopwatch sw1 = new Stopwatch(), sw2 = new Stopwatch(), swGame = new Stopwatch();
    var sb = new StringBuilder();
    var proc = new Process
    {
        StartInfo =
        {
            UseShellExecute = false, RedirectStandardOutput = true, WorkingDirectory = dir
        }
    };

    swGame.Start();
    sb.AppendLine("+--------------------------------------------------------------------------------------------+");
    sb.AppendFormat("| Starting Game between {0} & {1} \n", p1.Name, p2.Name);
    sb.AppendLine("| ");

    for (var i = 0; i < 1; i++)
    {
        sw1.Reset();
        sw1.Start();
        var o1 = ProcessRunner.RunPlayerProcess(ref proc, player1ParamList, player2ParamList, p1, dir);
        sw1.Stop();
        p1Times.Add(sw1.ElapsedMilliseconds);

        //System.Threading.Thread.Sleep(1);

        sw2.Reset();
        sw2.Start();
        var o2 = ProcessRunner.RunPlayerProcess(ref proc, player2ParamList, player1ParamList, p2, dir);
        sw2.Stop();
        p2Times.Add(sw2.ElapsedMilliseconds);

        var whoWon = GetWinner(o1, o2, ref player1ParamList, ref player2ParamList);
        var whoWonMessage = "Draw Match";
        if (whoWon == "P1")
        {
            result.P1Score++;
            whoWonMessage = string.Format("{0} wins", p1.Name);
        }
        else if (whoWon == "P2")
        {
            result.P2Score++;
            whoWonMessage = string.Format("{0} wins", p2.Name);
        }
        sb.AppendFormat("| {0} plays {1} | {2} plays {3} | {4}\n", p1.Name, o1, p2.Name, o2, whoWonMessage);
    }
    swGame.Stop();

    sb.AppendLine("| ");
    sb.AppendFormat("| Game Time: {0}", swGame.Elapsed);
    result.WriteLine(sb.ToString());

    var resultMessage = string.Format("Result: {0} vs {1}: {2} - {3}",
        result.P1,
        result.P2,
        result.P1Score,
        result.P2Score);
    sb.AppendLine("| ");
    sb.AppendFormat("| {0}", resultMessage);

    using (var p1sw = new StreamWriter(Path.Combine(gameLogDirectory, p1.Name + ".log"), true))
    {
        p1sw.WriteLine(sb.ToString());
    }
    using (var p2sw = new StreamWriter(Path.Combine(gameLogDirectory, p2.Name + ".log"), true))
    {
        p2sw.WriteLine(sb.ToString());
    }

    result.P1AvgTimeMs = p1Times.Average();
    result.P2AvgTimeMs = p2Times.Average();
    return result;
}
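I think your problem is caused by using Parallel.ForEach() on an IEnumerable<T> that can take an arbitrarily long time to produce its next element. This is basically the same problem as using Parallel.ForEach() on BlockingCollection.GetConsumingEnumerable().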
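By default, the partitioning algorithm used by both Parallel.ForEach() and PLINQ relies on chunking to minimize synchronization costs: instead of taking a lock once per element, a worker takes the lock, grabs a group of elements (a chunk), and then releases the lock.

But in your case the next element of the sequence isn't produced until a previous element has been processed, so this can block indefinitely. A worker that already holds some matches in its current chunk keeps calling MoveNext() under the lock to fill the rest of that chunk. MoveNext() spins inside Generate() until a running match completes and frees its players, but the matches it is waiting on may be exactly the ones sitting unstarted in that worker's own chunk. The other workers finish their matches and then queue up on the same lock, so IsCompleted() is never called again.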
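The dependency can be reproduced in isolation with a small sketch. The generator below is hypothetical: it refuses to produce element i + 1 until element i has been processed, which mirrors how Generate() depends on CompleteMatch(). It swaps in `Partitioner.Create(..., EnumerablePartitionerOptions.NoBuffering)`, a built-in option since .NET 4.5, which makes each worker take one element at a time from the shared enumerator instead of a chunk. `NoBufferingDemo`, `DependentSequence` and `Run` are illustrative names.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class NoBufferingDemo
{
    // Hypothetical generator with the same shape of dependency as Generate():
    // element i + 1 is not produced until element i has been processed.
    private static IEnumerable<int> DependentSequence(int count, SemaphoreSlim processed)
    {
        for (int i = 0; i < count; i++)
        {
            if (i > 0)
                processed.Wait();   // block until the previous element was handled
            yield return i;
        }
    }

    public static int Run(int count)
    {
        var processed = new SemaphoreSlim(0);
        int handled = 0;

        // NoBuffering: workers take one element at a time rather than a chunk,
        // so no worker sits on an element the generator is waiting for.
        var source = Partitioner.Create(DependentSequence(count, processed),
                                        EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(source, new ParallelOptions { MaxDegreeOfParallelism = 8 }, item =>
        {
            Interlocked.Increment(ref handled);
            processed.Release();   // lets the generator produce the next element
        });
        return handled;
    }
}
```

With the default chunking partitioner, the same loop can hang for the reason described above once chunks grow beyond a single element.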
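I think the right solution here is to use a BlockingCollection<T> instead of yield return: replace yield return nextMatch with blockingCollection.Add(nextMatch), call blockingCollection.CompleteAdding() once every match has been handed out so the consumers know when to stop, run Generate() on a separate thread, and in Parallel.ForEach() use blockingCollection.GetConsumingPartitioner() from the blog post mentioned above.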
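A minimal sketch of that producer/consumer split follows, under a few assumptions. `GetConsumingPartitioner()` is, to my knowledge, an extension method from the blog post rather than part of the base class library, so the sketch substitutes `Partitioner.Create(GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering)`, which also avoids chunking on .NET 4.5 and later. `SimpleMatch`, `BlockingMatchGenerator` and `RunTournament` are hypothetical stand-ins for the question's types, players are plain strings, and `Thread.Sleep(1)` stands in for `PlayMatch(...)`.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Match type.
public sealed class SimpleMatch
{
    public string Player1 { get; }
    public string Player2 { get; }
    public SimpleMatch(string player1, string player2) { Player1 = player1; Player2 = player2; }
}

public sealed class BlockingMatchGenerator
{
    private readonly object _sync = new object();
    private readonly HashSet<string> _activePlayers = new HashSet<string>();
    public BlockingCollection<SimpleMatch> Ready { get; } = new BlockingCollection<SimpleMatch>();

    // Producer: same selection rule as the question's Generate(), but it calls
    // Ready.Add() instead of yield return and finishes with CompleteAdding().
    public void Generate(IEnumerable<SimpleMatch> matches)
    {
        var pending = new Queue<SimpleMatch>(matches);
        while (pending.Count > 0)
        {
            var next = pending.Dequeue();
            bool playersFree;
            lock (_sync)
            {
                playersFree = !_activePlayers.Contains(next.Player1) &&
                              !_activePlayers.Contains(next.Player2);
                if (playersFree)
                {
                    _activePlayers.Add(next.Player1);
                    _activePlayers.Add(next.Player2);
                }
            }
            if (playersFree) Ready.Add(next);
            else pending.Enqueue(next);   // still busy-waits, as in the question
        }
        Ready.CompleteAdding();
    }

    public void CompleteMatch(SimpleMatch match)
    {
        lock (_sync)
        {
            _activePlayers.Remove(match.Player1);
            _activePlayers.Remove(match.Player2);
        }
    }

    // Runs Generate() on its own task and consumes Ready without chunking.
    public static int RunTournament(IList<SimpleMatch> matches)
    {
        var generator = new BlockingMatchGenerator();
        var producer = Task.Run(() => generator.Generate(matches));
        int played = 0;
        var source = Partitioner.Create(generator.Ready.GetConsumingEnumerable(),
                                        EnumerablePartitionerOptions.NoBuffering);
        Parallel.ForEach(source, new ParallelOptions { MaxDegreeOfParallelism = 8 }, match =>
        {
            try
            {
                Thread.Sleep(1);                  // placeholder for PlayMatch(...)
                Interlocked.Increment(ref played);
            }
            finally
            {
                generator.CompleteMatch(match);   // frees both players
            }
        });
        producer.Wait();
        return played;
    }
}
```

Iterating `GetConsumingEnumerable()` with the default partitioner would bring the chunking problem back, which is why the consuming side must not buffer.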
I also don't like that Generate() wastes an entire CPU core doing essentially nothing while there are no valid matches, but that's a separate issue.
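One way to avoid burning that core, sketched under the assumption that the generator and CompleteMatch() share a single lock object: when a scan of the pending matches finds nothing that can start, Generate() calls `Monitor.Wait()` on that lock, and CompleteMatch() calls `Monitor.PulseAll()` after freeing the players. The scan, the wait and the removal all happen under the same lock, so a completion cannot slip in between the check and the wait and be missed. `Pairing`, `WaitingMatchGenerator` and `RunTournament` are hypothetical names; the consumer side reuses the BlockingCollection approach, and `Thread.Sleep(1)` stands in for `PlayMatch(...)`.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Match type.
public sealed class Pairing
{
    public string Player1 { get; }
    public string Player2 { get; }
    public Pairing(string player1, string player2) { Player1 = player1; Player2 = player2; }
}

public sealed class WaitingMatchGenerator
{
    private readonly object _sync = new object();
    private readonly HashSet<string> _activePlayers = new HashSet<string>();
    private readonly List<Pairing> _pending;
    public BlockingCollection<Pairing> Ready { get; } = new BlockingCollection<Pairing>();

    public WaitingMatchGenerator(IEnumerable<Pairing> matches)
    {
        _pending = new List<Pairing>(matches);
    }

    // Producer: hands out startable matches and sleeps instead of spinning.
    public void Generate()
    {
        while (true)
        {
            Pairing next;
            lock (_sync)
            {
                if (_pending.Count == 0)
                    break;
                int index = _pending.FindIndex(m =>
                    !_activePlayers.Contains(m.Player1) && !_activePlayers.Contains(m.Player2));
                if (index < 0)
                {
                    // Nothing can start yet: Monitor.Wait releases the lock, blocks until
                    // CompleteMatch() pulses, then reacquires the lock so we rescan.
                    Monitor.Wait(_sync);
                    continue;
                }
                next = _pending[index];
                _pending.RemoveAt(index);
                _activePlayers.Add(next.Player1);
                _activePlayers.Add(next.Player2);
            }
            Ready.Add(next);
        }
        Ready.CompleteAdding();   // lets the consuming loop finish
    }

    public void CompleteMatch(Pairing match)
    {
        lock (_sync)
        {
            _activePlayers.Remove(match.Player1);
            _activePlayers.Remove(match.Player2);
            Monitor.PulseAll(_sync);   // wake Generate() so it rescans the pending list
        }
    }

    // Plays every match and counts how often a player was seen in two matches at once.
    public static (int Played, int Conflicts) RunTournament(IList<Pairing> matches)
    {
        var generator = new WaitingMatchGenerator(matches);
        var producer = Task.Run(() => generator.Generate());
        var busy = new ConcurrentDictionary<string, int>();
        int played = 0, conflicts = 0;
        var source = Partitioner.Create(generator.Ready.GetConsumingEnumerable(),
                                        EnumerablePartitionerOptions.NoBuffering);
        Parallel.ForEach(source, new ParallelOptions { MaxDegreeOfParallelism = 8 }, match =>
        {
            try
            {
                if (busy.AddOrUpdate(match.Player1, 1, (_, n) => n + 1) > 1) Interlocked.Increment(ref conflicts);
                if (busy.AddOrUpdate(match.Player2, 1, (_, n) => n + 1) > 1) Interlocked.Increment(ref conflicts);
                Thread.Sleep(1);   // placeholder for PlayMatch(...)
                Interlocked.Increment(ref played);
            }
            finally
            {
                busy.AddOrUpdate(match.Player1, 0, (_, n) => n - 1);
                busy.AddOrUpdate(match.Player2, 0, (_, n) => n - 1);
                generator.CompleteMatch(match);   // frees both players and wakes Generate()
            }
        });
        producer.Wait();
        return (played, conflicts);
    }
}
```

Scanning with `FindIndex` costs O(n) per started match, which should be negligible for a few thousand matches. The per-player counters in `RunTournament` exist only to check that no player is ever in two matches at once.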