并行执行Combine publisher创建竞争条件



我试图在HKWorkoutEvent定义的时间间隔内查询HealthKit的心率值和步骤,以填充我定义的用于存储多个变量的自定义本地模型,它在下面定义。

struct SGWorkoutEvent: Identifiable {
let id = UUID()
let type: HKWorkoutEventType
let splitActiveDurationQuantity: HKQuantity?
let splitDistanceQuantity: HKQuantity?
let totalDistanceQuantity: HKQuantity?
let splitMeasuringSystem: HKUnit
let steps: HKQuantity?
let heartRate: HKQuantity?
}

可以从HKWorkoutEvent中提取除stepsheartRate以外的Al性质。但是,我正在尝试构建一个组合管道,它可以让我创建一个出版商数组,以便并行查询心率、步数和传递锻炼事件,因此在sink中,我收到一个具有这些值的3元素元组,因此我可以填充上面的模型。我现在看到的是

// Extract the workout's segments (defined automatically by an Apple Watch)
let workoutSegments = (workout.workoutEvents ?? []).filter({ $0.type == .segment })
// For each of the workout segments defined above create a HKStatisticQuery that starts on the interval's
// beginning and ends on the interval's end so the HealthKit query is properly defined to be
// executed between that interval.
let segmentsWorkoutPublisher = Publishers.MergeMany(workoutSegments.map({ $0.dateInterval }).map({
healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: $0.start, to: $0.end)
}))
.assertNoFailure()
// Do the same logic as above in `segmentsWorkoutPublisher` but for steps
let stepsPublisher = Publishers.MergeMany(workoutSegments.map({ $0.dateInterval }).map({
healthStore.statistic(for: HKObjectType.quantityType(forIdentifier: HKQuantityTypeIdentifier.stepCount)!, with: .cumulativeSum, from: $0.start, to: $0.end)
}))
.assertNoFailure()
Publishers.Zip3(workoutSegments.publisher, stepsPublisher, segmentsWorkoutPublisher)

.receive(on: DispatchQueue.main)
.sink(receiveValue: { pace, steps, hrs in

let d = SGWorkoutEvent(type: pace.type,
splitActiveDurationQuantity: pace.splitDuration,
splitDistanceQuantity: pace.splitDistance,
totalDistanceQuantity: pace.totalDistanceQuantity,
splitMeasuringSystem: pace.splitMeasuringSystem,
steps: steps.sumQuantity(),
heartRate: hrs.averageQuantity())

self.paces.append(d)
})
.store(in: &bag)

HKHealthStore.statistic(for:...)只是HKStatisticsQueryHKHealthStore扩展上定义的一个组合包装器,见下文。

public func statistic(for type: HKQuantityType, with options: HKStatisticsOptions, from startDate: Date, to endDate: Date, _ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {

let subject = PassthroughSubject<HKStatistics, Error>()

let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])

let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in

guard error == nil else {
hkCombineLogger.error("Error fetching statistics (error!.localizedDescription)")
return
}

subject.send(statistics!)
subject.send(completion: .finished)
})

self.execute(query)

return subject.eraseToAnyPublisher()
}

我在这里看到的是某种竞争条件,其中步数和心率检索没有同时返回。因此,我看到了一些没有意义的值,比如一次5英尺200步的1K分割和另一次相同持续时间的700步。实际情况应该是,这两个间隔应该显示150左右的值,但似乎我可能没有使用正确的组合操作符。

我希望看到的预期行为是Publishers.Zip上的每个发布者都有每个3-item元组按顺序(第一个间隔,第二个间隔…)完成查询,而不是这种不可复制的竞争条件。

为了尝试提供更多的上下文,我认为这类似于拥有一个具有不同时间戳的温度,湿度和降雨机会的模型,并查询三个不同的API端点以检索三个不同的值并将它们合并到模型中。

这里有很多东西要解开,但我要试一试。让我们从HKHealthStore.statistic函数开始。您希望运行一个(可能是异步的)查询,发布一个具有单个结果的序列,然后结束。这似乎真的是使用Future的理想情况。我没有HealthKit的任何经验,我不能保证这将编译,但转换可能看起来像这样:

public func statistic(
for type: HKQuantityType,
with options: HKStatisticsOptions,
from startDate: Date,
to endDate: Date,
_ limit: Int = HKObjectQueryNoLimit) -> AnyPublisher<HKStatistics, Error> {
let future = Future<HKStatistics, Error> {
fulfillPromise in
let predicate = HKStatisticsQuery.predicateForSamples(withStart: startDate, end: endDate, options: [.strictEndDate, .strictStartDate])
let query = HKStatisticsQuery(quantityType: type, quantitySamplePredicate: predicate, options: options, completionHandler: { (query, statistics, error) in
guard error == nil else {
hkCombineLogger.error("Error fetching statistics (error!.localizedDescription)")
fulfillPromise(.failure(error!))
}
fulfillPromise(.success(statistics!))
})
self.execute(query)
}
return future.eraseToAnyPublisher()
}

现在我们有了" one shot "运行查询并在有值时触发的发布者。

现在让我们看看你的segmentsWorkoutPublisher(以及扩展stepsPublisher)。

在使用Combine时,如果你发现自己使用Publisher.<SomeOperatorType>构造函数,你应该非常小心。根据我的经验,这样做很少是正确的。(话虽如此,后来使用Zip3对我来说似乎没问题)。

在这种情况下,您正在创建Publishers(您的Futures)的序列。但是你真的对Publishers序列不感兴趣。你感兴趣的序列值Publishers产生。在某种意义上,你想要"解开"每个Publisher(通过等待其值),并将这些结果发送到序列中。这正是flatMap的目的!让我们这样做:

let segmentsWorkoutPublisher =
workoutSegments
.map { $0.dateInterval }
.flatMap {
healthStore.statistic(for: HKQuantityType.quantityType(forIdentifier: HKQuantityTypeIdentifier.heartRate)!, with: .discreteAverage, from: $0.start, to: $0.end)
}
.assertNoFailure()

这将生成一个序列字符串,但随后等待每个序列发出一个值,并将值发送到更远的地方。

stepsPublisher也会以类似的方式改变。

我想这会让你到达你需要去的地方。作为观察这一点的一部分,我创建了一个Playground,在那里我重新设计了你的例子,但使用了更简化的类型。下次遇到类似的问题时,您可能会尝试类似的方法——过滤掉多余的细节,并尝试创建一个更简单的示例。如果你能像这样把你的代码放在操场上,编译起来就不会有太多麻烦,回答问题也会更容易。操场上:

import Foundation
import Combine
import PlaygroundSupport
enum MockEventType : CaseIterable {
case segment
case notSegment
}
struct MockSegment {
let type : MockEventType
let dateInterval : DateInterval = DateInterval.init(start: Date.now, duration: 3600)
}
func statistic() -> AnyPublisher<Float, Never> {
let future = Future<Float, Never>() {
fulfillPromise in
DispatchQueue.global(qos: .background).async {
sleep(UInt32.random(in: 1...3))
fulfillPromise(.success(Float.random(in: 100.0...150.0)))
}
}
return future
.eraseToAnyPublisher()
}
// Generate an endless stream of mock events.
let rawWorkouts = Timer.publish(every: 1.0, on: .current, in: .common)
.autoconnect()
.map{ _ in MockSegment(type: MockEventType.allCases.randomElement()!) }
let workoutSegments = rawWorkouts.filter { $0.type == .segment }
let dateIntervals =
workoutSegments
.map { $0.dateInterval }
let segmentsWorkoutPublisher =
dateIntervals
.flatMap { _ in statistic() }
.assertNoFailure()
let stepsPublisher =
dateIntervals
.flatMap { _ in statistic() }
.assertNoFailure()
var bag = Set<AnyCancellable>()
Publishers.Zip3(workoutSegments, stepsPublisher, segmentsWorkoutPublisher)
.receive(on: DispatchQueue.main)
.sink(receiveValue: { pace, steps, hrs in
print("pace: (pace) steps: (steps), hrs: (hrs)")
})
.store(in: &bag)
PlaygroundSupport.PlaygroundPage.current.needsIndefiniteExecution = true

相关内容

  • 没有找到相关文章

最新更新