直方图函数的并行化

本文关键字:并行化 函数 直方图 | 更新日期: 2023-09-27 17:59:46

我实现了一个简单函数的正常并行验证,该函数从32bppArgb位图计算直方图。在1920x1080的图像上,正常版本大约需要0.03秒,而并行版本需要0.07秒。

线程开销真的那么重吗?除了Parallel,还有其他构造吗。因为这可以加快这个过程吗?我需要加快速度,因为我使用的是每秒30帧的视频。

这是简化的代码:

public sealed class Histogram
{
    public int MaxA = 0;
    public int MaxR = 0;
    public int MaxG = 0;
    public int MaxB = 0;
    public int MaxT = 0;
    public int [] A = null;
    public int [] R = null;
    public int [] G = null;
    public int [] B = null;
    public Histogram ()
    {
        this.A = new int [256];
        this.R = new int [256];
        this.G = new int [256];
        this.B = new int [256];
        this.Initialize();
    }
    public void Initialize ()
    {
        this.MaxA = 0;
        this.MaxR = 0;
        this.MaxG = 0;
        this.MaxB = 0;
        this.MaxT = 0;
        for (int i = 0; i < this.A.Length; i++)
            this.A [i] = 0;
        for (int i = 0; i < this.R.Length; i++)
            this.R [i] = 0;
        for (int i = 0; i < this.G.Length; i++)
            this.G [i] = 0;
        for (int i = 0; i < this.B.Length; i++)
            this.B [i] = 0;
    }
    public void ComputeHistogram (System.Drawing.Bitmap bitmap, bool parallel = false)
    {
        System.Drawing.Imaging.BitmapData data = null;
        data = bitmap.LockBits
        (
            new System.Drawing.Rectangle(0, 0, bitmap.Width, bitmap.Height),
            System.Drawing.Imaging.ImageLockMode.ReadOnly,
            System.Drawing.Imaging.PixelFormat.Format32bppArgb
        );
        try
        {
            ComputeHistogram(data, parallel);
        }
        catch
        {
            bitmap.UnlockBits(data);
            throw;
        }
        bitmap.UnlockBits(data);
    }
    public void ComputeHistogram (System.Drawing.Imaging.BitmapData data, bool parallel = false)
    {
        int stride = System.Math.Abs(data.Stride);
        this.Initialize();
        if (parallel)
        {
            unsafe
            {
                System.Threading.Tasks.Parallel.For
                (
                    0,
                    data.Height,
                    new System.Threading.Tasks.ParallelOptions() { MaxDegreeOfParallelism = System.Environment.ProcessorCount },
                    y =>
                    {
                        byte* pointer = ((byte*) data.Scan0) + (stride * y);
                        for (int x = 0; x < stride; x += 4)
                        {
                            this.B [pointer [x + 0]]++;
                            this.G [pointer [x + 1]]++;
                            this.R [pointer [x + 2]]++;
                            this.A [pointer [x + 3]]++;
                        }
                    }
                );
            }
        }
        else
        {
            unsafe
            {
                for (int y = 0; y < data.Height; y++)
                {
                    byte* pointer = ((byte*) data.Scan0) + (stride * y);
                    for (int x = 0; x < stride; x += 4)
                    {
                        this.B [pointer [x + 0]]++;
                        this.G [pointer [x + 1]]++;
                        this.R [pointer [x + 2]]++;
                        this.A [pointer [x + 3]]++;
                    }
                }
            }
        }
        for (int i = 0; i < this.A.Length; i++)
            if (this.MaxA < this.A [i]) this.MaxA = this.A [i];
        for (int i = 0; i < this.R.Length; i++)
            if (this.MaxR < this.R [i]) this.MaxR = this.R [i];
        for (int i = 0; i < this.G.Length; i++)
            if (this.MaxG < this.G [i]) this.MaxG = this.G [i];
        for (int i = 0; i < this.B.Length; i++)
            if (this.MaxB < this.B [i]) this.MaxB = this.B [i];
        if (this.MaxT < this.MaxA) this.MaxT = this.MaxA;
        if (this.MaxT < this.MaxR) this.MaxT = this.MaxR;
        if (this.MaxT < this.MaxG) this.MaxT = this.MaxG;
        if (this.MaxT < this.MaxB) this.MaxT = this.MaxB;
    }
}

直方图函数的并行化

首先,您的并行循环中有一个巨大的错误:

您将有多个线程访问、递增和更新共享数组——由于固有的种族条件,仅在同一个映像上多次运行示例代码就会导致截然不同的结果。

但你不是这么问的。

至于为什么使用并行实现会导致性能下降,简单的答案是,您可能没有在每个并行任务的主体中做足够的工作来抵消创建新任务、调度任务等的"启动成本"。

可能更关键的是,我相信你正在用内存中的所有跳跃来彻底摧毁L1/L2缓存——每个任务线程都会尝试将它认为需要的东西加载到缓存中,但当你到处索引时,你就不再创建一致的访问模式,因此,每次尝试访问位图缓冲区或内部数组时,都可能会出现缓存未命中的情况。

还有一种同样高效的方法可以在不使用不安全代码的情况下获取位图的只读数据。。。实际上,让我们先这样做:

因此,通过调用LockBits,您获得了一个指向非托管内存的指针。让我们复制一下:

System.Drawing.Imaging.BitmapData data = null;
data = bitmap.LockBits
(
    new System.Drawing.Rectangle(0, 0, bitmap.Width, bitmap.Height),
    System.Drawing.Imaging.ImageLockMode.ReadOnly,
    System.Drawing.Imaging.PixelFormat.Format32bppArgb
);
// For later usage
var imageStride = data.Stride;
var imageHeight = data.Height;
// allocate space to hold the data
byte[] buffer = new byte[data.Stride * data.Height];
// Source will be the bitmap scan data
IntPtr pointer = data.Scan0;
// the CLR marshalling system knows how to move blocks of bytes around, FAST.
Marshal.Copy(pointer, buffer, 0, buffer.Length);
// and now we can unlock this since we don't need it anymore
bitmap.UnlockBits(data);
ComputeHistogram(buffer, imageStride, imageHeight, parallel);

现在,对于竞争条件,您可以通过使用Interlocked调用来增加计数,以合理的性能克服这一问题(注意!!多线程编程很难,完全有可能我的解决方案不完美!

public void ComputeHistogram (byte[] data, int stride, int height, bool parallel = false)
{
    this.Initialize();
    if (parallel)
    {
        System.Threading.Tasks.Parallel.For
        (
            0,
            height,
            new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
            y =>
            {
                int startIndex = (stride * y);
                int endIndex = stride * (y+1);
                for (int x = startIndex; x < endIndex; x += 4)
                {
                    // Interlocked actions are more-or-less atomic 
                    // (caveats abound, but this should work for us)
                    Interlocked.Increment(ref this.B[data[x]]);
                    Interlocked.Increment(ref this.G[data[x+1]]);
                    Interlocked.Increment(ref this.R[data[x+2]]);
                    Interlocked.Increment(ref this.A[data[x+3]]);
                }
            }
        );
    }
    else
    {
        // the original way is ok for non-parallel, since only one
        // thread is mucking around with the data
    }
    // Sorry, couldn't help myself, this just looked "cleaner" to me
    this.MaxA = this.A.Max();
    this.MaxR = this.R.Max();
    this.MaxG = this.G.Max();
    this.MaxB = this.B.Max();
    this.MaxT = new[] { this.MaxA, this.MaxB, this.MaxG, this.MaxR }.Max();
}

那么,这对运行时行为有什么影响呢?

不是很多,但至少平行分叉现在计算出了正确的结果。:)

使用一个非常便宜的测试设备:

void Main()
{    
    foreach(var useParallel in new[]{false, true})
    {
        var totalRunTime = TimeSpan.Zero;
        var sw = new Stopwatch();
        var runCount = 10;
        for(int run=0; run < runCount; run++)
        {
            GC.Collect();
            GC.WaitForPendingFinalizers();
            GC.Collect();
            sw.Reset();
            sw.Start();
            var bmp = Bitmap.FromFile(@"c:'temp'banner.bmp") as Bitmap;
            var hist = new Histogram();
            hist.ComputeHistogram(bmp, useParallel);
            sw.Stop();
            totalRunTime = totalRunTime.Add(sw.Elapsed);
        }
        Console.WriteLine("Parallel={0}, Avg={1} ms", useParallel, totalRunTime.TotalMilliseconds / runCount);
    }
}

我得到这样的结果:

Parallel=False, Avg=1.69777 ms
Parallel=True, Avg=5.33584 ms

正如你所看到的,我们还没有解决你最初的问题

因此,让我们尝试一下让并行工作"更好":

让我们看看"给任务更多的工作"有什么作用:

if (parallel)
{
    var batchSize = 2;
    System.Threading.Tasks.Parallel.For
    (
        0,
        height / batchSize,
        new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount },
        y =>
        {
            int startIndex = (stride * y * batchSize);
            int endIndex = startIndex + (stride * batchSize);
            for (int x = startIndex; x < endIndex; x += 4)
            {
                // Interlocked actions are more-or-less atomic 
                // (caveats abound, but this should work for us)
                Interlocked.Increment(ref this.B[data[x]]);
                Interlocked.Increment(ref this.G[data[x+1]]);
                Interlocked.Increment(ref this.R[data[x+2]]);
                Interlocked.Increment(ref this.A[data[x+3]]);
            }
        }
    );
}

结果:

Parallel=False, Avg=1.70273 ms
Parallel=True, Avg=4.82591 ms

哦,看起来很有希望。。。我想知道当我们改变batchSize时会发生什么?

让我们这样改变我们的试验台:

void Main()
{    
    foreach(var useParallel in new[]{false, true})
    {
        for(int batchSize = 1; batchSize < 1024; batchSize <<= 1)
        {
            var totalRunTime = TimeSpan.Zero;
            var sw = new Stopwatch();
            var runCount = 10;
            for(int run=0; run < runCount; run++)
            {
                GC.Collect();
                GC.WaitForPendingFinalizers();
                GC.Collect();
                sw.Reset();
                sw.Start();
                var bmp = Bitmap.FromFile(@"c:'temp'banner.bmp") as Bitmap;
                var hist = new Histogram();
                hist.ComputeHistogram(bmp, useParallel, batchSize);
                sw.Stop();
                totalRunTime = totalRunTime.Add(sw.Elapsed);
            }
            Console.WriteLine("Parallel={0}, BatchSize={1} Avg={2} ms", useParallel, batchSize, totalRunTime.TotalMilliseconds / runCount);
        }        
    }
}

结果:(只显示平行=真,因为非平行不会改变)

Parallel=True, BatchSize=1 Avg=5.57644 ms
Parallel=True, BatchSize=2 Avg=5.49982 ms
Parallel=True, BatchSize=4 Avg=5.20434 ms
Parallel=True, BatchSize=8 Avg=5.1721 ms
Parallel=True, BatchSize=16 Avg=5.00405 ms
Parallel=True, BatchSize=32 Avg=4.44973 ms
Parallel=True, BatchSize=64 Avg=2.28332 ms
Parallel=True, BatchSize=128 Avg=1.39957 ms
Parallel=True, BatchSize=256 Avg=1.29156 ms
Parallel=True, BatchSize=512 Avg=1.28656 ms

一旦批量大小达到64-128的范围,我们似乎就接近了某种渐近线,当然,您的里程数可能会根据位图大小等而有所不同。

我希望这能有所帮助!这是我等待生产构建完成的一天中一个有趣的消遣!:)

创建线程有相当大的开销。执行可能比单线程版本运行得快得多,但完成得太快,无法弥补最初的开销。

如果你每帧都这样做,只会让你慢下来。

但是,如果您手动创建线程池,手动分配工作,并为每个帧重用线程,您可能会发现,到第二帧或第三帧时,您的代码会超过单线程版本。