如何在 Span<T> 上并行化归并排序



通过使用 Span<T> 实现归并排序算法,我对 Span<T> 有了更好的理解。

我的可运行代码在这里,你可以在这里运行它(不含 unsafe 的部分)。

using System;
using System.Collections.Generic;

/// <summary>
/// Console demo for the merge sort.
/// </summary>
public class Program
{
    /// <summary>
    /// Sorts a sample string and prints the result.
    /// </summary>
    public static void Main()
    {
        string sorted = Sort.Merge("bonakidbonakidbonakid");
        Console.WriteLine(sorted);
    }
}
/// <summary>
/// Stable, bottom-up merge sort implemented over <see cref="Span{T}"/> slices.
/// </summary>
public static class Sort
{
    /// <summary>
    /// Returns a new string holding the characters of <paramref name="input"/> in sorted order.
    /// </summary>
    /// <param name="input">The text to sort.</param>
    /// <param name="comparer">Ordering to apply; <see cref="Comparer{T}.Default"/> when null.</param>
    /// <returns>The sorted characters as a new string.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public static string Merge(string input, IComparer<char> comparer = null)
    {
        if (input is null)
        {
            throw new ArgumentNullException(nameof(input));
        }

        comparer ??= Comparer<char>.Default;

        // Both working buffers are heap arrays. Sizing a stackalloc by the
        // input length overflows the stack for large inputs, and a stack
        // overflow cannot be caught.
        char[] source = input.ToCharArray();
        char[] buffer = new char[source.Length];

        return MergeLadder<char>(source, buffer, comparer).ToString();
    }

    /// <summary>
    /// Returns a new, sorted array with the elements of <paramref name="input"/>.
    /// </summary>
    /// <typeparam name="T">Element type.</typeparam>
    /// <param name="input">The elements to sort. The array is copied and never mutated.</param>
    /// <param name="comparer">Ordering to apply; <see cref="Comparer{T}.Default"/> when null.</param>
    /// <returns>A new array containing the sorted elements.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null.</exception>
    public static T[] Merge<T>(T[] input, IComparer<T> comparer = null)
    {
        if (input is null)
        {
            throw new ArgumentNullException(nameof(input));
        }

        comparer ??= Comparer<T>.Default;

        // Intentionally sort a copy so the caller's array is left untouched.
        var source = new T[input.Length];
        Array.Copy(input, source, input.Length);

        // Second buffer; the two alternate as merge source and target.
        var buffer = new T[source.Length];

        return MergeLadder<T>(source, buffer, comparer).ToArray();
    }

    /// <summary>
    /// Bottom-up merge with alternating buffers: each pass merges adjacent
    /// sorted runs of <c>half</c> elements, then the run length doubles.
    /// </summary>
    /// <returns>Whichever of the two buffers received the final pass.</returns>
    private static ReadOnlySpan<T> MergeLadder<T>(
        Span<T> input,
        Span<T> output,
        IComparer<T> comparer)
    {
        int length = input.Length;

        for (int half = 1; ; half *= 2)
        {
            MergePass<T>(input, output, half, comparer);

            // Equivalent to 2 * half >= length, written so that it cannot
            // overflow. When it is false, 2 * half < length, so the
            // doubling in the loop header is safe as well.
            if (half >= length - half)
            {
                return output;
            }

            // Switch the buffers so the next pass reads what was just written.
            Span<T> swap = input;
            input = output;
            output = swap;
        }
    }

    /// <summary>
    /// Walks <paramref name="input"/> in strides of two runs and merges each
    /// stride into the same position of <paramref name="output"/>.
    /// </summary>
    private static void MergePass<T>(
        ReadOnlySpan<T> input,
        Span<T> output,
        int half,
        IComparer<T> comparer)
    {
        int start = 0;
        while (start < input.Length)
        {
            int remaining = input.Length - start;

            // The last stride may have a short right run, or none at all.
            int leftLength = Math.Min(half, remaining);
            int rightLength = Math.Min(half, remaining - leftLength);
            int strideLength = leftLength + rightLength;

            MergeRuns(
                input.Slice(start, leftLength),
                input.Slice(start + leftLength, rightLength),
                output.Slice(start, strideLength),
                comparer);

            // strideLength never exceeds remaining, so start cannot overflow.
            start += strideLength;
        }
    }

    /// <summary>
    /// Merges two sorted runs into <paramref name="merge"/>, whose length is
    /// the sum of the two run lengths.
    /// </summary>
    private static void MergeRuns<T>(
        ReadOnlySpan<T> left,
        ReadOnlySpan<T> right,
        Span<T> merge,
        IComparer<T> comparer)
    {
        int l = 0;
        int r = 0;
        int m = 0;

        while (l < left.Length && r < right.Length)
        {
            // "<= 0" takes from the left on ties, which keeps the sort stable.
            if (comparer.Compare(left[l], right[r]) <= 0)
            {
                merge[m++] = left[l++];
            }
            else
            {
                merge[m++] = right[r++];
            }
        }

        // At most one run still has elements; copy that tail in bulk.
        ReadOnlySpan<T> leftTail = left.Slice(l);
        leftTail.CopyTo(merge.Slice(m));
        right.Slice(r).CopyTo(merge.Slice(m + leftTail.Length));
    }
}

我有一个想法,我可以更改合并数据部分的代码,

// Visit the input one stride at a time and merge each stride's
// left and right runs into the same region of the output.
for (int start = 0; start < input.Length; start += width)
{
    int available = input.Length - start;

    // The final stride may be cut short: it can lack part of its
    // right run, or have no right run at all.
    bool hasRight = available > half;
    bool isShort = available < width;

    int leftLength = hasRight ? half : available;
    int mergeLength = isShort ? available : width;

    Span<T> left = input.Slice(start, leftLength);
    Span<T> right = hasRight
        ? input.Slice(start + half, isShort ? available - half : half)
        : Span<T>.Empty;
    Span<T> merge = output.Slice(start, mergeLength);

    // Combine the two runs of this stride.
    Merge(left, right, merge, comparer);
}

以并行操作。这可能会提供更好的性能,但由于ref struct的限制,我无法找到一个好的方法来实现这一点。

我知道需要适当使用Memory<T>,但考虑到在内部Merge函数中使用索引器,我不明白如何实现这一点。

如果我能得到一个并行版本,我就可以对两者进行基准测试。有什么想法/建议/改写吗?

您可能应该避免使用stackalloc,因为如果您的输入太大,这将失败。特别是因为您最终将其转换为常规数组,所以您应该首先在堆上分配内存。或者,可以使用一个可以重用的内存块池。如果您的输入足够小,可以放在堆栈中,那么您可能不会从任何并行化中受益。

如果你仔细想想,使用堆栈内存进行并行操作没有什么意义,因为工作线程需要引用属于另一个线程的堆栈内存,我认为没有任何方法可以使其快速且内存安全。

一旦你有了实际的堆内存,你的问题基本上就消失了。MergeLadder 可以将普通数组作为输入参数,并在调用 Merge 方法时把数组转换为 Span。这样你的主循环就可以改写为 Parallel.For。

这是一个基于@JonasH建议的异步版本。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

/// <summary>
/// Console demo for the asynchronous merge sort.
/// </summary>
public class Program
{
    /// <summary>
    /// Sorts a sample string and prints the sorted text.
    /// </summary>
    public static async Task Main()
    {
        // Sort.Merge returns a ValueTask<string>. Without the await,
        // Console.WriteLine would print the ValueTask itself rather
        // than the sorted characters.
        Console.WriteLine(await Sort.Merge("bonakidbonakidbonakid"));
    }
}
/// <summary>
/// Stable, bottom-up merge sort whose passes are split across a bounded
/// number of thread-pool tasks.
/// </summary>
/// <remarks>
/// When a pass runs in parallel the comparer is invoked from several threads
/// at once, so it must be safe for concurrent use.
/// </remarks>
public static class Sort
{
    // Smallest share of the input that is handed to its own task. Inputs
    // below twice this size are sorted synchronously on the calling thread.
    // The value is a heuristic starting point, not a measured optimum.
    private const int MinElementsPerTask = 1 << 14;

    /// <summary>
    /// Returns a new string holding the characters of <paramref name="input"/> in sorted order.
    /// </summary>
    /// <param name="input">The text to sort.</param>
    /// <param name="comparer">Ordering to apply; <see cref="Comparer{T}.Default"/> when null.</param>
    /// <returns>A task producing the sorted characters as a new string.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null (reported through the returned task).</exception>
    public static async ValueTask<string> Merge(
        string input,
        IComparer<char> comparer = null)
    {
        if (input is null)
        {
            throw new ArgumentNullException(nameof(input));
        }

        comparer ??= Comparer<char>.Default;

        // Two heap buffers that alternate as merge source and target.
        char[] source = input.ToCharArray();
        char[] buffer = new char[source.Length];

        char[] sorted = await MergeLadder(source, buffer, comparer).ConfigureAwait(false);
        return new string(sorted);
    }

    /// <summary>
    /// Returns a new, sorted array with the elements of <paramref name="input"/>.
    /// </summary>
    /// <typeparam name="T">Element type.</typeparam>
    /// <param name="input">The elements to sort. The array is copied and never mutated.</param>
    /// <param name="comparer">Ordering to apply; <see cref="Comparer{T}.Default"/> when null.</param>
    /// <returns>A task producing a new array with the sorted elements.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="input"/> is null (reported through the returned task).</exception>
    public static async ValueTask<T[]> Merge<T>(
        T[] input,
        IComparer<T> comparer = null)
    {
        if (input is null)
        {
            throw new ArgumentNullException(nameof(input));
        }

        comparer ??= Comparer<T>.Default;

        // Intentionally sort a copy so the caller's array is left untouched.
        var source = new T[input.Length];
        Array.Copy(input, source, input.Length);
        var buffer = new T[source.Length];

        // Both buffers are private to this call, so the one holding the
        // result can be returned directly without another copy.
        return await MergeLadder(source, buffer, comparer).ConfigureAwait(false);
    }

    /// <summary>
    /// Bottom-up merge with alternating buffers: each pass merges adjacent
    /// sorted runs of <c>half</c> elements, then the run length doubles.
    /// </summary>
    /// <returns>Whichever of the two arrays received the final pass.</returns>
    private static async ValueTask<T[]> MergeLadder<T>(
        T[] input,
        T[] output,
        IComparer<T> comparer)
    {
        int length = input.Length;

        for (int half = 1; ; half *= 2)
        {
            await MergePass(input, output, half, comparer).ConfigureAwait(false);

            // Equivalent to 2 * half >= length, written so that it cannot
            // overflow. When it is false, 2 * half < length, so the
            // doubling in the loop header is safe as well.
            if (half >= length - half)
            {
                return output;
            }

            // Switch the buffers so the next pass reads what was just written.
            (input, output) = (output, input);
        }
    }

    /// <summary>
    /// Runs one merge pass, dividing the strides between a bounded number of
    /// tasks instead of scheduling one task per stride.
    /// </summary>
    private static Task MergePass<T>(
        T[] input,
        T[] output,
        int half,
        IComparer<T> comparer)
    {
        int length = input.Length;

        // long arithmetic: 2 * half can exceed int.MaxValue on the last pass.
        long width = 2L * half;
        long strideCount = (length + width - 1) / width;

        long maxWorkers = Math.Min(Environment.ProcessorCount, length / MinElementsPerTask);
        long workers = Math.Min(maxWorkers, strideCount);

        if (workers <= 1)
        {
            // Too little work, or a single stride: stay on the calling thread.
            MergeRange(input, output, 0, length, half, comparer);
            return Task.CompletedTask;
        }

        // Every chunk is a whole number of strides, so each task reads and
        // writes a disjoint region of the two buffers.
        long stridesPerWorker = (strideCount + workers - 1) / workers;
        long chunk = stridesPerWorker * width;

        var tasks = new List<Task>((int)workers);
        for (long from = 0; from < length; from += chunk)
        {
            int start = (int)from;
            int end = (int)Math.Min(length, from + chunk);
            tasks.Add(Task.Run(() => MergeRange(input, output, start, end, half, comparer)));
        }

        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Merges every stride inside <c>[start, end)</c> from
    /// <paramref name="input"/> into <paramref name="output"/>.
    /// </summary>
    private static void MergeRange<T>(
        T[] input,
        T[] output,
        int start,
        int end,
        int half,
        IComparer<T> comparer)
    {
        // Spans are created here because they cannot be captured by the
        // lambda that schedules this method.
        ReadOnlySpan<T> source = input;
        Span<T> target = output;

        while (start < end)
        {
            int remaining = end - start;

            // The last stride may have a short right run, or none at all.
            int leftLength = Math.Min(half, remaining);
            int rightLength = Math.Min(half, remaining - leftLength);
            int strideLength = leftLength + rightLength;

            MergeRuns(
                source.Slice(start, leftLength),
                source.Slice(start + leftLength, rightLength),
                target.Slice(start, strideLength),
                comparer);

            start += strideLength;
        }
    }

    /// <summary>
    /// Merges two sorted runs into <paramref name="merge"/>, whose length is
    /// the sum of the two run lengths.
    /// </summary>
    private static void MergeRuns<T>(
        ReadOnlySpan<T> left,
        ReadOnlySpan<T> right,
        Span<T> merge,
        IComparer<T> comparer)
    {
        int l = 0;
        int r = 0;
        int m = 0;

        while (l < left.Length && r < right.Length)
        {
            // "<= 0" takes from the left on ties, which keeps the sort stable.
            if (comparer.Compare(left[l], right[r]) <= 0)
            {
                merge[m++] = left[l++];
            }
            else
            {
                merge[m++] = right[r++];
            }
        }

        // At most one run still has elements; copy that tail in bulk.
        ReadOnlySpan<T> leftTail = left.Slice(l);
        leftTail.CopyTo(merge.Slice(m));
        right.Slice(r).CopyTo(merge.Slice(m + leftTail.Length));
    }
}

现在我想知道,输入需要多大,并行版本才能胜出?

最新更新