具有返回类型的方法的线程池

本文关键字:线程 方法 返回类型 | 更新日期: 2023-09-27 18:37:19

我有一个名为InitializeCRMService()的方法,它返回一个IOrganizationService的对象。现在,我定义了一个名为 GetConnection(string thread) 的不同方法,该方法根据传递给它的参数调用InitializeCRMService()。如果传递给GetConnection的字符串是单个的,它将启动IntializeCRMService()方法的单线程实例,但如果传递的字符串是多个,我需要使用线程池,我需要将方法传递给QueueUserWorkItem。方法InitializeCRMService没有输入参数。它只返回一个服务对象。请在下面找到GetConnection方法中的代码块:

public void GetConnection(string thread)
{
    ParallelOptions ops = new ParallelOptions();
    if(thread.Equals("one"))
    {
        Parallel.For(0, 1, i =>
        {
            dynamic serviceObject = InitializeCRMService();       
        });
    }
    else if (thread.Equals("multi"))
    {
        // HERE I NEED TO IMPLEMENT MULTITHREADING USING THREAD POOL 
        // AND NOT PARALLEL FOR LOOP......
        // ThreadPool.QueueUserWorkItem(new WaitCallback(InitializeCRMService));
    }
}

请注意,我的方法InitializeCRMService()的返回类型为服务对象。

请告诉我如何实现它。

具有返回类型的方法的线程池

由于您希望在插槽可用时在线程池中执行 InitializeCRMService 执行,并且您只执行一次,因此解决方案取决于您要如何处理 InitializeCRMService 的返回值。

如果你只想忽略它,到目前为止我有两个选择。


选项 1

public void GetConnection(string thread)
{
    //I found that ops is not being used
    //ParallelOptions ops = new ParallelOptions();
    if(thread.Equals("one"))
    {
        Parallel.For(0, 1, i =>
        {
            //You don't really need to have a variable
            /*dynamic serviceObject =*/ InitializeCRMService();
        });
    }
    else if (thread.Equals("multi"))
    {
        ThreadPool.QueueUserWorkItem
        (
             new WaitCallback
             (
                 (_) =>
                 {
                      //You don't really need to have a variable
                      /*dynamic serviceObject =*/ InitializeCRMService();
                 }
             )
        );
    }
}

另一方面,如果您需要将其传递到某个地方以存储它,以后可以重用它,您可以这样做:

public void GetConnection(string thread)
{
    //I found that ops is not being used
    //ParallelOptions ops = new ParallelOptions();
    if(thread.Equals("one"))
    {
        Parallel.For(0, 1, i =>
        {
            //It seems to me a good idea to take the same path here too
            //dynamic serviceObject = InitializeCRMService();
            Store(InitializeCRMService());
        });
    }
    else if (thread.Equals("multi"))
    {
        ThreadPool.QueueUserWorkItem
        (
             new WaitCallback
             (
                 (_) =>
                 {
                      Store(InitializeCRMService());
                 }
             )
        );
    }
}

其中商店是这样的:

private void Store(dynamic serviceObject)
{
    //store serviceObject somewhere you can use it later.
    //Depending on your situation you may want to
    // set a flag or use a ManualResetEvent to notify
    // that serviceObject is ready to be used.
    //Any pre proccess can be done here too.
    //Take care of thread affinity,
    // since this may come from the ThreadPool
    // and the consuming thread may be another one,
    // you may need some synchronization.
}

现在,如果需要允许类的客户端访问 serviceObject,可以采用以下方法:

//Note: I marked it as partial because there may be other code not showed here
// in particular I will not write the method GetConnection again. That said...
// you can have it all in a single block in a single file without using partial.
public partial class YourClass
{
    private dynamic _serviceObject;
    private void Store(dynamic serviceObject)
    {
        _serviceObject = serviceObject;
    }
    public dynamic ServiceObject
    {
        get
        {
            return _serviceObject;
        }
    }
}

但这并不能解决所有情况。特别是,如果您希望线程等待服务对象准备就绪:

public partial class YourClass
{
    private ManualResetEvent _serviceObjectWaitHandle = new ManualResetEvent(false);
    private dynamic _serviceObject;

    private void Store(dynamic serviceObject)
    {
        _serviceObject = serviceObject;
        //If you need to do some work as soon as _serviceObject is ready...
        // then it can be done here, this may still be the thread pool thread.
        //If you need to call something like the UI...
        // you will need to use BeginInvoke or a similar solution.
        _serviceObjectWaitHandle.Set();
    }
    public void WaitForServiceObject()
    {
            //You may also expose other overloads, just for convenience.
            //This will wait until Store is executed
            //When _serviceObjectWaitHandle.Set() is called
            // this will let other threads pass.
            _serviceObjectWaitHandle.WaitOne();
    }
    public dynamic ServiceObject
    {
        get
        {
            return _serviceObject;
        }
    }
}

不过,我还没有涵盖所有场景。对于意图...如果多次调用 GetConnection 会发生什么情况?我们需要决定是否要允许这样做,如果我们这样做,我们如何处理旧的 serviceObject?(我们需要调用某些东西来消除它吗?如果我们允许多个线程一次调用 GetConnection,这可能会有问题。所以默认情况下我会说我们没有,但我们也不想阻止其他线程......

解决方案是什么?遵循:

//This is another part of the same class
//This one includes GetConnection
public partial class YourClass
{
    //1 if GetConnection has been called, 0 otherwise
    private int _initializingServiceObject;
    public void GetConnection(string thread)
    {
        if (Interlocked.CompareExchange(ref _initializingServiceObject, 1, 0) == 0)
        {
            //Go on, it is the first time GetConnection is called
            //I found that ops is not being used
            //ParallelOptions ops = new ParallelOptions();
            if(thread.Equals("one"))
            {
                Parallel.For(0, 1, i =>
                {
                    //It seems to me a good idea to take the same path here too
                    //dynamic serviceObject = InitializeCRMService();
                    Store(InitializeCRMService());
                });
            }
            else if (thread.Equals("multi"))
            {
                ThreadPool.QueueUserWorkItem
                (
                     new WaitCallback
                     (
                         (_) =>
                         {
                              Store(InitializeCRMService());
                         }
                     )
                );
            }
        }
    }
}

最后,如果我们允许多线程使用_serviceObject,并且_serviceObject不是线程安全的,我们可能会遇到麻烦。使用监视器或使用读写锁定是解决此问题的两种选择。

你还记得吗?

    public dynamic ServiceObject
    {
        get
        {
            return _serviceObject;
        }
    }

好的,您希望调用方在_serviceObject处于阻止其他线程进入的上下文中时访问该(请参阅 System.Threading.Monitor),并确保它停止使用它,然后离开我之前提到的这个上下文。

现在考虑调用方线程仍然可以在某处存储_serviceObject的副本,然后离开同步,然后对_serviceObject执行某些操作,当另一个线程使用它时可能会发生这种情况。

在线程方面,我习惯于考虑每个角落的情况。但是,如果您可以控制调用线程,则可以仅使用上面显示的属性很好地做到这一点。如果你不...让我们谈谈它,我警告你,它可以是广泛的。


选项 2

这是一种完全不同的行为,您的问题中提出的Damien_The_Unbeliever表扬使我认为您可能打算返回serviceObject。在这种情况下,它不会在线程之间共享,并且可以一次具有多个 serviceObject。所需的任何同步都留给调用方。

好的,这可能是您一直在寻找的:

public void GetConnection(string thread, Action<dynamic> callback)
{
    if (ReferenceEquals(callback, null))
    {
        throw new ArgumentNullException("callback");
    }
    //I found that ops is not being used
    //ParallelOptions ops = new ParallelOptions();
    if(thread.Equals("one"))
    {
        Parallel.For(0, 1, i =>
        {
            callback(InitializeCRMService());
        });
    }
    else if (thread.Equals("multi"))
    {
        ThreadPool.QueueUserWorkItem
        (
             new WaitCallback
             (
                 (_) =>
                 {
                      callback(InitializeCRMService());
                 }
             )
        );
    }
}

回调应该是什么样子?好吧,只要它不在线程之间共享就可以了。为什么?因为调用 GetConnection 的每个线程都会传递它自己的回调操作,并且将接收不同的 serviceObject,因此不存在一个线程对其执行的操作影响另一个线程对其执行的操作的风险(因为它不是同一个 serviceObject)。

除非您希望一个线程调用this然后与其他线程共享,否则在这种情况下,这是调用者的问题,它将在另一个地方解决。


最后一件事,您可以使用枚举来表示当前在字符串线程中传递的选项。事实上,由于只有两个选项,您可以考虑使用布尔值,除非它们将来可能会出现更多情况。