继续之前的帖子。
我正在迭代flatProduct.Catalogs
片并在golang中填充productCatalog
并发映射。我正在使用upsert方法,以便我只能将唯一的productID's
添加到我的productCatalog
映射中。
下面的代码被多个go routines
并行调用,这就是为什么我在这里使用并发映射来填充数据到它。此代码在后台运行,每30秒向并发映射中填充数据。
var productRows []ClientProduct
err = json.Unmarshal(byteSlice, &productRows)
if err != nil {
return err
}
for i := range productRows {
flatProduct, err := r.Convert(spn, productRows[i])
if err != nil {
return err
}
if flatProduct.StatusCode == definitions.DONE {
continue
}
r.products.Set(strconv.Itoa(flatProduct.ProductId, 10), flatProduct)
for _, catalogId := range flatProduct.Catalogs {
catalogValue := strconv.FormatInt(int64(catalogId), 10)
r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
productID := newValue.(int64)
if valueInMap == nil {
return map[int64]struct{}{productID: {}}
}
oldIDs := valueInMap.(map[int64]struct{})
// value is irrelevant, no need to check if key exists
// I think problem is here
oldIDs[productID] = struct{}{}
return oldIDs
})
}
}
下面是我在同一类中的getter,上面的代码在那里。主应用程序线程使用这些getter从map中获取数据或获取整个map。
func (r *clientRepository) GetProductMap() *cmap.ConcurrentMap {
return r.products
}
func (r *clientRepository) GetProductCatalogMap() *cmap.ConcurrentMap {
return r.productCatalog
}
func (r *clientRepository) GetProductData(pid string) *definitions.FlatProduct {
pd, ok := r.products.Get(pid)
if ok {
return pd.(*definitions.FlatProduct)
}
return nil
}
这就是我如何从这个productCatalog
cmap中读取数据,但是我的系统在下面的范围语句上崩溃-
// get productCatalog map which was populated above
catalogProductMap := clientRepo.GetProductCatalogMap()
productIds, ok := catalogProductMap.Get("211")
data, _ := productIds.(map[int64]struct{})
// I get panic here after sometime
for _, pid := range data {
...
}
错误我得到as -fatal error: concurrent map iteration and map write
.
我认为问题是r.productCatalog
是一个concurrentmap,但oldIDs[productID]
是一个正常的映射,这是导致问题,而我在上面的for循环迭代。
我如何解决我看到的这个种族问题?我可以想到的一种方法是将oldIDs[productID]
作为并发地图,但如果我采用这种方法,我的内存就会增加很多,最终会出现OOM。下面是我所尝试的工作,它解决了竞争条件,但它增加了很多内存,这不是我想要的-
r.productCatalog.Upsert(catalogValue, flatProduct.ProductId, func(exists bool, valueInMap interface{}, newValue interface{}) interface{} {
productID := newValue.(int64)
if valueInMap == nil {
// return map[int64]struct{}{productID: {}}
return cmap.New()
}
// oldIDs := valueInMap.(map[int64]struct{})
oldIDs := valueInMap.(cmap.ConcurrentMap)
// value is irrelevant, no need to check if key exists
// oldIDs[productID] = struct{}{}
oldIDs.Set(strconv.FormatInt(productID, 10), struct{}{})
return oldIDs
})
我可以做的任何其他方法,这不会增加内存,也修复了我所看到的竞争条件?
注意我仍然使用没有泛型的cmap的v1版本,它处理字符串作为键。
与普通的map[int64]struct{}
类型不同,您可以定义一个结构体来保存map和一个互斥锁来控制对map的访问:
type myMap struct{
m sync.Mutex
data map[int64]struct{}
}
func (m *myMap) Add(productID int64) {
m.m.Lock()
defer m.m.Unlock()
m.data[productID] = struct{}{}
}
func (m *myMap) List() []int64 {
m.m.Lock()
defer m.m.Unlock()
var res []int64
for id := range m.data {
res = append(res, id)
}
// sort slice if you need
return res
}
在上面的示例实现中,您必须小心地将*myMap
指针(与普通myMap
结构相反)存储在cmap.ConcurrentMap
结构中。