Populate a concurrent map while efficiently iterating over parquet files in parallel



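I am working with parquet files. I read all of them in parallel and populate a concurrent map as I iterate over all the rows in each file. There are 50 files in total, and each file is at most about 60 MB.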

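I need to parallelize my for loop so that it reads all of these parquet files in parallel and populates the maps from them. The concurrent maps inside the `data` struct will be read by multiple reader threads at the same time, and also written in parallel by multiple writers inside the for loop. I want to make sure they are safe and that the operations are atomic. I also have getter methods to access those concurrent maps.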

Below is the code I have, but I am not sure whether this is the right way to parallelize it, or whether I am missing something very basic.

```go
import (
	//....
	"context"
	"encoding/json"
	"sync"

	cmap "github.com/orcaman/concurrent-map"
	pars3 "github.com/xitongsys/parquet-go-source/s3"
	"github.com/xitongsys/parquet-go/reader"
	//....
)

type Data interface {
	GetCustomerMap() *cmap.ConcurrentMap
	GetCustomerMetricsMap() *cmap.ConcurrentMap
}

type data struct {
	// Accessed through the getter methods below.
	customers       *cmap.ConcurrentMap
	customerMetrics *cmap.ConcurrentMap
	// ... other fields (e.g. s3Client) omitted
}

// loadParquet is called periodically by a background thread.
func (r *data) loadParquet(path string, bucket string, key string) error {
	files, err := r.s3Client.ListObjects(bucket, key, ".parquet")
	if err != nil {
		return err
	}
	var waitGroup sync.WaitGroup
	// Set the number of goroutines we want to wait on.
	waitGroup.Add(len(files))
	// How do I parallelize the loop below so that the maps are populated in a
	// thread-safe way? The writes happen on this background thread, but many
	// reader threads read from the same maps at the same time.
	for _, file := range files {
		err = func() error {
			fr, err := pars3.NewS3FileReader(context.Background(), bucket, file, r.s3Client.GetSession().Config)
			if err != nil {
				return errs.Wrap(err)
			}
			defer xio.CloseIgnoringErrors(fr)

			pr, err := reader.NewParquetReader(fr, nil, 4)
			if err != nil {
				return errs.Wrap(err)
			}
			defer pr.ReadStop()

			// Confused about this loop:
			// do we need to parallelize here too?
			for {
				rows, err := pr.ReadByNumber(100)
				if err != nil {
					return errs.Wrap(err)
				}
				if len(rows) == 0 {
					break
				}
				// Round-trip through JSON to turn the generic rows into ParquetProduct values.
				byteSlice, err := json.Marshal(rows)
				if err != nil {
					return errs.Wrap(err)
				}
				var products []ParquetProduct
				if err := json.Unmarshal(byteSlice, &products); err != nil {
					return errs.Wrap(err)
				}
				// Read the products and put them into the concurrent maps.
				// The maps must be populated atomically and thread-safely, both
				// for parallel writes from this loop and for reads from the
				// reader threads.
				for i := range products {
					// ...
					// ...
					r.customers.Set(.....)
					r.customerMetrics.Set(....)
				}
			}
			return nil
		}()
		if err != nil {
			return err
		}
		go task(&waitGroup) // Achieving maximum concurrency
	}
	// Wait until all goroutines have completed execution.
	waitGroup.Wait()
	return nil
}

// GetCustomerMap is called by multiple reader threads to get data out of the map.
func (r *data) GetCustomerMap() *cmap.ConcurrentMap {
	return r.customers
}

// GetCustomerMetricsMap is called by multiple reader threads to get data out of the map.
func (r *data) GetCustomerMetricsMap() *cmap.ConcurrentMap {
	return r.customerMetrics
}
```
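In the loop above, the per-file closure is invoked immediately (`err = func() error { ... }()`), so the files are still read one after another on the calling goroutine, while `go task(&waitGroup)` starts a separate function that is not shown. A minimal sketch of a fan-out where the per-file work itself runs inside the goroutines, assuming a hypothetical `processFile` in place of the S3/parquet reading, and letting each goroutine write only its own slot of a result slice:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// processFile is a hypothetical stand-in for the per-file work
// (open the S3 object, read the parquet rows, ...). Here it just
// returns the length of the name, or an error for an empty name.
func processFile(file string) (int, error) {
	if file == "" {
		return 0, errors.New("empty file name")
	}
	return len(file), nil
}

// processAll starts one goroutine per file, waits for all of them,
// and returns the per-file results or the first error found.
func processAll(files []string) ([]int, error) {
	results := make([]int, len(files)) // goroutine i writes only results[i]
	errs := make([]error, len(files))  // and errs[i], so no lock is needed

	var wg sync.WaitGroup
	for i, file := range files {
		wg.Add(1)
		go func(i int, file string) {
			defer wg.Done()
			results[i], errs[i] = processFile(file)
		}(i, file)
	}
	wg.Wait() // all writes above happen before this returns

	for _, err := range errs {
		if err != nil {
			return nil, err
		}
	}
	return results, nil
}

func main() {
	results, err := processAll([]string{"a.parquet", "bb.parquet", "ccc.parquet"})
	fmt.Println(results, err) // [9 10 11] <nil>
}
```

The slices need no lock because no two goroutines touch the same element; the shared maps in the question still need a thread-safe type such as the `cmap.ConcurrentMap` already in use.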

I am using the parquet-go library (github.com/xitongsys/parquet-go) to read the files.

In Go, concurrent access to a map is often an anti-pattern and usually requires a lock to prevent panics, for example: Golang fatal error: concurrent map writes

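Instead, you can use a channel (`chan` type): have each goroutine that reads a file write its data to that channel, and have a single goroutine that listens on the channel and adds the data to the map. With this approach the map has only one reader/writer, no locks are needed, and the goroutines do not block when writing their results (as long as the channel is buffered and has room).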
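A minimal sketch of that pattern (illustrative code, not the code behind the playground link): several producer goroutines send hypothetical `record` values on one buffered channel, a helper goroutine closes the channel after `wg.Wait()`, and a single goroutine ranges over the channel and is the only one that writes the map.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical key/value pair produced by a worker.
type record struct {
	key   string
	value int
}

// collect fans in records from several producers over one buffered
// channel; only the goroutine running collect touches the map.
func collect(producers, perProducer int) map[string]int {
	out := make(chan record, 64) // buffered so producers can run ahead a little

	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func(p int) {
			defer wg.Done()
			for i := 0; i < perProducer; i++ {
				out <- record{key: fmt.Sprintf("p%d-%d", p, i), value: i}
			}
		}(p)
	}

	// Close the channel once every producer is done, so the range below ends.
	go func() {
		wg.Wait()
		close(out)
	}()

	m := make(map[string]int) // single writer: no lock needed
	for rec := range out {
		m[rec.key] = rec.value
	}
	return m
}

func main() {
	m := collect(4, 10)
	fmt.Println(len(m), m["p3-9"]) // 40 9
}
```

A buffered channel only lets producers run ahead of the consumer until the buffer fills; after that, a send blocks until the consumer catches up.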

An example of this pattern can be seen here: https://play.golang.com/p/u-uYDaWiQiD

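Replace `doWork()` with your function that reads a file, and listen on the output channel for bytes, file names, or whatever type you want, so that you can put them into the map.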
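Applied to the question, a rough sketch might look like this. `Product`, `result`, and `readFile` are hypothetical placeholders for `ParquetProduct` and the S3/parquet reading code. Each file is read in its own goroutine, errors travel on the same channel as the data, and the calling goroutine is the only one that writes the map.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Product is a hypothetical stand-in for ParquetProduct.
type Product struct {
	CustomerID string
	Metric     int
}

// result is what each per-file goroutine sends back.
type result struct {
	file     string
	products []Product
	err      error
}

// readFile is a hypothetical replacement for doWork(); real code would
// open the parquet file and decode its rows.
func readFile(file string) ([]Product, error) {
	if strings.HasPrefix(file, "bad") {
		return nil, errors.New("cannot read " + file)
	}
	return []Product{{CustomerID: file + "-c1", Metric: 1}, {CustomerID: file + "-c2", Metric: 2}}, nil
}

// load reads every file in its own goroutine and builds the map on the
// calling goroutine, which is the map's only writer.
func load(files []string) (map[string]int, error) {
	// Capacity len(files): every worker can send without blocking.
	results := make(chan result, len(files))
	for _, file := range files {
		go func(file string) {
			products, err := readFile(file)
			results <- result{file: file, products: products, err: err}
		}(file)
	}

	customers := make(map[string]int)
	var firstErr error
	for range files { // exactly one result per file
		res := <-results
		if res.err != nil {
			if firstErr == nil {
				firstErr = fmt.Errorf("%s: %w", res.file, res.err)
			}
			continue
		}
		for _, p := range res.products {
			customers[p.CustomerID] = p.Metric
		}
	}
	if firstErr != nil {
		return nil, firstErr
	}
	return customers, nil
}

func main() {
	m, err := load([]string{"a.parquet", "b.parquet"})
	fmt.Println(len(m), err) // 4 <nil>
	_, err = load([]string{"a.parquet", "bad.parquet"})
	fmt.Println(err) // bad.parquet: cannot read bad.parquet
}
```

Concurrent reads of a map that is no longer being written are safe in Go; if the same map is later updated while readers are active (for example on the next periodic reload), it again needs a lock or a type such as `cmap.ConcurrentMap`.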
