我正在尝试理解函数式编程,虽然代码看起来很漂亮,但我担心与命令式实现相比会降低性能。
但是,我非常惊讶地看到功能实现比我的命令式实现快得多(看起来很丑)。
现在,我确信我的命令式实现中存在一些错误,但是,我不确定这个错误是什么。
一些基准: 适用于 35 个元素的大小: 152954779纳秒
35 上的命令式: 198337749325纳秒
即使我在列表中添加 10 个元素,情况也会恶化
代码在 kotlin 中:
祈使的:
fun quickSort(numbers: IntArray, l: Int, r: Int): IntArray {
if (l >= r)
return numbers
fun swap(m: Int, n: Int) {
val temp = numbers[m]
numbers[m] = numbers[n]
numbers[n] = temp
}
var i = l + 1
var j = l + 1
val pivot = numbers[l]
while (j < r) {
if (numbers[j] < pivot) {
if (numbers[i] > pivot) {
swap(i, j)
}
i++
}
j++
}
swap(l, i - 1)
quickSort(numbers, 0, i - 1)
quickSort(numbers, i, r)
return numbers
}
我相信我可以重构并改进它,但这不是我现在的目标。
命令式 2:
fun partitionTest(arr: IntArray, left: Int, right: Int): Int {
var i = left
var j = right
var tmp: Int
val pivot = arr[(left + right) / 2]
while (i <= j) {
while (arr[i] < pivot)
i++
while (arr[j] > pivot)
j--
if (i <= j) {
tmp = arr[i]
arr[i] = arr[j]
arr[j] = tmp
i++
j--
}
}
return i
}
fun quickSortTest(arr: IntArray, left: Int, right: Int) {
val index = partitionTest(arr, left, right)
if (left < index - 1)
quickSort(arr, left, index - 1)
if (index < right)
quickSort(arr, index, right)
}
功能的:
fun functionalQuickSort(numbers: IntArray): IntArray {
return when {
numbers.size <= 1 -> numbers
else -> {
val pivotIndex = 0
functionalQuickSort(numbers.filter { it < numbers[pivotIndex] }.toIntArray()) + numbers[pivotIndex] + functionalQuickSort(
numbers.filter { it > numbers[pivotIndex] }.toIntArray()
)
}
}
}
主要:
val numbers = Random().ints(10).toArray()
var start = System.nanoTime()
functionalQuickSort(numbers).also { println(it.contentToString()) }
var end = System.nanoTime()
println("Took ${end - start}")
start = System.nanoTime()
quickSort(numbers,0,numbers.size).also { println(it.contentToString()) }
end = System.nanoTime()
println("Took ${end - start}")
我使用了一个已知良好的命令式快速排序算法,而不是你的算法,这对我来说看起来很糟糕。我的分区代码在结构上与您的不同,因为它使用 C.A.R. Hoare 的原始方案,而您的似乎使用的是 Lomuto 方案(因其简单性而流行,但不因其效率而流行)。
我还编写了处理大多数 JVM 微基准测试问题的代码。在这里:
import java.util.concurrent.ThreadLocalRandom
import kotlin.system.measureTimeMillis
const val PROBLEM_SIZE = 1_000_000L
fun quickSort(array: IntArray, lo: Int, hi: Int) {
if (lo >= hi) {
return
}
val p = partition(array, lo, hi)
quickSort(array, lo, p)
quickSort(array, p + 1, hi)
}
private fun partition(array: IntArray, lo: Int, hi: Int): Int {
val pivot = array[(lo + hi) / 2]
var i = lo - 1
var j = hi + 1
while (true) {
do {
i++
} while (array[i] < pivot)
do {
j--
} while (array[j] > pivot)
if (i >= j) {
return j
}
array[i] = array[j].also { array[j] = array[i] }
}
}
fun functionalQuickSort(numbers: IntArray): IntArray {
return when {
numbers.size <= 1 -> numbers
else -> {
val pivotIndex = 0
functionalQuickSort(numbers.filter { it < numbers[pivotIndex] }.toIntArray()) +
numbers[pivotIndex] +
functionalQuickSort(numbers.filter { it > numbers[pivotIndex] }.toIntArray()
)
}
}
}
fun main(args: Array<String>) {
benchmark("imperative", ::runImperativeQuickSort)
benchmark("functional", ::functionalQuickSort)
}
fun benchmark(name: String, block : (IntArray) -> IntArray) {
println("Warming up $name")
(1..4).forEach {
validate(block(randomInts()))
}
println("Measuring")
val average = (1..10).map {
var result: IntArray? = null
val input = randomInts()
val took = measureTimeMillis {
result = block(input)
}
validate(result!!)
took
}.average()
println("An average $name run took $average ms")
}
private fun runImperativeQuickSort(array: IntArray): IntArray {
quickSort(array, 0, array.size - 1)
return array
}
private fun randomInts() = ThreadLocalRandom.current().ints(PROBLEM_SIZE).toArray()
private fun validate(array: IntArray) {
var prev = array[0]
(1 until array.size).forEach {
array[it].also { curr ->
require(curr >= prev)
prev = curr
}
}
}
典型输出:
Warming up imperative
Measuring
An average imperative run took 106.6 ms
Warming up functional
Measuring
An average functional run took 537.4 ms
所以......不,功能版本不是更快。
我花了一段时间才找到它,但是您的递归调用中有一个错误:
quickSort(numbers, 0, i - 1)
这应该是:
quickSort(numbers, l, i - 1)
^
作为一个小的优化,您还可以在长度为 1(除了长度为 0)的段上提前返回:
if (l + 1 >= r)
return numbers
似乎还有一些我没有详细研究的问题。while
循环中的嵌套if
对我来说看起来很狡猾;我认为可以删除内部if
:
while (j < r) {
if (numbers[j] < pivot) {
swap(i, j)
i++
}
j++
}
仔细考虑你的不变量是什么,以及每个语句是否维护它们。
通过这些调整,命令式版本在 100000 个元素上的运行速度提高了大约 10 倍。
还要考虑如果两个元素相等会发生什么,这对于这么小的数组不太可能,但对于 100000 个元素的数组来说会发生(生日悖论)。在这种情况下,您会发现您的功能实现已损坏。
关于基准测试:
- 确保您的输入足够大,可以获取数据而不是噪音。超过一秒的运行时间很好。
- 不包括输入数据的生成。
- 绝对不包括输出的打印。I/O 相对较慢。