背景
我已经简化了这种情况,但这是一般的问题。
我正在使用Azure数据工厂将自定义API的数据从Azure Data Warehouse的表中摄取。我正在使用IDOTNETACTIVITY运行调用API并将数据加载到数据仓库中的C#代码。该活动在Azure批处理中运行。
在活动本身中,在调用自定义API之前,我在Azure Blob存储中加载了一个文件列表。然后,我致电文件中的每个人的自定义API。这些呼叫依次依次进行。问题是这种方法需要太长。文件大小可能会增加,因此所需的时间只会变得更糟。
我试图提高性能的事情
- 使API调用异步并分批打电话给3.奇怪的是,这速度较慢。看来批处理过程无法处理异步/等待这一切。
- 我们看到的另一个陌生感是,Morelinq的批处理命令根本没有起作用。我已经检查了源代码:https://github.com/morelinq/morelinq/blob/master/morelinq/batch.cs。这使用了收益率返回,但我不知道为什么它不起作用,甚至与异步/等待问题有关。
主要问题
Azure批处理是否支持异步/等待?
其他问题
- 如果Azure不支持异步/等待,那么解决此问题的更好方法是什么?即使用工作经理并旋转更多节点。
-
任何人都可以阐明为什么Morelinq的批量在Azure批处理中不起作用吗?这是受影响代码的片段:
List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword); var customResults = new List<CustomApiResult>(); foreach (var personIdsBatch in personIds.Batch(100)) { customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch)); }
根据我的理解, personIds.Batch(100)
只是将 personIds
划分为大小(100(桶。
//method1
foreach (var personIdsBatch in personIds.Batch(100))
{
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIdsBatch));
}
//method2
customResults.AddRange(GetCustomResultsByBatch(address, username, password, personIds));
上述两种方法都会依次呼叫您的自定义API,而method1
添加了处理相同任务的其他逻辑。
Azure批处理是否支持异步/等待?
基于您的代码,我定义了IDotNetActivity
实现如下,您可以参考:
public class MyDotNetActivity : IDotNetActivity
{
public IDictionary<string, string> Execute(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
{
return ExecuteAsync(linkedServices, datasets, activity, logger).Result;
}
async Task<IDictionary<string, string>> ExecuteAsync(IEnumerable<LinkedService> linkedServices, IEnumerable<Dataset> datasets, Activity activity, IActivityLogger logger)
{
List<int> personIds = await GetPersonIds("{clientAddress}", "{clientUsername}", "{clientPassword}");
var tasks = new List<Task<List<CustomApiResult>>>();
foreach (var personIdsBatch in personIds.Batch(100))
{
tasks.AddRange(GetCustomResultsByBatch("{address}", "{username}", "{password}", "{personIdsBatch}"));
}
var taskResults = await Task.WhenAll(tasks);
List<CustomApiResult> customResults = taskResults.SelectMany(r=>r).ToList();
//process the custom api results
return new Dictionary<string, string>();
}
async Task<List<CustomApiResult>> GetCustomResultsByBatch(string address, string username, string password, IEnumerable<int> personIdsBatch)
{
//Get Custom Results By Batch
return new List<CustomApiResult>();
}
async Task<List<int>> GetPersonIds(string clientAddress, string clientUsername, string clientPassword)
{
//load a list of people from a file in Azure Blob storage
return new List<int>();
}
}
另外,我假设您可以并行利用并行执行您的同步作业:
:List<int> personIds = GetPersonIds(clientAddress, clientUsername, clientPassword);
var customResults = new List<CustomApiResult>();
Parallel.ForEach(personIds.Batch(100),
new ParallelOptions()
{
MaxDegreeOfParallelism=5
},
(personIdsBatch) =>
{
var results = GetCustomResultsByBatch(address, username, password, personIdsBatch);
lock (customResults)
{
customResults.AddRange(results);
}
});