Swift Combine:如何从发布者列表中创建单个发布者



使用Apple的新组合框架,我想从列表中的每个元素中提出多个请求。然后,我想要减少所有响应的单一结果。基本上,我想从发布者列表转到保留响应列表的单个发布者。

我已经尝试列出发布者列表,但是我不知道如何将该列表减少到单个发布者中。而且我尝试制作包含列表的发布者,但我无法将发布者列出列表。

请查看" CreateRedients"功能

func createIngredient(ingredient: Ingredient) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    return apollo.performPub(mutation: CreateIngredientMutation(name: ingredient.name, optionalProduct: ingredient.productId, quantity: ingredient.quantity, unit: ingredient.unit))
            .eraseToAnyPublisher()
}
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    // first attempt
    let results = ingredients
            .map(createIngredient)
    // results = [AnyPublisher<CreateIngredientMutation.Data, Error>]
    // second attempt
    return Publishers.Just(ingredients)
            .eraseToAnyPublisher()
            .flatMap { (list: [Ingredient]) -> Publisher<[CreateIngredientMutation.Data], Error> in
                return list.map(createIngredient) // [AnyPublisher<CreateIngredientMutation.Data, Error>]
            }
}

我不确定如何将出版商数组转换为包含数组的发布者。

类型'[AnyPublisher]'的结果值不符合关闭结果类型'Publisher'

本质上,在您的特定情况下,您正在考虑这样的东西:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    Publishers.MergeMany(ingredients.map(createIngredient(ingredient:)))
        .collect()
        .eraseToAnyPublisher()
}

此"收集"上游发布者产生的所有元素,一旦完成后,它们就会产生一个带有所有结果的数组,并最终完成了自己。

请记住,如果上游发布者之一失败或产生多个结果 - 元素的数量可能与订户数量不符,因此您可能需要其他操作员来减轻此情况。p>更通用的答案,通过一种方法可以使用Entwinest框架进行测试:

import XCTest
import Combine
import EntwineTest
final class MyTests: XCTestCase {
    
    func testCreateArrayFromArrayOfPublishers() {
        typealias SimplePublisher = Just<Int>
        // we'll create our 'list of publishers' here. Each publisher emits a single
        // Int and then completes successfully – using the `Just` publisher.
        let publishers: [SimplePublisher] = [
            SimplePublisher(1),
            SimplePublisher(2),
            SimplePublisher(3),
        ]
        // we'll turn our array of publishers into a single merged publisher
        let publisherOfPublishers = Publishers.MergeMany(publishers)
        // Then we `collect` all the individual publisher elements results into
        // a single array
        let finalPublisher = publisherOfPublishers.collect()
        // Let's test what we expect to happen, will happen.
        // We'll create a scheduler to run our test on
        let testScheduler = TestScheduler()
        // Then we'll start a test. Our test will subscribe to our publisher
        // at a virtual time of 200, and cancel the subscription at 900
        let testableSubscriber = testScheduler.start { finalPublisher }
        // we're expecting that, immediately upon subscription, our results will
        // arrive. This is because we're using `just` type publishers which
        // dispatch their contents as soon as they're subscribed to
        XCTAssertEqual(testableSubscriber.recordedOutput, [
            (200, .subscription),            // we're expecting to subscribe at 200
            (200, .input([1, 2, 3])),        // then receive an array of results immediately
            (200, .completion(.finished)),   // the `collect` operator finishes immediately after completion
        ])
    }
}

我认为Publishers.MergeMany在这里可能有帮助。在您的示例中,您可能会这样使用:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    let publishers = ingredients.map(createIngredient(ingredient:))
    return Publishers.MergeMany(publishers).eraseToAnyPublisher()
}

将为您提供一个向您发送Output的单个值的发布者。

但是,如果您在所有发布者完成的结尾都要一次在数组中特别想要Output,则可以将collect()MergeMany一起使用:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<[CreateIngredientMutation.Data], Error> {
    let publishers = ingredients.map(createIngredient(ingredient:))
    return Publishers.MergeMany(publishers).collect().eraseToAnyPublisher()
}

以及以上任何一个示例,您都可以简化为一行,即:

func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    Publishers.MergeMany(ingredients.map(createIngredient(ingredient:))).eraseToAnyPublisher()
}

您还可以在Sequence上定义自己的自定义merge()扩展方法,并使用它稍微简化代码:

extension Sequence where Element: Publisher {
    func merge() -> Publishers.MergeMany<Element> {
        Publishers.MergeMany(self)
    }
}
func createIngredients(ingredients: [Ingredient]) -> AnyPublisher<CreateIngredientMutation.Data, Error> {
    ingredients.map(createIngredient).merge().eraseToAnyPublisher()
}

要棘手地添加答案,这是一种保留数组中元素顺序的解决方案。它通过整个链条通过每个元素的索引,并按索引对收集的数组进行分类。

复杂性应该是O(n log n(,因为分类。

import Combine
extension Publishers {
    private struct EnumeratedElement<T> {
        let index: Int
        let element: T
        init(index: Int, element: T) {
            self.index = index
            self.element = element
        }
        init(_ enumeratedSequence: EnumeratedSequence<[T]>.Iterator.Element) {
            index = enumeratedSequence.offset
            element = enumeratedSequence.element
        }
    }
    static func mergeMappedRetainingOrder<InputType, OutputType>(
        _ inputArray: [InputType],
        mapTransform: (InputType) -> AnyPublisher<OutputType, Error>
    ) -> AnyPublisher<[OutputType], Error> {
        let enumeratedInputArray = inputArray.enumerated().map(EnumeratedElement.init)
        let enumeratedMapTransform: (EnumeratedElement<InputType>) -> AnyPublisher<EnumeratedElement<OutputType>, Error> = { enumeratedInput in
            mapTransform(enumeratedInput.element)
                .map { EnumeratedElement(index: enumeratedInput.index, element: $0)}
                .eraseToAnyPublisher()
        }
        let sortEnumeratedOutputArrayByIndex: ([EnumeratedElement<OutputType>]) -> [EnumeratedElement<OutputType>] = { enumeratedOutputArray in
            enumeratedOutputArray.sorted { $0.index < $1.index }
        }
        let transformToNonEnumeratedArray: ([EnumeratedElement<OutputType>]) -> [OutputType] = {
            $0.map { $0.element }
        }
        return Publishers.MergeMany(enumeratedInputArray.map(enumeratedMapTransform))
            .collect()
            .map(sortEnumeratedOutputArrayByIndex)
            .map(transformToNonEnumeratedArray)
            .eraseToAnyPublisher()
    }
}

解决方案的单元测试:

import XCTest
import Combine
final class PublishersExtensionsTests: XCTestCase {
    // MARK: - Private properties
    private var cancellables = Set<AnyCancellable>()
    // MARK: - Tests
    func test_mergeMappedRetainingOrder() {
        let expectation = expectation(description: "mergeMappedRetainingOrder publisher")
        let numbers = (1...100).map { _ in Int.random(in: 1...3) }
        let mapTransform: (Int) -> AnyPublisher<Int, Error> = {
            let delayTimeInterval = RunLoop.SchedulerTimeType.Stride(Double($0))
            return Just($0)
                .delay(for: delayTimeInterval, scheduler: RunLoop.main)
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        }
        let resultNumbersPublisher = Publishers.mergeMappedRetainingOrder(numbers, mapTransform: mapTransform)
        resultNumbersPublisher.sink(receiveCompletion: { _ in }, receiveValue: { resultNumbers in
            XCTAssertTrue(numbers == resultNumbers)
            expectation.fulfill()
         }).store(in: &cancellables)
        waitForExpectations(timeout: 5)
    }
}

您可以一行进行:

.flatMap(Publishers.Sequence.init(sequence:))

最新更新