生产者-消费者模式中的线程

本文关键字:线程 模式 消费者 生产者 | 更新日期: 2023-09-27 18:05:54

首先为我糟糕的英语道歉。我已经编写了这个代码来读取。csv文件,一个接一个,读取后,它将被删除。.csv文件格式如下:

ID     Name     ParentID
------------------------
0      [Root]    
1      A        0
2      B        0
3      C        1
4      D        2

,结果将是

[Root]
_A
__C
_B
__D

就像树一样。但是我只能在一个文件上执行,并且在第一次之后,消费者再也不会醒来,所以请帮助我修复这个问题或解释如何调用这个原因和修复方法。

谢谢你。

class Program
{
    static ProducerConsumer PrdCos;
    static string[] file;
    static void Main(string[] args)
    {
        string directory = "";
        PrdCos = new ProducerConsumer();
        Console.CancelKeyPress += new ConsoleCancelEventHandler(Console_CancelKeyPress);
        directory = Input_directory();
        file = null;
        while (true)
        {
            if (Check_exist(directory) == true)
                break;
            else 
            {
                Console.WriteLine("Please Reinput 'n");
                directory = Input_directory();
            }
        }
        file = Directory.GetFiles(directory, "*.csv");
        Console.WriteLine("'n-------------------------Program Start------------------------------");
        Thread thread =  new Thread(new ThreadStart(consumeJob));
        Thread thread2 = new Thread(new ThreadStart(procedureJob));
        thread.IsBackground = true;
        thread2.IsBackground = true;
        thread.Start();
        thread2.Start();
        //delete file
        //for (int i = 0; i < file.Length; i++)
        //    File.Delete(file[i]);
        Console.ReadLine();
        while(Console.ReadKey(true).Key == ConsoleKey.Enter)
        {
            Console.WriteLine("press ctr + c to exit");
        }
    }
    static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e){}
    static void procedureJob()
    {
        if (file.Length > 0)
        {
            foreach (string str in file)
            {
                PrdCos.Produce(str);
                Thread.Sleep(1000);
            }
        }
    }
    static void consumeJob()
    {
        List<string> a = new List<string>();
        a = file.ToList();
        //for(int i = 0; i<a.Count;i++)
        //{
            PrdCos.Consume();
            Thread.Sleep(100);
        //}
    }
    static string Input_directory()
    {
        string str = ""; 
        Console.Write("Input file Directory:  ");
        str = Console.ReadLine();
        return str;
    }
    static bool Check_exist(string path)
    {
        int Count;
        string[] file;
        //string fileName = "";
        string filePath;
        filePath = path;
        if (Directory.Exists(filePath) != true)                 // Check file path exist
        {
            Console.WriteLine("{0} is not valid Directory!!!", path);
            return false;
        }
        else
        {
            Count = Directory.GetFiles(filePath, "*").Length;
            file = Directory.GetFiles(filePath, "*.csv");
        }
        if (Count == 0)
        {
            Console.WriteLine("Folder is Empty!!!");
            return false;
        }
        if (Count > 0 && file.Length == 0)
        {
            Console.WriteLine("Input File not Valid!!!");
            return false;
        }
        return true;
    }
    }
    public class ProducerConsumer
    {
    Queue queue = new Queue();
    object locker = new object();
    int node = 0;
    int NODE = 0;
    public void Produce(string path)
    {
        Console.WriteLine("Processing Produce");
            lock (locker)
            {
                try
                {
                    //foreach (string mPath in path)
                    //{
                    var reader = new StreamReader(File.OpenRead(path));
                    System.Collections.Generic.List<string> Str1 = new System.Collections.Generic.List<string>();
                    while (!reader.EndOfStream)
                    {
                        string line = reader.ReadLine();
                        if (line.Split(',').Length > 3)
                        {
                            Console.WriteLine("File's Data Is Wrong Format!!!");
                            return;
                        }
                        Str1.Add(line);
                    }
                    reader.Close();
                    queue.Enqueue(Str1);
                    Thread.Sleep(2000);
                    //}
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex);
                }
            }
        Console.WriteLine("Finished Processing Produce");
    }
    public void Consume()
    {
        Console.WriteLine("Processing Consume");
        List<string> ParentId = new List<string>();
        DataRow[] row = { };
        lock (locker)
        {
            try
            {
                if (queue.Count > 0)
                {
                    foreach (System.Collections.Generic.List<string> str in queue)
                    {
                        try
                        {
                            node = 0;
                            NODE = 0;
                            NODE = str.Count() - 1;
                            PrintData(str, 0, str);
                            Console.WriteLine("'n");
                        }
                        catch (Exception ex)
                        {
                            Console.Write(ex);
                        }
                    }
                    Console.WriteLine("Finished Processing Consume");
                    Thread.Sleep(1000);
                }
            }                
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            //while (queue.Count == 0)
            //{
            //    Monitor.Wait(locker);
            //    queue.Dequeue();
            //}
            //if (queue == null)
            //    return;
            //Thread.Sleep(100);
        }
    }
    private void PrintWithIndent(string value, int indent)
    {
        node++;
        Console.WriteLine("{0}{1}", new string(' ', indent), value);// prtValue[1]);
    }
    public void PrintData(List<string> Str, int indent, List<string> StrMain)
    {

        List<string> child = new List<string>();
        string[] Value = null;
        string[] ValueMain = null;
        for (int i = 0; i < Str.Count; i++)
        {
            if (node >= NODE)
                return;
            child = new List<string>();
            ValueMain = Str[i].Split(',');
            if (Str[i].ToString() == StrMain[0].ToString())
                continue;
            else
                PrintWithIndent(ValueMain[1], indent);
            foreach (string mStr in StrMain)
            {
                Value = mStr.Split(',');
                if (Value[2] == ValueMain[0])
                    child.Add(mStr);
            }
            child = child.OrderBy(x => (x.Split(',')[1])).ToList();    //Sort by Alphabet
            if (child.Count > 0)
            {
                PrintData(child, indent + 1, StrMain);
            }
        }
    }
}

生产者-消费者模式中的线程

我注意到的第一个问题是您的消费者正在结束。还有更多的问题——我建议你把它扔掉,重新开始。

首先,您需要确保您正确地理解了模式。然后你需要将经典模式与。net中的线程关联起来。

一旦你理解了概念层面的东西,你还应该研究并行LINQ (PLINQ)和任务并行库(TPL) -将它们用于你的生产者/消费者将节省大量的时间和bug(没有更多的锁定,你可以让线程在等待时阻塞,等等)。

这里有一些其他的参考文献可能会有所帮助:

  • 编写单个生产者/单个消费者的最有效方式队列
  • 有效带有多个线程的消费者线程生产商
  • 一个有效这是一种任意数量的消费者开始的方式线程?