在多义词中使用pooledMapConcurrentlyN



我目前正在玩Polysemy,重写我的一个小玩具项目以适应它。我偶然发现了一段使用pooledMapConcurrentlyN的代码,所以基本上是一个具有有限并发性的并行遍历版本。

我可以把我的例子简化为:

foo :: Sem r Int
foo = do
res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
pure $ sum res
action :: String -> Sem r Int 
action = pure. length

这不会编译,因为没有MonadUnliftIO (Sem r)的实例。当我使用traverse时,它确实会编译,但我正在寻找一个并发版本。我不确定我现在该走哪条路。

我看到以下选项:

  • 实现一个MonadUnliftIO (Sem r)实例。我看到在这个GitHub问题中有一些关于添加/实现这样一个实例的讨论。然而,我不清楚这样做是否是个好主意
  • 使用pooledMapConcurrentlyN之外的其他东西,这给了我一个等效的行为。我知道par dual包中有parTraverse,但这需要一个ParDual实例。parallel包也可以使解决方案成为可能,但我不熟悉,所以我不知道这是否可能
  • 将平行导线测量建模为效果。我试过了,但没能实现效果。我尝试的效果定义如下:
data ParTraverse m a where
TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)

我对GADT和Polysemy都不太熟悉,所以我可能错过了一些明显的东西。


编辑:正如下面的答案所指出的,最合适的解决方案是将其建模为效果,并在效果解释中处理并发性,而不是业务逻辑。这意味着我正在寻找一种类似于上述ParTraverse效应的高阶效应:

data ParTraverse m a where
TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)
makeSem ''ParTraverse
parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ case
TraverseP f ta -> do
_something

我不确定这个类型的签名是否正确(动作应该是a -> Sem r b类型吗?traverse的签名对mApplicative约束,我该如何建模?(

关于ParTraverse的实现,这是我在github上回答的,对于t[]专用版本:

pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
pooledMapConcurrently num f ta =
...
data ParTraverse m a where
TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]
makeSem ''ParTraverse
parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
parTraverseToIO =
interpretH case
TraverseP f ta -> do
taT <- traverse pureT ta
fT <- bindT f
tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
ins <- getInspectorT
pureT (catMaybes (inspect ins <$> catMaybes tb))

interpretH内部使用的组合子的一些解释,我们在Tactical环境中操作:

  • 由于我们处理的是函数a -> m b,其中m在解释器内被实例化为Sem rInitial,因此我们必须使用bindT来获得类似于f a -> Sem r (f b)的函数,f是解释器的一元状态
  • 我们不能直接在Sem rInitial上运行pooledMapConcurrently,因为Member (Final IO)只针对r给出
  • ta包含f的输入,但由于我们将其提升为预期f a,因此我们还必须使用traverseta的每个元素调用pureT,因为它是一个单元操作
  • bindT(和runT(产生的函数产生的Sems在头部仍然具有当前效果ParTraverse,因为该效果必须在封装的Sem(作为a -> m b传入(中进行解释。这甚至允许对内部程序使用不同的解释器。在我们的例子中,我们只是对f的结果再次运行parTraverseToIO。之后,我们必须将这个Sem提升回Tactical环境中(这只是头部的另一个效果(,所以我们使用raise
  • 由于我们提升的f会产生f (Maybe b),因此我们需要对此进行解包,以获得正确的返回类型。为此,我们可以使用检查器,它将f转换为Maybe,得到Maybe (Maybe b),然后我们可以将其展开为列表

为了完整起见,这里是pooledMapConcurrently的实现,由KinghHomeless编写:

pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ s wv ins ->
(<$ s) <$> pooledMapConcurrentlyIO i (a -> ins <$> wv (f a <$ s)) t

然后我会尝试:

  1. 不要在业务逻辑中影响并发性
  2. 在解释器中使用pooledMapconcurrentlyIO+嵌入

所以你会有这样的

data GetThings m a where
GetThings :: [InfoToFetchThing] -> GetThings m [Thing]
runGetThingsConcurrently :: Member (Embed IO) r => Sem (GetThings ': r) a -> Sem r a
runGetThingsConcurrently = interpret case
GetThings infos -> do
...
embed $ pooledMapConcurrentlyIO 42 <fetch-action> infos

当然,你也可以自定义很多——用Traversable代替列表,把<fetch-action>作为参数传递给解释器,把你想要的线程数量作为参数传递到解释器,等等

编辑:由于要执行的操作也需要在Sem r中,而不是在IO中,因此您可以使用with WeavingToFinal(可能(从Sem r获得IO,如链接中所述。

相关内容

  • 没有找到相关文章

最新更新