功能快速排序是否有可能比命令式快速排序运行得更快?



我正在尝试理解函数式编程,虽然代码看起来很漂亮,但我担心与命令式实现相比会降低性能。

但是,我非常惊讶地看到功能实现比我的命令式实现快得多(看起来很丑)。

现在,我确信我的命令式实现中存在一些错误,但是,我不确定这个错误是什么。

一些基准: 适用于 35 个元素的大小: 152954779纳秒

35 上的命令式: 198337749325纳秒

即使我在列表中添加 10 个元素,情况也会恶化

代码在 kotlin 中:

祈使的:

fun quickSort(numbers: IntArray, l: Int, r: Int): IntArray {
if (l >= r)
return numbers
fun swap(m: Int, n: Int) {
val temp = numbers[m]
numbers[m] = numbers[n]
numbers[n] = temp
}
var i = l + 1
var j = l + 1
val pivot = numbers[l]
while (j < r) {
if (numbers[j] < pivot) {
if (numbers[i] > pivot) {
swap(i, j)
}
i++
}
j++
}
swap(l, i - 1)
quickSort(numbers, 0, i - 1)
quickSort(numbers, i, r)
return numbers
}

我相信我可以重构并改进它,但这不是我现在的目标。

命令式 2:

fun partitionTest(arr: IntArray, left: Int, right: Int): Int {
var i = left
var j = right
var tmp: Int
val pivot = arr[(left + right) / 2]
while (i <= j) {
while (arr[i] < pivot)
i++
while (arr[j] > pivot)
j--
if (i <= j) {
tmp = arr[i]
arr[i] = arr[j]
arr[j] = tmp
i++
j--
}
}
return i
}

fun quickSortTest(arr: IntArray, left: Int, right: Int) {
val index = partitionTest(arr, left, right)
if (left < index - 1)
quickSort(arr, left, index - 1)
if (index < right)
quickSort(arr, index, right)
}

功能的:

fun functionalQuickSort(numbers: IntArray): IntArray {
return when {
numbers.size <= 1 -> numbers
else -> {
val pivotIndex = 0
functionalQuickSort(numbers.filter { it < numbers[pivotIndex] }.toIntArray()) + numbers[pivotIndex] + functionalQuickSort(
numbers.filter { it > numbers[pivotIndex] }.toIntArray()
)
}
}
}

主要:

val numbers = Random().ints(10).toArray()
var start = System.nanoTime()
functionalQuickSort(numbers).also { println(it.contentToString()) }
var end = System.nanoTime()
println("Took ${end - start}")

start = System.nanoTime()
quickSort(numbers,0,numbers.size).also { println(it.contentToString()) }
end = System.nanoTime()
println("Took ${end - start}")

我使用了一个已知良好的命令式快速排序算法,而不是你的算法,这对我来说看起来很糟糕。我的分区代码在结构上与您的不同,因为它使用 C.A.R. Hoare 的原始方案,而您的似乎使用的是 Lomuto 方案(因其简单性而流行,但不因其效率而流行)。

我还编写了处理大多数 JVM 微基准测试问题的代码。在这里:

import java.util.concurrent.ThreadLocalRandom
import kotlin.system.measureTimeMillis
const val PROBLEM_SIZE = 1_000_000L
fun quickSort(array: IntArray, lo: Int, hi: Int) {
if (lo >= hi) {
return
}
val p = partition(array, lo, hi)
quickSort(array, lo, p)
quickSort(array, p + 1, hi)
}
private fun partition(array: IntArray, lo: Int, hi: Int): Int {
val pivot = array[(lo + hi) / 2]
var i = lo - 1
var j = hi + 1
while (true) {
do {
i++
} while (array[i] < pivot)
do {
j--
} while (array[j] > pivot)
if (i >= j) {
return j
}
array[i] = array[j].also { array[j] = array[i] }
}
}
fun functionalQuickSort(numbers: IntArray): IntArray {
return when {
numbers.size <= 1 -> numbers
else -> {
val pivotIndex = 0
functionalQuickSort(numbers.filter { it < numbers[pivotIndex] }.toIntArray()) +
numbers[pivotIndex] +
functionalQuickSort(numbers.filter { it > numbers[pivotIndex] }.toIntArray()
)
}
}
}
fun main(args: Array<String>) {
benchmark("imperative", ::runImperativeQuickSort)
benchmark("functional", ::functionalQuickSort)
}
fun benchmark(name: String, block : (IntArray) -> IntArray) {
println("Warming up $name")
(1..4).forEach {
validate(block(randomInts()))
}
println("Measuring")
val average = (1..10).map {
var result: IntArray? = null
val input = randomInts()
val took = measureTimeMillis {
result = block(input)
}
validate(result!!)
took
}.average()
println("An average $name run took $average ms")
}
private fun runImperativeQuickSort(array: IntArray): IntArray {
quickSort(array, 0, array.size - 1)
return array
}
private fun randomInts() = ThreadLocalRandom.current().ints(PROBLEM_SIZE).toArray()
private fun validate(array: IntArray) {
var prev = array[0]
(1 until array.size).forEach {
array[it].also { curr ->
require(curr >= prev)
prev = curr
}
}
}

典型输出:

Warming up imperative
Measuring
An average imperative run took 106.6 ms
Warming up functional
Measuring
An average functional run took 537.4 ms

所以......不,功能版本不是更快。

我花了一段时间才找到它,但是您的递归调用中有一个错误:

quickSort(numbers, 0, i - 1)

这应该是:

quickSort(numbers, l, i - 1)
^

作为一个小的优化,您还可以在长度为 1(除了长度为 0)的段上提前返回:

if (l + 1 >= r)
return numbers

似乎还有一些我没有详细研究的问题。while循环中的嵌套if对我来说看起来很狡猾;我认为可以删除内部if

while (j < r) {
if (numbers[j] < pivot) {
swap(i, j)
i++
}
j++
}

仔细考虑你的不变量是什么,以及每个语句是否维护它们。

通过这些调整,命令式版本在 100000 个元素上的运行速度提高了大约 10 倍。

还要考虑如果两个元素相等会发生什么,这对于这么小的数组不太可能,但对于 100000 个元素的数组来说会发生(生日悖论)。在这种情况下,您会发现您的功能实现已损坏。


关于基准测试:

  • 确保您的输入足够大,可以获取数据而不是噪音。超过一秒的运行时间很好。
  • 不包括输入数据的生成。
  • 绝对不包括输出的打印。I/O 相对较慢。

最新更新