单体变压器中的分布式进程



我正在为所谓的cloud-haskell实现一个基于八卦的集群成员资格后端,还是Distributed.Process..无论如何,我试图在没有iorefMVars的情况下摆脱处理状态,而是使用状态转换器并将Processmonad 放在底部,如下所示:

type ClusterT = StateT ClusterState
type Cluster a = ClusterT Process a

这在使用Control.Distributed.Process.Lifted(https://hackage.haskell.org/package/distributed-process-lifted)时效果很好,允许您执行以下操作:

mystatefulcomp :: Cluster ()
mystatefulcomp = do
msg <- expect :: Cluster String
old_state <- get
say $ "My old state was " ++ (show old_state)
put $ modifyState curr_state msg
mystatefulcomp
main = do
Right transport <- createTransport '127.0.0.1' '3000' (n -> ('127.0.0.1', n) defaultTCPParameters
node <- newLocalNode transport initRemoteTable
runProcess node (evalStateT mystatefulcomp initialstate)
where initialstate = ClusterState.empty

这工作得很好,允许我很好地构建我的程序,我可以保持我的状态功能并在Clustermonad 中运行它。

当我尝试使用receiveWaitmatch接收消息时,这一切都会中断。

让我们重写statefulcomp以使用receiveWait执行其他操作

doSomethingWithString :: String -> Cluster ()
doSomethingWithString str = do
s < get
put $ modifyState s str    
mystatefulcomp :: Cluster ()
mystatefulcomp = do
old_state <- get
receiveWait [ match doSomthingWithString ]
new_state <- get
say $ "old state " ++ (show old_state) ++ " new " ++ (show new_state)

这不起作用,因为match函数是(a -> Process b) -> Match b类型,但我们希望它是(a -> Cluster b) -> Match b类型。这就是我如履薄冰的地方。据我了解Control.Distributed.Process.LiftedrexposControl.Distributed.Process函数提升到 tansformer 堆栈中,允许您使用expectsay等函数,但不重新置入matchmatchIf等。

我真的很努力,试图找到一种解决方法或一种方法,以MonadProcess m => (a -> m b) -> Match b的形式重新实施match及其朋友。

任何见解都是可取的。

编辑

所以在 som 摆弄之后,我想出了以下内容

doSomethingWithString :: String -> Cluster ()
doSomethingWithString str = do
s < get
put $ modifyState s str
doSomethingWithInt :: Int -> Cluster ()
...
mystatefulcomp :: Cluster ()
mystatefulcomp = do
old_state <- get
id =<< receiveWait [ match $ return . doSomethingWithString
, match $ return . doSomethingWithInt ]
new_state <- get
say $ "old state " ++ (show old_state) ++ " new " ++ (show new_state)

这工作得很好,但我仍然很好奇这有多好的设计

正如Michael Snoyman在一系列博客文章(即5个链接)中指出的那样,将StateT包裹在IO周围是一个坏主意。你只是偶然发现了一个浮出水面的实例。

mystatefulcomp :: Cluster ()
mystatefulcomp = do
old_state <- get
receiveWait [ match doSomethingWithString ]
new_state <- get

问题是如果doSomethingWithString抛出错误,最终会new_state什么。old_state?异常前doSomethingWithString的一些中间状态?您会看到,我们想知道的事实使这种方法不亚于仅将状态存储在IORefMVar中。

除了有问题的语义之外,如果不重写distributed-process以在任何地方使用MonadBaseControl,这甚至无法实现。这正是distributed-process-lifted无法交付的原因,因为它只是包裹了distributed-process的原语。

所以,我在这里要做的是传递一个data Config = Config { clusterState :: MVar ClusterState }环境(哦,看,Process也这样做了!可能与ReaderT以理智的方式与IO交互,此外,您可以轻松地将任意数量的嵌套Process事件提升到ReaderT Config Process自己。

重复Michael博客文章的信息:StateT总体上还不错(也就是说,在纯变压器堆栈中),只是对于我们以某种方式包装IO的情况。我鼓励你阅读这些帖子,它们对我来说非常鼓舞人心,所以它们又来了:

  1. https://www.fpcomplete.com/blog/2017/06/readert-design-pattern
  2. https://www.fpcomplete.com/blog/2017/06/understanding-resourcet
  3. https://www.fpcomplete.com/blog/2017/06/tale-of-two-brackets
  4. https://www.fpcomplete.com/blog/2017/07/announcing-new-unliftio-library
  5. https://www.fpcomplete.com/blog/2017/07/the-rio-monad

最新更新