使用共享资源创建多个线程
本文关键字:线程 创建 共享资源 | 更新日期: 2023-09-27 18:19:54
好吧,所以我在创建一堆线程时遇到了问题,这些线程都使用同一个对象。这个想法是,我有一个项目的"队列"(又名列表),项目应该一个接一个地处理,直到所有项目都被处理完。目前,这只适用于一个线程(当我将threadcount更改为1时),但当我尝试将其设置为threadcount=2,并且线程正在竞争时,所有线程都将转到。。。。一个糟糕的地方。
以下是我制作的一些快速课程,以给出我试图实现的目标的详细示例。。。我有一个很好的预感,这将与使用"锁定"关键字有关,但我不能100%确定它是如何使用的。
在你的答案中,请用解决方案的代码举例,使你的答案清晰明了。谢谢
代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
namespace MyNamespace
{
class Class1
{
static void Main()
{
Debug.WriteLine("starting application...");
int threadcount = 2;
List<Task> tasks = new List<Task>();
List<Class2> myObjs = new List<Class2>();
myObjs.Add(new Class2("list item 1"));
myObjs.Add(new Class2("list item 2"));
myObjs.Add(new Class2("list item 3"));
myObjs.Add(new Class2("list item 4"));
myObjs.Add(new Class2("list item 5"));
myObjs.Add(new Class2("list item 6"));
myObjs.Add(new Class2("list item 7"));
myObjs.Add(new Class2("list item 8"));
myObjs.Add(new Class2("list item 9"));
Debug.WriteLine("about to create " + threadcount + " task(s)...");
int t = 0;
do
{
t++;
Debug.WriteLine("creating task " + t);
Class3 starter = new Class3();
tasks.Add(starter.StartNewThread(myObjs));
} while (t < threadcount);
Task.WaitAll(tasks.ToArray());
Debug.WriteLine("all tasks have completed");
}
}
class Class2
{
private string m_status;
public string status
{
get { return m_status; }
set { m_status = value; }
}
private string m_text;
public string text
{
get { return m_text; }
set { m_text = value; }
}
private int m_threadid;
public int threadid
{
get { return m_threadid; }
set { m_threadid = value; }
}
public Class2()
{
m_status = "created";
m_text = "";
m_threadid = 0;
}
public Class2(string intext)
{
m_status = "created";
m_text = intext;
m_threadid = 0;
}
}
class Class3
{
public Task StartNewThread(List<Class2> taskObjs)
{
Task<List<Class2>> task = Task.Factory
.StartNew(() => threadTaskWorker(taskObjs),
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.Default)
.ContinueWith(completed_task => threadTaskComplete(completed_task.Result));
return task;
}
private List<Class2> threadTaskWorker(List<Class2> taskObjs)
{
Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId;
Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created.");
//Process all items in the list that need processing
Class2 nextObj;
do
{
//Look for next item in list that needs processing
nextObj = null;
foreach (Class2 taskObj in taskObjs)
{
if (taskObj.status == "created")
{
nextObj = taskObj;
break;
}
}
if (nextObj != null)
{
Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId +
" is handling " + nextObj.text);
nextObj.status = "processing";
nextObj.threadid = Thread.CurrentThread.ManagedThreadId;
nextObj.text += "(handled)";
Random rnd = new Random();
Thread.Sleep(rnd.Next(300, 3000));
nextObj.status = "completed";
}
} while (nextObj != null);
Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed.");
//Return the task object
return taskObjs;
}
private List<Class2> threadTaskComplete(List<Class2> taskObjs)
{
Debug.WriteLine("a thread has finished, here are the current item's status...");
foreach (Class2 taskObj in taskObjs)
{
Debug.WriteLine(taskObj.text +
" thread:" + taskObj.threadid +
" status:" + taskObj.status);
}
//Return the task object
return taskObjs;
}
}
}
结果:
/*
starting application...
about to create 2 task(s)...
creating task 1
creating task 2
thread #10 created.
thread #11 created.
thread #10 is handling list item 1
thread #11 is handling list item 1
thread #10 is handling list item 2
thread #11 is handling list item 2
thread #10 is handling list item 3
thread #11 is handling list item 4
thread #10 is handling list item 5
thread #11 is handling list item 5
thread #10 is handling list item 6
thread #11 is handling list item 6
thread #10 is handling list item 7
thread #11 is handling list item 8
thread #10 is handling list item 9
thread #11 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled)(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled)(handled) thread:11 status:completed
list item 6(handled)(handled) thread:11 status:completed
list item 7(handled) thread:10 status:completed
list item 8(handled) thread:11 status:completed
list item 9(handled) thread:10 status:processing
thread #10 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled)(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled)(handled) thread:11 status:completed
list item 6(handled)(handled) thread:11 status:completed
list item 7(handled) thread:10 status:completed
list item 8(handled) thread:11 status:completed
list item 9(handled) thread:10 status:completed
all tasks have completed
*/
预期结果:
/*
starting application...
about to create 2 task(s)...
creating task 1
creating task 2
thread #10 created.
thread #11 created.
thread #10 is handling list item 1
thread #11 is handling list item 2
thread #10 is handling list item 3
thread #11 is handling list item 4
thread #10 is handling list item 5
thread #10 is handling list item 6
thread #11 is handling list item 7
thread #10 is handling list item 8
thread #11 is handling list item 9
thread #10 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:10 status:completed
list item 2(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled) thread:10 status:completed
list item 6(handled) thread:10 status:completed
list item 7(handled) thread:11 status:completed
list item 8(handled) thread:10 status:completed
list item 9(handled) thread:11 status:processing
thread #11 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:10 status:completed
list item 2(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled) thread:10 status:completed
list item 6(handled) thread:10 status:completed
list item 7(handled) thread:11 status:completed
list item 8(handled) thread:10 status:completed
list item 9(handled) thread:11 status:completed
all tasks have completed
*/
如果您不想使用ConcurrentQueue
,或者您使用的其他共享资源不是线程安全的,请使用前面在使用lock
关键字时指出的选项。
来自MSDN:
lock关键字通过获取给定对象的互斥锁、执行语句,然后释放锁,将语句块标记为关键部分。
当线程获得给定object
的锁时,遇到lock(object)
语句的其他线程必须等待锁可用后才能继续。
/// any resource shared between threads
private List<int> sharedResource = new List<int>();
/// best practice is to use a private object to synchronise threads
/// see: https://msdn.microsoft.com/en-us/library/c5kehkcz.aspx
private object resourceLock = new object();
void MethodAccessingSharedResource()
{
/// Only one thread can acquire the lock on resourceLock at a time.
lock (resourceLock)
{
/// The thread can safely access the shared resource here.
/// Other threads will wait at lock(resourceLock) until
/// this thread gives up the lock.
}
/// The thread has released the lock on resourceLock.
/// Another thread can now enter the lock(){} code block.
}
首先,感谢@khargoosh和@interceptwind的输入!它是帮助我理解锁并想出这个解决方案的关键。这就是我想出的,最终效果很好!它已经过测试,结果是实际结果。在答案中,我决定使用4个线程来显示结果。
代码:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
namespace MyNamespace
{
class Class1
{
static void Main()
{
Debug.WriteLine("starting application...");
int threadcount = 4;
List<Task> tasks = new List<Task>();
List<Class2> myObjs = new List<Class2>();
myObjs.Add(new Class2("list item 1"));
myObjs.Add(new Class2("list item 2"));
myObjs.Add(new Class2("list item 3"));
myObjs.Add(new Class2("list item 4"));
myObjs.Add(new Class2("list item 5"));
myObjs.Add(new Class2("list item 6"));
myObjs.Add(new Class2("list item 7"));
myObjs.Add(new Class2("list item 8"));
myObjs.Add(new Class2("list item 9"));
Debug.WriteLine("about to create " + threadcount + " task(s)...");
int t = 0;
do
{
t++;
Debug.WriteLine("creating task " + t);
Class3 starter = new Class3();
tasks.Add(starter.StartNewThread(myObjs));
} while (t < threadcount);
Task.WaitAll(tasks.ToArray());
Debug.WriteLine("all tasks have completed");
}
}
class Class2
{
private object m_locker = new object();
public object locker
{
get { return m_locker; }
set { m_locker = value; }
}
private string m_status;
public string status
{
get { return m_status; }
set { m_status = value; }
}
private string m_text;
public string text
{
get { return m_text; }
set { m_text = value; }
}
private int m_threadid;
public int threadid
{
get { return m_threadid; }
set { m_threadid = value; }
}
public Class2()
{
m_status = "created";
m_text = "";
m_threadid = 0;
}
public Class2(string intext)
{
m_status = "created";
m_text = intext;
m_threadid = 0;
}
}
class Class3
{
public Task StartNewThread(List<Class2> taskObjs)
{
Task<List<Class2>> task = Task.Factory
.StartNew(() => threadTaskWorker(taskObjs),
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.Default)
.ContinueWith(completed_task => threadTaskComplete(completed_task.Result));
return task;
}
private List<Class2> threadTaskWorker(List<Class2> taskObjs)
{
Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId;
Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created.");
//Process all items in the list that need processing
Class2 nextObj;
do
{
//Look for next item in list that needs processing
nextObj = null;
foreach (Class2 taskObj in taskObjs)
{
nextObj = taskObj;
lock (nextObj.locker)
{
if (taskObj.status == "created")
{
nextObj.status = "processing";
break;
}
else nextObj = null;
}
}
if (nextObj != null)
{
Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId +
" is handling " + nextObj.text);
nextObj.threadid = Thread.CurrentThread.ManagedThreadId;
nextObj.text += "(handled)";
Random rnd = new Random();
Thread.Sleep(rnd.Next(300, 3000));
nextObj.status = "completed";
}
} while (nextObj != null);
Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed.");
//Return the task object
return taskObjs;
}
private List<Class2> threadTaskComplete(List<Class2> taskObjs)
{
Debug.WriteLine("a thread has finished, here are the current item's status...");
foreach (Class2 taskObj in taskObjs)
{
Debug.WriteLine(taskObj.text +
" thread:" + taskObj.threadid +
" status:" + taskObj.status);
}
//Return the task object
return taskObjs;
}
}
}
结果:
/*
starting application...
about to create 4 task(s)...
creating task 1
creating task 2
creating task 3
creating task 4
thread #11 created.
thread #13 created.
thread #12 created.
thread #12 is handling list item 3
thread #11 is handling list item 1
thread #13 is handling list item 2
thread #14 created.
thread #14 is handling list item 4
thread #12 is handling list item 5
thread #11 is handling list item 6
thread #13 is handling list item 7
thread #14 is handling list item 8
thread #12 is handling list item 9
thread #11 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:processing
list item 8(handled) thread:14 status:processing
list item 9(handled) thread:12 status:processing
thread #13 destroyed.
thread #14 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 8(handled) thread:14 status:completed
list item 9(handled) thread:12 status:processing
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:completed
list item 8(handled) thread:14 status:completed
list item 9(handled) thread:12 status:processing
thread #12 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:completed
list item 8(handled) thread:14 status:completed
list item 9(handled) thread:12 status:completed
all tasks have completed
*/
如果您确实想要一个可以并发访问的项目的先进先出队列,那么请使用ConcurrentQueue。使用TryDequeue()方法检索对象将确保每个对象只被访问一次。
示例:
var cq = new ConcurrentQueue<T>();
//populate queue
...
//process queue until empty -- this can be done in parallel
T item;
while(cq.trydequeue(out item)){
//process item
}
//queue was empty when we tried to retrieve something.
首先,您需要一个额外的对象来用作Class2 中的locker
class Class2
{
public object locker = new object();
private string m_status;
...
}
编辑:接下来,在Class3中的处理循环中,您需要首先检查您的nextObj.status是否为"已创建"。如果是,则将其更改为"处理"并继续处理。如果不是,则跳到下一个对象。
请注意,我们将锁定nextObj.status,以防止两个线程同时访问它。(根据MoreOn的评论)
private List<Class2> threadTaskWorker(List<Class2> taskObjs)
{
Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId;
Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created.");
//Process all items in the list that need processing
foreach (Class2 nextObj in taskObjs)
{
Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId +
" is handling " + nextObj.text);
lock (nextObj.locker)
{
if (nextObj.status == "created")
nextObj.status = "processing";
else
continue;
}
nextObj.status = "processing";
nextObj.threadid = Thread.CurrentThread.ManagedThreadId;
nextObj.text += "(handled)";
Random rnd = new Random();
Thread.Sleep(rnd.Next(300, 3000));
nextObj.status = "completed";
}
Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed.");
return taskObjs;
}