我目前正在玩Polysemy,重写我的一个小玩具项目以适应它。我偶然发现了一段使用pooledMapConcurrentlyN
的代码,所以基本上是一个具有有限并发性的并行遍历版本。
我可以把我的例子简化为:
foo :: Sem r Int
foo = do
res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
pure $ sum res
action :: String -> Sem r Int
action = pure. length
这不会编译,因为没有MonadUnliftIO (Sem r)
的实例。当我使用traverse
时,它确实会编译,但我正在寻找一个并发版本。我不确定我现在该走哪条路。
我看到以下选项:
- 实现一个
MonadUnliftIO (Sem r)
实例。我看到在这个GitHub问题中有一些关于添加/实现这样一个实例的讨论。然而,我不清楚这样做是否是个好主意 - 使用
pooledMapConcurrentlyN
之外的其他东西,这给了我一个等效的行为。我知道par dual包中有parTraverse
,但这需要一个ParDual
实例。parallel
包也可以使解决方案成为可能,但我不熟悉,所以我不知道这是否可能 - 将平行导线测量建模为效果。我试过了,但没能实现效果。我尝试的效果定义如下:
data ParTraverse m a where
TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)
我对GADT和Polysemy都不太熟悉,所以我可能错过了一些明显的东西。
编辑:正如下面的答案所指出的,最合适的解决方案是将其建模为效果,并在效果解释中处理并发性,而不是业务逻辑。这意味着我正在寻找一种类似于上述ParTraverse
效应的高阶效应:
data ParTraverse m a where
TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)
makeSem ''ParTraverse
parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ case
TraverseP f ta -> do
_something
我不确定这个类型的签名是否正确(动作应该是a -> Sem r b
类型吗?traverse
的签名对m
有Applicative
约束,我该如何建模?(
关于ParTraverse
的实现,这是我在github上回答的,对于t
的[]
专用版本:
pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
pooledMapConcurrently num f ta =
...
data ParTraverse m a where
TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]
makeSem ''ParTraverse
parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
parTraverseToIO =
interpretH case
TraverseP f ta -> do
taT <- traverse pureT ta
fT <- bindT f
tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
ins <- getInspectorT
pureT (catMaybes (inspect ins <$> catMaybes tb))
对interpretH
内部使用的组合子的一些解释,我们在Tactical
环境中操作:
- 由于我们处理的是函数
a -> m b
,其中m
在解释器内被实例化为Sem rInitial
,因此我们必须使用bindT
来获得类似于f a -> Sem r (f b)
的函数,f
是解释器的一元状态 - 我们不能直接在
Sem rInitial
上运行pooledMapConcurrently
,因为Member (Final IO)
只针对r
给出 ta
包含f
的输入,但由于我们将其提升为预期f a
,因此我们还必须使用traverse
对ta
的每个元素调用pureT
,因为它是一个单元操作bindT
(和runT
(产生的函数产生的Sem
s在头部仍然具有当前效果ParTraverse
,因为该效果必须在封装的Sem
(作为a -> m b
传入(中进行解释。这甚至允许对内部程序使用不同的解释器。在我们的例子中,我们只是对f
的结果再次运行parTraverseToIO
。之后,我们必须将这个Sem
提升回Tactical
环境中(这只是头部的另一个效果(,所以我们使用raise
- 由于我们提升的
f
会产生f (Maybe b)
,因此我们需要对此进行解包,以获得正确的返回类型。为此,我们可以使用检查器,它将f
转换为Maybe
,得到Maybe (Maybe b)
,然后我们可以将其展开为列表
为了完整起见,这里是pooledMapConcurrently
的实现,由KinghHomeless编写:
pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ s wv ins ->
(<$ s) <$> pooledMapConcurrentlyIO i (a -> ins <$> wv (f a <$ s)) t
然后我会尝试:
- 不要在业务逻辑中影响并发性
- 在解释器中使用pooledMapconcurrentlyIO+嵌入
所以你会有这样的
data GetThings m a where
GetThings :: [InfoToFetchThing] -> GetThings m [Thing]
runGetThingsConcurrently :: Member (Embed IO) r => Sem (GetThings ': r) a -> Sem r a
runGetThingsConcurrently = interpret case
GetThings infos -> do
...
embed $ pooledMapConcurrentlyIO 42 <fetch-action> infos
当然,你也可以自定义很多——用Traversable
代替列表,把<fetch-action>
作为参数传递给解释器,把你想要的线程数量作为参数传递到解释器,等等
编辑:由于要执行的操作也需要在Sem r中,而不是在IO中,因此您可以使用with WeavingToFinal(可能(从Sem r获得IO,如链接中所述。