Parallel startsWith queries against Azure Table storage



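I have the following method, which performs a startsWith query on the RowKey of a table in Azure Table storage. I now want to run several startsWith queries on RowKey in parallel.

Can I create a parallel method that simply calls the existing method, or do I have to write a parallel version of the existing method?

This is my current startsWith method: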

public async Task<IEnumerable<T>> RowKeyStartsWith<T>(
    string searchString,
    string tableName,
    string partitionKey,
    string columnName = "RowKey") where T : ITableEntity, new()
{
    // Make sure we have a search string
    if (string.IsNullOrEmpty(searchString)) return null;

    // Get CloudTable
    var table = GetTable(tableName);

    // Build the exclusive upper bound by incrementing the last character
    char lastChar = searchString[searchString.Length - 1];
    char nextLastChar = (char)((int)lastChar + 1);
    string nextSearchStr = searchString.Substring(0, searchString.Length - 1) + nextLastChar;

    // Define query segment(s)
    string prefixCondition = TableQuery.CombineFilters(
        TableQuery.GenerateFilterCondition(columnName, QueryComparisons.GreaterThanOrEqual, searchString),
        TableOperators.And,
        TableQuery.GenerateFilterCondition(columnName, QueryComparisons.LessThan, nextSearchStr)
    );
    string filterString = TableQuery.CombineFilters(
        TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey),
        TableOperators.And,
        prefixCondition
    );

    // Create final query
    var query = new TableQuery<T>().Where(filterString);

    // Declare result variable
    var result = new List<T>();

    // Execute query asynchronously, following continuation tokens
    TableContinuationToken continuationToken = null;
    do
    {
        Task<TableQuerySegment<T>> querySegment = table.ExecuteQuerySegmentedAsync(query, continuationToken);
        TableQuerySegment<T> segment = await querySegment;
        result.AddRange(segment.ToList());
        continuationToken = segment.ContinuationToken;
    } while (continuationToken != null);

    return result;
}
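
The range trick in this method (greater than or equal to the prefix, less than the prefix with its last character incremented) can be checked in isolation. The following is a minimal sketch, assuming ordinal string comparison; the PrefixRange class and its method names are hypothetical helpers for illustration and are not part of the Azure SDK.

```csharp
using System;

public static class PrefixRange
{
    // Returns an exclusive upper bound for all strings that start with
    // `prefix`, built by incrementing the prefix's last character.
    public static string UpperBound(string prefix)
    {
        if (string.IsNullOrEmpty(prefix))
            throw new ArgumentException("Prefix must not be empty.", nameof(prefix));

        char last = prefix[prefix.Length - 1];
        if (last == char.MaxValue)
            throw new ArgumentException("Last character cannot be incremented.", nameof(prefix));

        return prefix.Substring(0, prefix.Length - 1) + (char)(last + 1);
    }

    // True when `value` lies in [prefix, UpperBound(prefix)) under ordinal
    // comparison, which is the same condition the table filter expresses.
    public static bool InRange(string value, string prefix)
    {
        return string.CompareOrdinal(value, prefix) >= 0
            && string.CompareOrdinal(value, UpperBound(prefix)) < 0;
    }
}
```

For example, PrefixRange.UpperBound("abc") yields "abd", so "abcdef" falls inside the range while "abd" and "ab" fall outside it.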

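Can I create a parallel method that simply calls the existing method, or do I have to write a parallel version of the existing method?

As I understand it, you can reuse your existing method and run the queries in multiple tasks, as follows: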

// Stores the query results, keyed by search string
ConcurrentDictionary<string, object> resultDics = new ConcurrentDictionary<string, object>();

// Simulate your search parameters
// (RowKeyStartsWithParamModel is a plain class holding these four string properties)
List<RowKeyStartsWithParamModel> rowKeySearchs = Enumerable.Range(1, 10)
    .Select(i => new RowKeyStartsWithParamModel()
    {
        SearchString = i.ToString(),
        TableName = "tablename",
        ColumnName = "RowKey",
        ParationKey = "partionKey"
    }).ToList();

// Create one task per search
var tasks = rowKeySearchs.Select(item => Task.Run(async () =>
{
    // Invoke your existing RowKeyStartsWith. T must implement ITableEntity and have a
    // parameterless constructor, so DynamicTableEntity is used here instead of string.
    var results = await RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    // Add the retrieved results
    resultDics.TryAdd(item.SearchString, results);
}));

// Synchronously wait for all tasks to complete
Task.WaitAll(tasks.ToArray());

// Print all retrieved results
foreach (var item in resultDics)
{
    Console.WriteLine($"{item.Key},{JsonConvert.SerializeObject(item.Value)}");
}
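
If the caller is itself async, the same fan-out can be awaited with Task.WhenAll instead of blocking on Task.WaitAll. The following is a minimal sketch, not a drop-in replacement: FakeRowKeyStartsWith is a hypothetical stand-in for the real RowKeyStartsWith<T> so that the example runs without a storage account, and the FanOut class name is likewise an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOut
{
    // Hypothetical stand-in for RowKeyStartsWith<T>: returns fake rows after a delay.
    public static async Task<IEnumerable<string>> FakeRowKeyStartsWith(string searchString)
    {
        await Task.Delay(50); // simulates network latency
        return new[] { searchString + "-a", searchString + "-b" };
    }

    // Starts one query per distinct prefix and awaits them all without blocking a thread.
    public static async Task<Dictionary<string, List<string>>> QueryAllAsync(IEnumerable<string> prefixes)
    {
        var pending = prefixes
            .Distinct()
            .Select(async p => new KeyValuePair<string, List<string>>(
                p, (await FakeRowKeyStartsWith(p)).ToList()))
            .ToList(); // materializing the sequence starts every query now

        KeyValuePair<string, List<string>>[] pairs = await Task.WhenAll(pending);
        return pairs.ToDictionary(kv => kv.Key, kv => kv.Value);
    }
}
```

Because the queries are I/O-bound, this sketch does not wrap them in Task.Run. Calling the async method directly already lets the requests overlap, and Task.WhenAll rethrows the first failure when awaited.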

Alternatively, you can use Parallel.ForEach as follows:

Parallel.ForEach(rowKeySearchs, async (item) =>
{
    // DynamicTableEntity satisfies the ITableEntity, new() constraint; string does not
    var results = await RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    resultDics.TryAdd(item.SearchString, results);
});

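Note: because the delegate uses await, it is compiled as an async void lambda, so each iteration returns to Parallel.ForEach as soon as it reaches its first incomplete await. Parallel.ForEach can therefore finish before the queries do, and the results are not guaranteed to be available synchronously right after it returns.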
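
The early return can be observed with a small experiment. This is a minimal sketch in which Task.Delay stands in for the table query; the AsyncVoidDemo class is a hypothetical name used only for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Runs an async lambda inside Parallel.ForEach and reports how many
    // iterations had actually finished at the moment the loop returned.
    public static int CompletedRightAfterLoop()
    {
        int completed = 0;

        Parallel.ForEach(Enumerable.Range(1, 5), async i =>
        {
            await Task.Delay(1000);               // stands in for the awaited query
            Interlocked.Increment(ref completed); // runs about one second later
        });

        // The loop has returned, but the awaited work is still in flight.
        return Volatile.Read(ref completed);
    }
}
```

The returned count will normally be 0, because every iteration hands control back at its await and the loop has nothing left to wait for.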

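To retrieve the results synchronously with the Parallel.ForEach snippet, you can use one of the following approaches:

1) Block on the result of RowKeyStartsWith inside each Parallel.ForEach iteration (and drop async from the lambda), as follows: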

var results = RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName).Result;

2) Use WaitHandles to block the calling thread until every query has signaled completion:

// One handle per search; note that WaitHandle.WaitAll typically accepts at most 64 handles
var waitHandles = rowKeySearchs.Select(d => new EventWaitHandle(false, EventResetMode.ManualReset)).ToArray();
Parallel.ForEach(rowKeySearchs, async (item, loopState, index) =>
{
    var results = await RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    resultDics.TryAdd(item.SearchString, results);
    waitHandles[index].Set(); // signal that this query has finished
});
WaitHandle.WaitAll(waitHandles); // block the current thread until all EventWaitHandles are signaled
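
On .NET 6 or later, Parallel.ForEachAsync accepts an async body directly, which avoids both the async void lambda and the wait handles. The following is a minimal sketch under that assumption: FakeRowKeyStartsWith is again a hypothetical stand-in for the real query method, and the BoundedFanOut class and the maxParallel parameter are illustrative names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedFanOut
{
    // Hypothetical stand-in for RowKeyStartsWith<T>.
    public static async Task<string[]> FakeRowKeyStartsWith(string searchString, CancellationToken ct)
    {
        await Task.Delay(50, ct); // simulates network latency
        return new[] { searchString + "-a" };
    }

    // Runs the queries with at most `maxParallel` in flight at once and
    // completes only after every query has finished.
    public static async Task<ConcurrentDictionary<string, string[]>> QueryAllAsync(
        IEnumerable<string> prefixes, int maxParallel)
    {
        var results = new ConcurrentDictionary<string, string[]>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };

        await Parallel.ForEachAsync(prefixes, options, async (prefix, ct) =>
        {
            results[prefix] = await FakeRowKeyStartsWith(prefix, ct);
        });

        return results;
    }
}
```

Capping the degree of parallelism is a design choice rather than a requirement. It can help keep a large batch of prefix queries from saturating the connection limit or triggering throttling on the storage account.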
