使用共享资源创建多个线程

本文关键字:线程 创建 共享资源 | 更新日期: 2023-09-27 18:19:54

好吧,所以我在创建一堆线程时遇到了问题,这些线程都使用同一个对象。这个想法是,我有一个项目的"队列"(又名列表),项目应该一个接一个地处理,直到所有项目都被处理完。目前,这只适用于一个线程(当我将threadcount更改为1时),但当我尝试将其设置为threadcount=2,并且线程正在竞争时,所有线程都将转到。。。。一个糟糕的地方。

以下是我制作的一些快速课程,以给出我试图实现的目标的详细示例。。。我有一个很好的预感,这将与使用"锁定"关键字有关,但我不能100%确定它是如何使用的。

在你的答案中,请用解决方案的代码举例,使你的答案清晰明了。谢谢

代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
namespace MyNamespace
{
    class Class1
    {
        static void Main()
        {
            Debug.WriteLine("starting application...");
            int threadcount = 2;
            List<Task> tasks = new List<Task>();
            List<Class2> myObjs = new List<Class2>();
            myObjs.Add(new Class2("list item 1"));
            myObjs.Add(new Class2("list item 2"));
            myObjs.Add(new Class2("list item 3"));
            myObjs.Add(new Class2("list item 4"));
            myObjs.Add(new Class2("list item 5"));
            myObjs.Add(new Class2("list item 6"));
            myObjs.Add(new Class2("list item 7"));
            myObjs.Add(new Class2("list item 8"));
            myObjs.Add(new Class2("list item 9"));
            Debug.WriteLine("about to create " + threadcount + " task(s)...");
            int t = 0;
            do
            {
                t++;
                Debug.WriteLine("creating task " + t);
                Class3 starter = new Class3();
                tasks.Add(starter.StartNewThread(myObjs));
            } while (t < threadcount);
            Task.WaitAll(tasks.ToArray());
            Debug.WriteLine("all tasks have completed");
        }
    }
    class Class2
    {
        private string m_status;
        public string status
        {
            get { return m_status; }
            set { m_status = value; }
        }
        private string m_text;
        public string text
        {
            get { return m_text; }
            set { m_text = value; }
        }
        private int m_threadid;
        public int threadid
        {
            get { return m_threadid; }
            set { m_threadid = value; }
        }
        public Class2()
        {
            m_status = "created";
            m_text = "";
            m_threadid = 0;
        }
        public Class2(string intext)
        {
            m_status = "created";
            m_text = intext;
            m_threadid = 0;
        }
    }
    class Class3
    {
        public Task StartNewThread(List<Class2> taskObjs)
        {
            Task<List<Class2>> task = Task.Factory
                .StartNew(() => threadTaskWorker(taskObjs),
                CancellationToken.None,
                TaskCreationOptions.None,
                TaskScheduler.Default)
                .ContinueWith(completed_task => threadTaskComplete(completed_task.Result));
            return task;
        }
        private List<Class2> threadTaskWorker(List<Class2> taskObjs)
        {
            Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId;
            Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created.");
            //Process all items in the list that need processing
            Class2 nextObj;
            do
            {
                //Look for next item in list that needs processing
                nextObj = null;
                foreach (Class2 taskObj in taskObjs)
                {
                    if (taskObj.status == "created")
                    {
                        nextObj = taskObj;
                        break;
                    }
                }
                if (nextObj != null)
                {
                    Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId +
                        " is handling " + nextObj.text);
                    nextObj.status = "processing";
                    nextObj.threadid = Thread.CurrentThread.ManagedThreadId;
                    nextObj.text += "(handled)";
                    Random rnd = new Random();
                    Thread.Sleep(rnd.Next(300, 3000));
                    nextObj.status = "completed";
                }
            } while (nextObj != null);
            Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed.");
            //Return the task object
            return taskObjs;
        }
        private List<Class2> threadTaskComplete(List<Class2> taskObjs)
        {
            Debug.WriteLine("a thread has finished, here are the current item's status...");
            foreach (Class2 taskObj in taskObjs)
            {
                Debug.WriteLine(taskObj.text +
                    " thread:" + taskObj.threadid +
                    " status:" + taskObj.status);
            }
            //Return the task object
            return taskObjs;
        }
    }
}

结果:

/*
starting application...
about to create 2 task(s)...
creating task 1
creating task 2
thread #10 created.
thread #11 created.
thread #10 is handling list item 1
thread #11 is handling list item 1
thread #10 is handling list item 2
thread #11 is handling list item 2
thread #10 is handling list item 3
thread #11 is handling list item 4
thread #10 is handling list item 5
thread #11 is handling list item 5
thread #10 is handling list item 6
thread #11 is handling list item 6
thread #10 is handling list item 7
thread #11 is handling list item 8
thread #10 is handling list item 9
thread #11 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled)(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled)(handled) thread:11 status:completed
list item 6(handled)(handled) thread:11 status:completed
list item 7(handled) thread:10 status:completed
list item 8(handled) thread:11 status:completed
list item 9(handled) thread:10 status:processing
thread #10 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled)(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled)(handled) thread:11 status:completed
list item 6(handled)(handled) thread:11 status:completed
list item 7(handled) thread:10 status:completed
list item 8(handled) thread:11 status:completed
list item 9(handled) thread:10 status:completed
all tasks have completed
*/

预期结果:

/*
starting application...
about to create 2 task(s)...
creating task 1
creating task 2
thread #10 created.
thread #11 created.
thread #10 is handling list item 1
thread #11 is handling list item 2
thread #10 is handling list item 3
thread #11 is handling list item 4
thread #10 is handling list item 5
thread #10 is handling list item 6
thread #11 is handling list item 7
thread #10 is handling list item 8
thread #11 is handling list item 9
thread #10 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:10 status:completed
list item 2(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled) thread:10 status:completed
list item 6(handled) thread:10 status:completed
list item 7(handled) thread:11 status:completed
list item 8(handled) thread:10 status:completed
list item 9(handled) thread:11 status:processing
thread #11 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:10 status:completed
list item 2(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled) thread:10 status:completed
list item 6(handled) thread:10 status:completed
list item 7(handled) thread:11 status:completed
list item 8(handled) thread:10 status:completed
list item 9(handled) thread:11 status:completed
all tasks have completed
*/

使用共享资源创建多个线程

如果您不想使用ConcurrentQueue,或者您使用的其他共享资源不是线程安全的,请使用前面在使用lock关键字时指出的选项。

来自MSDN:

lock关键字通过获取给定对象的互斥锁、执行语句,然后释放锁,将语句块标记为关键部分。

当线程获得给定object的锁时,遇到lock(object)语句的其他线程必须等待锁可用后才能继续。

/// any resource shared between threads
private List<int> sharedResource = new List<int>();
/// best practice is to use a private object to synchronise threads
/// see: https://msdn.microsoft.com/en-us/library/c5kehkcz.aspx
private object resourceLock = new object();
void MethodAccessingSharedResource()
{
    /// Only one thread can acquire the lock on resourceLock at a time.
    lock (resourceLock)
    {
        /// The thread can safely access the shared resource here.
        /// Other threads will wait at lock(resourceLock) until 
        /// this thread gives up the lock.
    }
    /// The thread has released the lock on resourceLock.
    /// Another thread can now enter the lock(){} code block.
}

首先,感谢@khargoosh和@interceptwind的输入!它是帮助我理解锁并想出这个解决方案的关键。这就是我想出的,最终效果很好!它已经过测试,结果是实际结果。在答案中,我决定使用4个线程来显示结果。

代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
namespace MyNamespace
{
    class Class1
    {
        static void Main()
        {
            Debug.WriteLine("starting application...");
            int threadcount = 4;
            List<Task> tasks = new List<Task>();
            List<Class2> myObjs = new List<Class2>();
            myObjs.Add(new Class2("list item 1"));
            myObjs.Add(new Class2("list item 2"));
            myObjs.Add(new Class2("list item 3"));
            myObjs.Add(new Class2("list item 4"));
            myObjs.Add(new Class2("list item 5"));
            myObjs.Add(new Class2("list item 6"));
            myObjs.Add(new Class2("list item 7"));
            myObjs.Add(new Class2("list item 8"));
            myObjs.Add(new Class2("list item 9"));
            Debug.WriteLine("about to create " + threadcount + " task(s)...");
            int t = 0;
            do
            {
                t++;
                Debug.WriteLine("creating task " + t);
                Class3 starter = new Class3();
                tasks.Add(starter.StartNewThread(myObjs));
            } while (t < threadcount);
            Task.WaitAll(tasks.ToArray());
            Debug.WriteLine("all tasks have completed");
        }
    }
    class Class2
    {
        private object m_locker = new object();
        public object locker
        {
            get { return m_locker; }
            set { m_locker = value; }
        }
        private string m_status;
        public string status
        {
            get { return m_status; }
            set { m_status = value; }
        }
        private string m_text;
        public string text
        {
            get { return m_text; }
            set { m_text = value; }
        }
        private int m_threadid;
        public int threadid
        {
            get { return m_threadid; }
            set { m_threadid = value; }
        }
        public Class2()
        {
            m_status = "created";
            m_text = "";
            m_threadid = 0;
        }
        public Class2(string intext)
        {
            m_status = "created";
            m_text = intext;
            m_threadid = 0;
        }
    }
    class Class3
    {
        public Task StartNewThread(List<Class2> taskObjs)
        {
            Task<List<Class2>> task = Task.Factory
                .StartNew(() => threadTaskWorker(taskObjs),
                CancellationToken.None,
                TaskCreationOptions.None,
                TaskScheduler.Default)
                .ContinueWith(completed_task => threadTaskComplete(completed_task.Result));
            return task;
        }
        private List<Class2> threadTaskWorker(List<Class2> taskObjs)
        {
            Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId;
            Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created.");
            //Process all items in the list that need processing
            Class2 nextObj;
            do
            {
                //Look for next item in list that needs processing
                nextObj = null;
                foreach (Class2 taskObj in taskObjs)
                {
                    nextObj = taskObj;
                    lock (nextObj.locker)
                    {
                        if (taskObj.status == "created")
                        {
                            nextObj.status = "processing";
                            break;
                        }
                        else nextObj = null;
                    }
                }
                if (nextObj != null)
                {
                    Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId +
                        " is handling " + nextObj.text);
                    nextObj.threadid = Thread.CurrentThread.ManagedThreadId;
                    nextObj.text += "(handled)";
                    Random rnd = new Random();
                    Thread.Sleep(rnd.Next(300, 3000));
                    nextObj.status = "completed";
                }
            } while (nextObj != null);
            Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed.");
            //Return the task object
            return taskObjs;
        }
        private List<Class2> threadTaskComplete(List<Class2> taskObjs)
        {
            Debug.WriteLine("a thread has finished, here are the current item's status...");
            foreach (Class2 taskObj in taskObjs)
            {
                Debug.WriteLine(taskObj.text +
                    " thread:" + taskObj.threadid +
                    " status:" + taskObj.status);
            }
            //Return the task object
            return taskObjs;
        }
    }
}

结果:

/*
starting application...
about to create 4 task(s)...
creating task 1
creating task 2
creating task 3
creating task 4
thread #11 created.
thread #13 created.
thread #12 created.
thread #12 is handling list item 3
thread #11 is handling list item 1
thread #13 is handling list item 2
thread #14 created.
thread #14 is handling list item 4
thread #12 is handling list item 5
thread #11 is handling list item 6
thread #13 is handling list item 7
thread #14 is handling list item 8
thread #12 is handling list item 9
thread #11 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:processing
list item 8(handled) thread:14 status:processing
list item 9(handled) thread:12 status:processing
thread #13 destroyed.
thread #14 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 8(handled) thread:14 status:completed
list item 9(handled) thread:12 status:processing
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:completed
list item 8(handled) thread:14 status:completed
list item 9(handled) thread:12 status:processing
thread #12 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:completed
list item 8(handled) thread:14 status:completed
list item 9(handled) thread:12 status:completed
all tasks have completed
*/

如果您确实想要一个可以并发访问的项目的先进先出队列,那么请使用ConcurrentQueue。使用TryDequeue()方法检索对象将确保每个对象只被访问一次。

示例:

var cq = new ConcurrentQueue<T>();
//populate queue
...
//process queue until empty -- this can be done in parallel
T item;
while(cq.trydequeue(out item)){
    //process item
}
//queue was empty when we tried to retrieve something.

首先,您需要一个额外的对象来用作Class2 中的locker

class Class2
{
    public object locker = new object();
    private string m_status;
    ...
}

编辑:接下来,在Class3中的处理循环中,您需要首先检查您的nextObj.status是否为"已创建"。如果是,则将其更改为"处理"并继续处理。如果不是,则跳到下一个对象。

请注意,我们将锁定nextObj.status,以防止两个线程同时访问它。(根据MoreOn的评论)

    private List<Class2> threadTaskWorker(List<Class2> taskObjs)
    {
        Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created.");
        //Process all items in the list that need processing
        foreach (Class2 nextObj in taskObjs)
        {
            Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId +
                " is handling " + nextObj.text);
            lock (nextObj.locker)
            {
                if (nextObj.status == "created")
                    nextObj.status = "processing";
                else
                    continue;
            }
            nextObj.status = "processing";
            nextObj.threadid = Thread.CurrentThread.ManagedThreadId;
            nextObj.text += "(handled)";
            Random rnd = new Random();
            Thread.Sleep(rnd.Next(300, 3000));
            nextObj.status = "completed";
        }
        Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed.");
        return taskObjs;
    }