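I have the following method, which performs a startsWith query against the RowKey of a table in Azure Table Storage. I now want to run startsWith queries on RowKey in parallel.
Is it possible to create a parallel method that simply calls the existing method, or do I have to write a parallel version of the existing method?
Here is my current startsWith method: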
public async Task<IEnumerable<T>> RowKeyStartsWith<T>
    (string searchString,
    string tableName,
    string partitionKey,
    string columnName = "RowKey") where T : ITableEntity, new()
{
    // Make sure we have a search string
    if (string.IsNullOrEmpty(searchString)) return null;
    // Get CloudTable
    var table = GetTable(tableName);
    char lastChar = searchString[searchString.Length - 1];
    char nextLastChar = (char)((int)lastChar + 1);
    string nextSearchStr = searchString.Substring(0, searchString.Length - 1) + nextLastChar;
    // Define query segment(s)
    string prefixCondition = TableQuery.CombineFilters(
        TableQuery.GenerateFilterCondition(columnName, QueryComparisons.GreaterThanOrEqual, searchString),
        TableOperators.And,
        TableQuery.GenerateFilterCondition(columnName, QueryComparisons.LessThan, nextSearchStr)
    );
    string filterString = TableQuery.CombineFilters(
        TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, partitionKey),
        TableOperators.And,
        prefixCondition
    );
    // Create final query
    var query = new TableQuery<T>().Where(filterString);
    // Declare result variable
    var result = new List<T>();
    // Execute query asynchronously
    TableContinuationToken continuationToken = null;
    do
    {
        Task<TableQuerySegment<T>> querySegment = table.ExecuteQuerySegmentedAsync(query, continuationToken);
        TableQuerySegment<T> segment = await querySegment;
        result.AddRange(segment.ToList());
        continuationToken = segment.ContinuationToken;
    } while (continuationToken != null);
    return result;
}
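Is it possible to create a parallel method that simply calls the existing method, or do I have to write a parallel version of the existing method?
As I understand it, you can reuse your existing method and run the queries as multiple tasks, like this: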
// Stores the query results, keyed by search string
ConcurrentDictionary<string, object> resultDics = new ConcurrentDictionary<string, object>();
// Simulate your search parameters
List<RowKeyStartsWithParamModel> rowKeySearchs = Enumerable.Range(1, 10)
    .Select(i => new RowKeyStartsWithParamModel()
    {
        SearchString = i.ToString(),
        TableName = "tablename",
        ColumnName = "RowKey",
        ParationKey = "partionKey"
    }).ToList();
// Create one task per search
var tasks = rowKeySearchs.Select(item => Task.Run(async () =>
{
    // Invoke your existing RowKeyStartsWith.
    // T must implement ITableEntity, so DynamicTableEntity is used here; substitute your own entity type.
    var results = await RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    // Add the retrieved results
    resultDics.TryAdd(item.SearchString, results);
}));
// Block until all tasks have completed
Task.WaitAll(tasks.ToArray());
// Print all retrieved results
foreach (var item in resultDics)
{
    Console.WriteLine($"{item.Key},{JsonConvert.SerializeObject(item.Value)}");
}
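Alternatively, you can use Parallel.ForEach as follows:
Parallel.ForEach(rowKeySearchs, async (item) =>
{
    // T must implement ITableEntity, so DynamicTableEntity is used here; substitute your own entity type.
    var results = await RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    resultDics.TryAdd(item.SearchString, results);
});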
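Note: because each iteration uses await inside the delegate, the lambda is compiled as async void and Parallel.ForEach returns as soon as every delegate reaches its first await. The query results are therefore not yet available synchronously when Parallel.ForEach returns.
To retrieve the results synchronously with the Parallel.ForEach snippet, you can use one of the following approaches:
1) Retrieve the result synchronously on each iteration when calling RowKeyStartsWith inside Parallel.ForEach (with a non-async lambda), as follows: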
var results = RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName).Result;
2) Use WaitHandles to block the calling thread until every query has signaled completion:
// One handle per search; note that WaitHandle.WaitAll accepts at most 64 handles
var waitHandles = rowKeySearchs.Select(d => new EventWaitHandle(false, EventResetMode.ManualReset)).ToArray();
Parallel.ForEach(rowKeySearchs, async (item, loopState, index) =>
{
    var results = await RowKeyStartsWith<DynamicTableEntity>(item.SearchString, item.TableName, item.ParationKey, item.ColumnName);
    resultDics.TryAdd(item.SearchString, results);
    waitHandles[index].Set(); // signal that this query has finished
});
WaitHandle.WaitAll(waitHandles); // block the current thread until every EventWaitHandle has been signaled
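If the caller is itself async, a non-blocking variant of the same idea may be simpler: start every query, then await them together with Task.WhenAll. The following is only a minimal sketch under stated assumptions. RunAllAsync and FakeQueryAsync are hypothetical names introduced for illustration, and FakeQueryAsync merely stands in for RowKeyStartsWith so that the example runs without a storage account.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelQueries
{
    // Hypothetical helper: starts one query per search string and awaits
    // them all together, without blocking a thread while they run.
    public static async Task<Dictionary<string, TResult>> RunAllAsync<TResult>(
        IEnumerable<string> searchStrings,
        Func<string, Task<TResult>> query)
    {
        // ToList() materializes the sequence so each query is started exactly once.
        var pending = searchStrings
            .Select(async s => new KeyValuePair<string, TResult>(s, await query(s)))
            .ToList();

        // Completes when every query has finished; rethrows if any query failed.
        KeyValuePair<string, TResult>[] finished = await Task.WhenAll(pending);

        // Assumes the search strings are distinct (ToDictionary throws on duplicates).
        return finished.ToDictionary(p => p.Key, p => p.Value);
    }

    // Stand-in for RowKeyStartsWith<T> (assumption: a real call would query the table).
    public static async Task<IEnumerable<string>> FakeQueryAsync(string prefix)
    {
        await Task.Delay(10); // simulate network latency
        return new[] { prefix + "-a", prefix + "-b" };
    }

    public static async Task Main()
    {
        var prefixes = Enumerable.Range(1, 3).Select(i => i.ToString());
        var results = await RunAllAsync(prefixes, s => FakeQueryAsync(s));
        foreach (var pair in results)
        {
            Console.WriteLine($"{pair.Key}: {string.Join(",", pair.Value)}");
        }
    }
}
```

With the real method, the delegate would presumably look like s => RowKeyStartsWith<MyEntity>(s, tableName, partitionKey), where MyEntity is a placeholder for your own ITableEntity type. Because the queries are I/O-bound, this avoids both Task.Run and the blocking waits, at the cost of requiring an async caller.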