与.NET中的TPL并发.Parallel Generator在一段时间后阻塞并陷入困境

本文关键字:一段时间 陷入困境 Generator 中的 NET TPL 并发 Parallel | 更新日期: 2023-09-27 18:16:05

我正在CodeGolf.stackexchange.com上运行一个游戏,玩家可以在游戏中提交机器人进行竞争。

在这个阶段,有70个机器人,在(N*(N+1((/2场比赛中,比赛运行得很慢,所以现在我想将其并行化。游戏规则之一是机器人可以写入自己的数据目录,所以我想确保我没有一个机器人实例同时玩两场比赛。

我已经编写了一个IEnuemable<T>生成器来返回有效的匹配(有效的情况是两个玩家当前都没有参与另一个匹配(,但我遇到了某种并发/阻塞问题,这会导致Enumerable循环无限。

在任意时刻,while(any matches)调用将继续循环,因为剩下的比赛只涉及activePlayers列表中的玩家,所以它将继续循环。这意味着比赛正在进行中。

但是IsCompleted事件再也不会被调用了,所以运行中的任何匹配都会以某种方式被阻止,或者它们已经完成,但我在_activePlayers.TryRemove()代码中遇到了并发错误。

public interface IMatchGenerator
{
    void CompleteMatch(Match match);
}
public class MatchGenerator : IMatchGenerator
{
    private static object SYNC_LOCK = new object();
    private ConcurrentDictionary<Player, Player> _activePlayers; //tracks players actively playing
    private ConcurrentQueue<Match> _allMatches;
    public MatchGenerator()
    {
        _activePlayers = new ConcurrentDictionary<Player, Player>();
    }
    public IEnumerable<Match> Generate(IList<Match> matches)
    {
        //take the list of matches passed in and stick them in a queue.
        _allMatches = new ConcurrentQueue<Match>(matches);
        //keep looping while there are matches to play
        while (_allMatches.Any())
        {
            Match nextMatch;
            lock (SYNC_LOCK)
            {
                 _allMatches.TryDequeue(out nextMatch); //Grab from front of queue
                if (!_activePlayers.ContainsKey(nextMatch.Player1) &&
                     !_activePlayers.ContainsKey(nextMatch.Player2))
                {
                    //If neither player is in the active player list, then this is a
                    //good match so add both players
                    _activePlayers.TryAdd(nextMatch.Player1, nextMatch.Player1);
                    _activePlayers.TryAdd(nextMatch.Player2, nextMatch.Player2);
                }
                else
                {
                    //Otherwise push this match back in to the start of the queue...
                    //FIFO should move on to next;
                    _allMatches.Enqueue(nextMatch);
                    nextMatch = null;
                }
            }
            if (nextMatch != null)
                yield return nextMatch;
        }
    }
    public void CompleteMatch(Match match)
    {
        //Matches in progress have the generator attached to them and will call 
        //home when they are complete to remove players from the active list
        Player junk1, junk2;
        lock (SYNC_LOCK)
        {
            _activePlayers.TryRemove(match.Player1, out junk1);
            _activePlayers.TryRemove(match.Player2, out junk2);
        }
        if (junk1 == null || junk2 == null)
        {
            Debug.WriteLine("Uhoh! a match came in for completion but on of the players who should have been in the active list didn't get removed");
        }
    }
}

以及使用这个的代码。

var mg = new MatchGenerator();
//Code to generate IList<Match> or all player combinations and attach mg
Parallel.ForEach(mg.Generate(matches),
                    new ParallelOptions() {MaxDegreeOfParallelism = 8},
                    match =>
                    {
                        var localMatch = match;
                        try
                        {
                            PlayMatch(localMatch, gameLogDirectory, results);
                        }
                        finally
                        {
                            localMatch.IsCompleted();
                        }
                    });

从这里开始,它变得有点冗长,但并没有太多的事情发生。PlayMatch(...)调用一个方法Play,其中包含一些字符串生成器代码。Plays根据正在播放的机器人程序调用几个外部进程(如ruby/python等(。它还流式写入每个玩家的日志文件,但假设一次只有一个玩家机器人程序在运行,这里应该会有任何冲突。

整个Control程序在GitHub@上可用

https://github.com/eoincampbell/big-bang-game/blob/master/BigBang.Orchestrator/Program.cs

    public static Result Play(Player p1, Player p2, string gameLogDirectory)
    {
        var dir = Player.PlayerDirectory;
        var result = new Result() { P1 = p1, P2 = p2, P1Score = 0, P2Score = 0 };
        string player1ParamList = string.Empty, player2ParamList = string.Empty;
        List<long> p1Times = new List<long>(), p2Times = new List<long>();
        Stopwatch sw1 = new Stopwatch(), sw2 = new Stopwatch(), swGame = new Stopwatch();
        var sb = new StringBuilder();
        var proc = new Process
        {
            StartInfo =
            {
                UseShellExecute = false, RedirectStandardOutput = true, WorkingDirectory = dir
            }
        };
        swGame.Start();
        sb.AppendLine("+--------------------------------------------------------------------------------------------+");
        sb.AppendFormat("| Starting Game between {0} & {1} 'n", p1.Name, p2.Name);
        sb.AppendLine("| ");
        for (var i = 0; i < 1; i++)
        {
            sw1.Reset();
            sw1.Start();
            var o1 = ProcessRunner.RunPlayerProcess(ref proc, player1ParamList, player2ParamList, p1, dir);
            sw1.Stop();
            p1Times.Add(sw1.ElapsedMilliseconds);
            //System.Threading.Thread.Sleep(1);
            sw2.Reset();
            sw2.Start();
            var o2 = ProcessRunner.RunPlayerProcess(ref proc, player2ParamList, player1ParamList, p2, dir);
            sw2.Stop();
            p2Times.Add(sw2.ElapsedMilliseconds);

            var whoWon = GetWinner(o1, o2, ref player1ParamList, ref player2ParamList);
            var whoWonMessage = "Draw Match";
            if (whoWon == "P1")
            {
                result.P1Score++;
                whoWonMessage = string.Format("{0} wins", p1.Name);
            }
            else if (whoWon == "P2")
            {
                result.P2Score++;
                whoWonMessage = string.Format("{0} wins", p2.Name);
            }
            sb.AppendFormat("| {0} plays {1} | {2} plays {3} | {4}'n", p1.Name, o1, p2.Name, o2, whoWonMessage);
        }
        swGame.Stop();
        sb.AppendLine("| ");
        sb.AppendFormat("| Game Time: {0}", swGame.Elapsed);
        result.WriteLine(sb.ToString());
        var resultMessage = string.Format("Result: {0} vs {1}: {2} - {3}",
                        result.P1,
                        result.P2,
                        result.P1Score,
                        result.P2Score);
        sb.AppendLine("| ");
        sb.AppendFormat("| {0}", resultMessage);

        using (var p1sw = new StreamWriter(Path.Combine(gameLogDirectory, p1.Name + ".log"), true))
        {
            p1sw.WriteLine(sb.ToString());
        }
        using (var p2sw = new StreamWriter(Path.Combine(gameLogDirectory, p2.Name + ".log"), true))
        {
            p2sw.WriteLine(sb.ToString());
        }
        result.P1AvgTimeMs = p1Times.Average();
        result.P2AvgTimeMs = p2Times.Average();
        return result;
    }

与.NET中的TPL并发.Parallel Generator在一段时间后阻塞并陷入困境

我认为您的问题是由在IEnumerable<T>上使用Parallel.ForEach()引起的,这可能需要无限长的时间才能生成下一个元素。这与在BlockingCollection.GetConsumingEnumerable():上使用Parallel.ForEach()基本上是相同的问题

Parallel.ForEach和PLINQ默认使用的分区算法都使用分块,以最大限度地降低同步成本:它不会为每个元素获取一次锁,而是获取锁,获取一组元素(一个块(,然后释放锁。

但是,由于在您的情况下,序列中的下一个元素直到前一个元素被处理后才会产生,这将导致不确定的阻塞。

我认为这里的正确解决方案是使用BlockingCollection<T>而不是yield return:用blockingCollection.Add(nextMatch)替换yield return nextMatch,然后在单独的线程上运行Generate(),并在Parallel.ForEach()中使用上面提到的博客文章中的blockingCollection.GetConsumingPartitioner()

我也不喜欢Generate()浪费了整个CPU核心,在没有有效匹配的情况下基本上什么都不做,但这是一个单独的问题。