正在等待使用TableDependency和F#加载数据库行



我有一个F#项目,它将一些文件加载到外部子系统,然后使用表依赖关系来等待将一些行添加到表中作为副作用。

表依赖关系在下面的类型中用于监视数据库更改。当添加/更改行时,它会触发一个自定义事件:

// just using this type for the RecordChangedEvent to marshal the id we want into something
type AccountLoaded() = 
let mutable someId = ""
// this property name matches the name of the table column (SomeId)
member this.SomeId 
with get () = someId
and set (value) = someId <- value
// AccountLoadWatcher
type AccountLoadWatcher() = 
let mutable _tableDependency = null
let event = new Event<_>()
interface IDisposable with
member this.Dispose() = 
_tableDependency.Stop()
_tableDependency.Dispose()
// custom event we can send when an account is loaded
[<CLIEvent>]
member this.AccountLoaded = event.Publish
member private this.NotifyAccountLoaded(sender : RecordChangedEventArgs<AccountLoaded>) = 
let accountLoaded = sender.Entity
event.Trigger(accountLoaded.SomeId)
member this.Watch() = 
_tableDependency <- DbLib.getTableDependency "dbo" "AccountTable" 
null
_tableDependency.OnChanged.Add(this.NotifyAccountLoaded)
_tableDependency.Start()

我想做的是获取上面的对象,然后等待所有我关心的带有id的行被加载。到目前为止,我拥有的是:

let waitForRows(csvFileRows) =
let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows
let mutable collected = Set.empty
let isInSet id = Set.contains id idsToWaitFor
let notDone = not <| (Set.difference idsToWaitFor collected = Set.empty)
let accountLoadedHandler id = 
collected <- collected.Add id
printfn "Id loaded %s, waiting for %An" id (Set.difference idsToWaitFor collected)
loadToSubsystem csvFileRows |> ignore
// wait for all the watcher events; filtering each event object for ids we care about
watcher.AccountLoaded
|> Observable.takeWhile (fun _ -> notDone) 
|> Observable.filter (fun e -> isInSet e) 
|> Observable.subscribe accountLoadedHandler
|> ignore
doMoreWork()

但这只是继续做更多的工作,而不需要等待上面我需要的所有事件。

我需要使用任务还是异步?F#代理商?

假设您在示例中使用Observable.takeWhile,我假设您使用FSharp.Control.Rreactive包装来访问所有的反应组合子。

你的方法有一些好主意,比如使用takeWhile等待,直到你收集到所有ID,但使用突变是非常不幸的——由于可能的种族条件,这样做甚至可能不安全。

一个不错的选择是使用各种scan函数中的一个来收集事件发生时的状态。您可以使用Observable.scanInit以空集开始并添加所有ID;然后是Observable.takeWhile,以继续接受事件,直到您拥有所有正在等待的ID。要真正等待(并阻止(,可以使用Observable.wait。类似这样的东西:

let waitForRows(csvFileRows) =
let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows
let finalCollectedIDs = 
watcher.AccountLoaded
|> Observable.scanInit Set.empty (fun collected id -> Set.add id collected) 
|> Observable.takeWhile (fun collected -> not (Set.isSubset idsToWaitFor co llected))
|> Observable.wait
printfn "Completed. Final collected IDs are: %A" finalCollectedIDs 

相关内容

  • 没有找到相关文章

最新更新