生产者-消费者模式中的线程
本文关键字:线程 模式 消费者 生产者 | 更新日期: 2023-09-27 18:05:54
首先为我糟糕的英语道歉。我已经编写了这个代码来读取。csv文件,一个接一个,读取后,它将被删除。.csv文件格式如下:
ID Name ParentID
------------------------
0 [Root]
1 A 0
2 B 0
3 C 1
4 D 2
,结果将是
[Root]
_A
__C
_B
__D
就像树一样。但是我只能在一个文件上执行,并且在第一次之后,消费者再也不会醒来,所以请帮助我修复这个问题或解释如何调用这个原因和修复方法。
谢谢你。
class Program
{
static ProducerConsumer PrdCos;
static string[] file;
static void Main(string[] args)
{
string directory = "";
PrdCos = new ProducerConsumer();
Console.CancelKeyPress += new ConsoleCancelEventHandler(Console_CancelKeyPress);
directory = Input_directory();
file = null;
while (true)
{
if (Check_exist(directory) == true)
break;
else
{
Console.WriteLine("Please Reinput 'n");
directory = Input_directory();
}
}
file = Directory.GetFiles(directory, "*.csv");
Console.WriteLine("'n-------------------------Program Start------------------------------");
Thread thread = new Thread(new ThreadStart(consumeJob));
Thread thread2 = new Thread(new ThreadStart(procedureJob));
thread.IsBackground = true;
thread2.IsBackground = true;
thread.Start();
thread2.Start();
//delete file
//for (int i = 0; i < file.Length; i++)
// File.Delete(file[i]);
Console.ReadLine();
while(Console.ReadKey(true).Key == ConsoleKey.Enter)
{
Console.WriteLine("press ctr + c to exit");
}
}
static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e){}
static void procedureJob()
{
if (file.Length > 0)
{
foreach (string str in file)
{
PrdCos.Produce(str);
Thread.Sleep(1000);
}
}
}
static void consumeJob()
{
List<string> a = new List<string>();
a = file.ToList();
//for(int i = 0; i<a.Count;i++)
//{
PrdCos.Consume();
Thread.Sleep(100);
//}
}
static string Input_directory()
{
string str = "";
Console.Write("Input file Directory: ");
str = Console.ReadLine();
return str;
}
static bool Check_exist(string path)
{
int Count;
string[] file;
//string fileName = "";
string filePath;
filePath = path;
if (Directory.Exists(filePath) != true) // Check file path exist
{
Console.WriteLine("{0} is not valid Directory!!!", path);
return false;
}
else
{
Count = Directory.GetFiles(filePath, "*").Length;
file = Directory.GetFiles(filePath, "*.csv");
}
if (Count == 0)
{
Console.WriteLine("Folder is Empty!!!");
return false;
}
if (Count > 0 && file.Length == 0)
{
Console.WriteLine("Input File not Valid!!!");
return false;
}
return true;
}
}
public class ProducerConsumer
{
Queue queue = new Queue();
object locker = new object();
int node = 0;
int NODE = 0;
public void Produce(string path)
{
Console.WriteLine("Processing Produce");
lock (locker)
{
try
{
//foreach (string mPath in path)
//{
var reader = new StreamReader(File.OpenRead(path));
System.Collections.Generic.List<string> Str1 = new System.Collections.Generic.List<string>();
while (!reader.EndOfStream)
{
string line = reader.ReadLine();
if (line.Split(',').Length > 3)
{
Console.WriteLine("File's Data Is Wrong Format!!!");
return;
}
Str1.Add(line);
}
reader.Close();
queue.Enqueue(Str1);
Thread.Sleep(2000);
//}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}
Console.WriteLine("Finished Processing Produce");
}
public void Consume()
{
Console.WriteLine("Processing Consume");
List<string> ParentId = new List<string>();
DataRow[] row = { };
lock (locker)
{
try
{
if (queue.Count > 0)
{
foreach (System.Collections.Generic.List<string> str in queue)
{
try
{
node = 0;
NODE = 0;
NODE = str.Count() - 1;
PrintData(str, 0, str);
Console.WriteLine("'n");
}
catch (Exception ex)
{
Console.Write(ex);
}
}
Console.WriteLine("Finished Processing Consume");
Thread.Sleep(1000);
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
//while (queue.Count == 0)
//{
// Monitor.Wait(locker);
// queue.Dequeue();
//}
//if (queue == null)
// return;
//Thread.Sleep(100);
}
}
private void PrintWithIndent(string value, int indent)
{
node++;
Console.WriteLine("{0}{1}", new string(' ', indent), value);// prtValue[1]);
}
public void PrintData(List<string> Str, int indent, List<string> StrMain)
{
List<string> child = new List<string>();
string[] Value = null;
string[] ValueMain = null;
for (int i = 0; i < Str.Count; i++)
{
if (node >= NODE)
return;
child = new List<string>();
ValueMain = Str[i].Split(',');
if (Str[i].ToString() == StrMain[0].ToString())
continue;
else
PrintWithIndent(ValueMain[1], indent);
foreach (string mStr in StrMain)
{
Value = mStr.Split(',');
if (Value[2] == ValueMain[0])
child.Add(mStr);
}
child = child.OrderBy(x => (x.Split(',')[1])).ToList(); //Sort by Alphabet
if (child.Count > 0)
{
PrintData(child, indent + 1, StrMain);
}
}
}
}
我注意到的第一个问题是您的消费者正在结束。还有更多的问题——我建议你把它扔掉,重新开始。
首先,您需要确保您正确地理解了模式。然后你需要将经典模式与。net中的线程关联起来。
一旦你理解了概念层面的东西,你还应该研究并行LINQ (PLINQ)和任务并行库(TPL) -将它们用于你的生产者/消费者将节省大量的时间和bug(没有更多的锁定,你可以让线程在等待时阻塞,等等)。
这里有一些其他的参考文献可能会有所帮助:
- 编写单个生产者/单个消费者的最有效方式队列
- 有效带有多个线程的消费者线程生产商
- 一个有效这是一种任意数量的消费者开始的方式线程?