在启动期间加载读取文件的数据,然后处理新文件并从映射中清除旧状态



我正在做一个项目,在启动期间我需要读取某些文件并将其存储在映射中的内存中,然后定期查找新文件(如果有),然后在启动期间替换我在地图内存中的任何内容。基本上每次如果有一个full state的新文件,那么我想将内存映射对象刷新到这个新对象,而不是附加到它。

下面的方法loadAtStartupAndProcessNewChanges在服务器启动期间调用,该方法读取文件并将数据存储在内存中。此外,它还启动一个 go-routinedetectNewFiles,定期检查是否有任何新文件并将其存储在deltaChan通道上,稍后由另一个 go-routineprocessNewFiles访问该通道以再次读取该新文件并将数据存储在同一映射中。如果有任何错误,我们会将其存储在err通道上。loadFiles是将读取内存中的文件并将其存储在map中的函数。

type customerConfig struct {
deltaChan   chan string
err         chan error
wg          sync.WaitGroup
data        *cmap.ConcurrentMap
}
// this is called during server startup.
func (r *customerConfig) loadAtStartupAndProcessNewChanges() error {
path, err := r.GetPath("...", "....")
if err != nil {
return err
}
r.wg.Add(1)
go r.detectNewFiles(path)
err = r.loadFiles(4, path)
if err != nil {
return err
}
r.wg.Add(1)
go r.processNewFiles()
return nil
}

此方法基本上可以确定是否有任何新文件需要消耗,如果有,那么它会将其放在deltaChan通道上,稍后将由processNewFilesgo-routine 使用并读取内存中的文件。如果有任何错误,那么它将向错误通道添加错误。

func (r *customerConfig) detectNewFiles(rootPath string) {
}

这将读取所有s3文件并将其存储在内存中并返回错误。在这种方法中,我清除了地图的先前状态,以便它可以从新文件中获得新状态。此方法在服务器启动期间调用,每当我们需要从 go-routine 处理新文件时也会调用processNewFiles此方法。

func (r *customerConfig) loadFiles(workers int, path string) error {
var err error
...
var files []string
files = .....
// reset the map so that it can have fresh state from new files.
r.data.Clear()
g, ctx := errgroup.WithContext(context.Background())
sem := make(chan struct{}, workers)
for _, file := range files {
select {
case <-ctx.Done():
break
case sem <- struct{}{}:
}
file := file
g.Go(func() error {
defer func() { <-sem }()
return r.read(spn, file, bucket)
})
}
if err := g.Wait(); err != nil {
return err
}
return nil
}

此方法读取文件并添加到data并发映射中。

func (r *customerConfig) read(file string, bucket string) error {
// read file and store it in "data" concurrent map 
// and if there is any error then return the error
var err error
fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
if err != nil {
return errs.Wrap(err)
}
defer xio.CloseIgnoringErrors(fr)
pr, err := reader.NewParquetReader(fr, nil, 8)
if err != nil {
return errs.Wrap(err)
}
if pr.GetNumRows() == 0 {
spn.Infof("Skipping %s due to 0 rows", file)
return nil
}
for {
rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
if err != nil {
return errs.Wrap(err)
}
if len(rows) <= 0 {
break
}
byteSlice, err := json.Marshal(rows)
if err != nil {
return errs.Wrap(err)
}
var invMods []CompModel
err = json.Unmarshal(byteSlice, &invMods)
if err != nil {
return errs.Wrap(err)
}
for i := range invMods {
key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
hasInventory := false
if invMods[i].Available > 0 {
hasInventory = true
}
r.data.Set(key, hasInventory)
}
}
return nil
}

此方法将选取delta channel上的内容,如果有任何新文件,它将通过调用该方法开始读取loadFiles新文件。如果有任何错误,那么它将向错误通道添加错误。

// processNewFiles - load new files found by detectNewFiles
func (r *customerConfig) processNewFiles() {
// find new files on delta channel
// and call "loadFiles" method to read it
// if there is any error, then it will add it to the error channel.
}

如果error channel上有任何错误,那么它将从以下方法记录这些错误 -

func (r *customerConfig) handleError() {
// read error from error channel if there is any
// then log it
}

问题陈述

上面的逻辑对我没有任何问题,但我的代码中有一个小错误,我无法弄清楚如何解决它。如您所见,我有一个并发映射,我正在read方法中填充该映射,并在loadFiles方法中清除整个映射。因为每当增量通道上有新文件时,我都不想在映射中保留以前的状态,这就是为什么我要从映射中删除所有内容,然后从新文件添加新状态的原因。

现在,如果该方法read有任何错误,则会发生错误,因为我已经清除了data地图中的所有数据,这些数据将具有空地图,这不是我想要的。基本上,如果有任何错误,那么我想在data地图中保留以前的状态。如何在上述当前设计中解决此问题。

注意:我使用的是戈朗concurrent map

我认为你的设计过于复杂。它可以简单地解决,从而提供您想要的所有好处:

  • 安全并发访问
  • 重新加载检测到的更改
  • 访问配置可为您提供最新的成功加载的配置
  • 最新的配置始终可以立即访问,即使由于检测到的更改而加载新配置需要很长时间
  • 如果加载新配置失败,则保留以前的"快照"并保持当前状态
  • 作为奖励,它要简单得多,甚至不使用第三方库

让我们看看如何实现这一点:


有一个CustomerConfig结构来保存您想要缓存的所有内容(这是"快照"):

type CustomerConfig struct {
Data map[string]bool
// Add other props if you need:
LoadedAt time.Time
}

提供一个加载要缓存的配置的函数。注意:此函数是无状态的,它不访问/操作包级变量:

func loadConfig() (*CustomerConfig, error) {
cfg := &CustomerConfig{
Data:     map[string]bool{},
LoadedAt: time.Now(),
}
// Logic to load files, and populate cfg.Data
// If an error occurs, return it
// If loading succeeds, return the config
return cfg, nil
}

现在让我们创建我们的"缓存管理器"。缓存管理器存储实际/当前配置(快照),并提供对它的访问。为了安全并发访问(和更新),我们使用sync.RWMutex。还具有停止管理器(停止并发刷新)的方法:

type ConfigCache struct {
configMu sync.RWMutex
config   *CustomerConfig
closeCh  chan struct{}
}

创建缓存会加载初始配置。还启动了一个 goroutine,负责定期检查更改。

func NewConfigCache() (*ConfigCache, error) {
cfg, err := loadConfig()
if err != nil {
return nil, fmt.Errorf("loading initial config failed: %w", err)
}
cc := &ConfigCache{
config:  cfg,
closeCh: make(chan struct{}),
}
// launch goroutine to periodically check for changes, and load new configs
go cc.refresher()
return cc, nil
}

refresher()定期检查更改,如果检测到更改,则调用loadConfig()加载要缓存的新数据,并将其存储为当前/实际配置(同时锁定configMu)。它还监视closeCh在请求时停止:

func (cc *ConfigCache) refresher() {
ticker := time.NewTicker(1 * time.Minute) // Every minute
defer ticker.Stop()
for {
select {
case <-ticker.C:
// Check if there are changes
changes := false // logic to detect changes
if !changes {
continue // No changes, continue
}
// Changes! load new config:
cfg, err := loadConfig()
if err != nil {
log.Printf("Failed to load config: %v", err)
continue // Keep the previous config
}
// Apply / store new config
cc.configMu.Lock()
cc.config = cfg
cc.configMu.Unlock()
case <-cc.closeCh:
return
}
}
}

关闭缓存管理器(刷新程序 goroutine)非常简单:

func (cc *ConfigCache) Stop() {
close(cc.closeCh)
}

最后一个缺失的部分是你如何访问当前配置。这是一个简单的GetConfig()方法(也使用configMu,但在只读模式下):

func (cc *ConfigCache) GetConfig() *CustomerConfig {
cc.configMu.RLock()
defer cc.configMu.RUnlock()
return cc.config
}

这是您可以使用它的方式:

cc, err := NewConfigCache()
if err != nil {
// Decide what to do: retry, terminate etc.
}
// Where ever, whenever you need the actual (most recent) config in your app:
cfg := cc.GetConfig()
// Use cfg

在关闭应用程序(或要停止刷新)之前,可以调用cc.Stop()

添加了RWMutex,用于通过worker goroutine进行collectedData并发写保护

type customerConfig struct {
...
m sync.RWMutex
}

而不是更新方法中的mapreadread方法只返回数据和错误

func (r *customerConfig) read(file string, bucket string) ([]CompModel, error) {
// read file data and return with error if any
var err error
fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
if err != nil {
return (nil, errs.Wrap(err))
}
defer xio.CloseIgnoringErrors(fr)
pr, err := reader.NewParquetReader(fr, nil, 8)
if err != nil {
return (nil, errs.Wrap(err))
}
if pr.GetNumRows() == 0 {
spn.Infof("Skipping %s due to 0 rows", file)
return (nil, errors.New("No Data"))
}
var invMods = []CompModel{}
for {
rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
if err != nil {
return (nil, errs.Wrap(err))
}
if len(rows) <= 0 {
break
}
byteSlice, err := json.Marshal(rows)
if err != nil {
return (nil, errs.Wrap(err))
}
var jsonData []CompModel
err = json.Unmarshal(byteSlice, &jsonData)
if err != nil {
return (nil, errs.Wrap(err))
}
invMods = append(invMods, jsonData...)
}
return invMods, nil
}

然后loadFiles您可以通过以下方式收集返回的数据read方法,如果没有错误,则清除并更新地图 保持旧数据原样

func (r *customerConfig) loadFiles(workers int, path string) error {
var err error
...
var files []string
files = .....
// reset the map so that it can have fresh state from new files.
// r.data.Clear() <- remove the clear from here
g, ctx := errgroup.WithContext(context.Background())
sem := make(chan struct{}, workers)
collectedData := []CompModel{}
for _, file := range files {
select {
case <-ctx.Done():
break
case sem <- struct{}{}:
}
file := file
g.Go(func() error {
defer func() { <-sem }()
data, err:= r.read(spn, file, bucket)
if err != nil {
return err
}
r.m.Lock()
append(collectedData, data...)
r.m.Unlock()
return nil
})
}
if err := g.Wait(); err != nil {
return err
}
r.data.Clear()
for i := range collectedData {
key := strconv.FormatInt(collectedData[i].ProductID, 10) + ":" + strconv.Itoa(int(collectedData[i].Iaz))
hasInventory := false
if collectedData[i].Available > 0 {
hasInventory = true
}
r.data.Set(key, hasInventory)
}
return nil
}

注意:由于代码不可运行,因此仅更新了方法以供参考,并且我没有包含用于更新您可能需要处理的切片的互斥锁。


只需 3 个功能即可实现相同的功能 - 检测、读取、加载、检测将按间隔检查新文件并推送到增量通道(如果找到),加载将获取文件路径以从增量通道读取并调用读取方法以获取数据和错误,然后检查如果没有错误,然后清除地图并使用新内容更新,否则记录错误, 所以你会有 2 个 Go 例程和 1 个函数,它们将由加载例程调用

package main
import (
"fmt"
"time"
"os"
"os/signal"
"math/rand"
)
func main() {
fmt.Println(">>>", center("STARTED", 30), "<<<")
c := &Config{
InitialPath: "Old Path",
DetectInterval: 3000,
}
c.start()
fmt.Println(">>>", center("ENDED", 30), "<<<")
}
// https://stackoverflow.com/questions/41133006/how-to-fmt-printprint-this-on-the-center
func center(s string, w int) string {
return fmt.Sprintf("%[1]*s", -w, fmt.Sprintf("%[1]*s", (w + len(s))/2, s))
}
type Config struct {
deltaCh chan string
ticker *time.Ticker
stopSignal chan os.Signal
InitialPath string
DetectInterval time.Duration
}
func (c *Config) start() {
c.stopSignal = make(chan os.Signal, 1)
signal.Notify(c.stopSignal, os.Interrupt)
c.ticker = time.NewTicker(c.DetectInterval * time.Millisecond)
c.deltaCh = make(chan string, 1)
go c.detect()
go c.load()
if c.InitialPath != "" {
c.deltaCh <- c.InitialPath
}
<- c.stopSignal
c.ticker.Stop()
}
// Detect New Files
func (c *Config) detect() {
for {
select {
case <- c.stopSignal:
return
case <- c.ticker.C:
fmt.Println(">>>", center("DETECT", 30), "<<<")
c.deltaCh <- fmt.Sprintf("PATH %f", rand.Float64() * 1.5)
}
}
}
// Read Files
func read(path string) (map[string]int, error) {
data := make(map[string]int)
data[path] = 0
fmt.Println(">>>", center("READ", 30), "<<<")
fmt.Println(path)
return data, nil
}
// Load Files
func (c *Config) load() {
for {
select {
case <- c.stopSignal:
return
case path := <- c.deltaCh:
fmt.Println(">>>", center("LOAD", 30), "<<<")
data, err := read(path)
if err != nil {
fmt.Println("Log Error")
} else {
fmt.Println("Success", data)
}
fmt.Println()
}
}
}

注意:示例代码中不包含地图,可以轻松更新以包含地图

只需分配新的一张地图。喜欢这个:

var mu sync.Mutex
before := map[string]string{} // Some map before reading
after := make(map[string]string)
// Read files and fill `after` map
mu.Lock()
before = after
mu.Unlock()

不要在loadFile方法中清除地图,而是在read中执行类似操作

func (r *customerConfig) read(file string, bucket string) error {
m := cmap.New() // create a new map
// ...
for {
rows, err := pr.ReadByNumber(r.cfg.RowsToRead)
if err != nil {
return errs.Wrap(err)
}
if len(rows) <= 0 {
break
}
byteSlice, err := json.Marshal(rows)
if err != nil {
return errs.Wrap(err)
}
var invMods []CompModel
err = json.Unmarshal(byteSlice, &invMods)
if err != nil {
return errs.Wrap(err)
}
for i := range invMods {
key := strconv.FormatInt(invMods[i].ProductID, 10) + ":" + strconv.Itoa(int(invMods[i].Iaz))
hasInventory := false
if invMods[i].Available > 0 {
hasInventory = true
}
m.Set(key, hasInventory)
}
}
r.data = m // Use the new map
return nil
}

最新更新