这是vb.net中的一个函数参与者吗



我一直在努力寻找一种方法来控制一个理想情况下允许并发的程序。我最终决定演员是我的最佳选择。虽然vb.net有TDF库,但它明确表示它是用于粗粒度操作的。我的大多数操作都相当精细。因此,我最终决定使用以下代码:

Imports System.Collections.Concurrent
Public Class LightActor
Private ReadOnly MessageQueue As New BlockingCollection(Of MessageBase)
Private Started As Boolean = False
Private Dirty As Boolean = False
Private RunningTask As Task
Public Sub Post(msg As MessageBase)
    MessageQueue.Add(msg)
End Sub
Public Sub StartProcessing()
    SyncLock MessageQueue
        If Not Started Then
            RunningTask = Task.Factory.StartNew(AddressOf Process)
            Started = True
        End If
    End SyncLock
End Sub
Private Sub Process()
    If MessageQueue.Count > 0 Then
        Dirty = True
        ProcessMessage(MessageQueue.Take)
    Else
        If Dirty Then
            Dirty = False
            ProcessEmpty()
        Else
            Dirty = True
            ProcessMessage(MessageQueue.Take)
        End If
    End If
    RunningTask = RunningTask.ContinueWith(AddressOf Process)
End Sub
Protected Overridable Sub ProcessEmpty()
    Console.WriteLine("empty")
End Sub
Protected Overridable Sub ProcessMessage(msg As MessageBase)
    Console.WriteLine(msg.Signal.ToString)
End Sub
End Class

我的问题是,只要我注意不要使用共享变量,并将外部通信限制为Post方法,一次只处理一条消息是否安全?

编辑:另外,我可以制作大量这样的线程吗?或者它们会阻塞所有线程池线程吗?如果是的话,有办法解决这个问题吗?

第二版:它们阻塞了线程,所以我想问题是如何在不阻塞线程的情况下使其发挥作用。

经过进一步的测试,我确定Dataflow库比我想象的要轻得多。我可以很容易地实例化100000个演员,并向每个演员发布一条消息,然后他们再向另一个演员发布消息。在我的机器上,200000条帖子在大约184毫秒内发生。在可预见的未来,这对我来说已经足够了。我曾经使用过以下三个类,以防其他人正在寻找一些相当裸露的参与者(NotifyingObject只是一个实现INotifyPropertyChanged的基类):

Public Class MessageBase
    Public ReadOnly Signal As eSignal
    Public Sub New(Signal As eSignal)
        Me.Signal = Signal
    End Sub
End Class
Public Class Actor
    Inherits NotifyingObject
    Public Sub New()
        _MessageQueue = New ActionBlock(Of MessageBase)(AddressOf Process)
    End Sub
    Public ReadOnly Property MessageQueue As ITargetBlock(Of MessageBase)
        Get
            Return _MessageQueue
        End Get
    End Property
    Protected _MessageQueue As ActionBlock(Of MessageBase)
    Public ReadOnly Property isQueueEmpty As Boolean
        Get
            Return Me._MessageQueue.InputCount = 0
        End Get
    End Property
    Public Sub Post(message As MessageBase)
        _MessageQueue.Post(message)
    End Sub
    Protected Overridable Sub Process(message As MessageBase)
    End Sub
End Class
Public Class ActsOnEmpty
    Inherits Actor
    Public Sub New()
        _MessageQueue = New ActionBlock(Of MessageBase)(AddressOf ProcessOnEmpty)
    End Sub
    Protected Overrides Sub Process(message As MessageBase)
    End Sub
    Private Sub ProcessOnEmpty(message As MessageBase)
        Process(message)
        If isQueueEmpty Then
            ProcessEmpty()
        End If
    End Sub
    Protected Overridable Sub ProcessEmpty()
    End Sub
End Class

最新更新