Azure 数据工厂 - 清理批处理任务文件



我正在使用 Azure 数据工厂 v2,使用带有专用节点的批处理帐户池进行处理。随着时间的推移,我发现批处理活动由于 D:/上没有更多空间而失败节点上的临时驱动器。对于每个 ADF 作业,它会在节点上创建一个工作目录,作业完成后,我发现它不会清理文件。想知道其他人以前是否遇到过这种情况以及实施的最佳解决方案是什么。

编辑:现在似乎是ADF中的文件保留设置,当我提出问题时不存在。对于将来遇到相同问题的任何人来说,这是一个可能的解决方案。

想出了一个解决方案,发帖希望能帮助下一个出现的人。

我找到了适用于批处理的 Azure Python SDK,我创建了一个小脚本,该脚本将循环访问帐户上的所有池 + 节点,并删除工作项目录中超过 1 天的任何文件。

import azure.batch as batch
import azure.batch.operations.file_operations as file_operations
from azure.batch.batch_auth import SharedKeyCredentials
import azure.batch.operations
import msrest.service_client
from datetime import datetime
program_datetime = datetime.utcnow()
batch_account = 'batchaccount001'
batch_url = 'https://batchaccount001.westeurope.batch.azure.com'
batch_key = '<BatchKeyGoesHere>'
batch_credentials = SharedKeyCredentials(batch_account, batch_key)
#Create Batch Client with which to do operations
batch_client = batch.BatchServiceClient(credentials=batch_credentials,
                                        batch_url = batch_url
                                        )
service_client = msrest.service_client.ServiceClient(batch_credentials, batch_client.config)
#List out all the pools
pools = batch_client.pool.list()
pool_list = [p.id for p in pools]
for p in pool_list:
    nodes = batch_client.compute_node.list(p)
    node_list = [n.id for n in nodes]
    for n in node_list:
        pool_id = p
        node_id = n
        print(f'Pool = {pool_id}, Node = {node_id}')
        fo_client = azure.batch.operations.FileOperations(service_client,
                                                          config=batch_client.config,
                                                          serializer=batch_client._serialize,
                                                          deserializer=batch_client._deserialize)
        files = fo_client.list_from_compute_node(pool_id,
                                                 node_id,
                                                 recursive=True,
                                                 file_list_from_compute_node_options=None,
                                                 custom_headers=None,
                                                 raw=False
                                                )
        for file in files:
            # Check to make sure it's not a directory. Directories do not have a last_modified property.
            if not file.is_directory:
                file_datetime = file.properties.last_modified.replace(tzinfo=None)
                file_age_in_seconds = (program_datetime - file_datetime).total_seconds()
                # Delete anything older than a day in the workitems directory.
                if file_age_in_seconds > 86400 and file.name.startswith('workitems'):
                    print(f'{file_age_in_seconds} : {file.name}')
                    fo_client.delete_from_compute_node(pool_id, node_id, file.name)

我是 Azure 数据工厂的工程师。 我们使用了早于 2018-12-01.8.0 的 Azure 批处理 SDK,因此通过 ADF 创建的批处理任务默认为无限期保留期,如前所述。我们将推出一个修补程序,将 ADF 创建的 Batch 任务的保留期默认为 30 天,并在自定义活动的"属性"类型中引入了一个属性 retentionTimeInDays,客户可以在其 ADF 管道中设置该属性以替代此默认值。 推出后,https://learn.microsoft.com/en-us/azure/data-factory/transform-data-using-dotnet-custom-activity#custom-activity 的文档将更新更多详细信息。 感谢您的耐心等待。

任务清理是在删除任务或任务保留时间已过 (https://learn.microsoft.com/en-us/rest/api/batchservice/task/add#taskconstraints) 时完成的。这些中的任何一个都应该解决您遇到的问题。

注意:在最新的 REST API(2018-12-01.8.0)中,默认保留时间已从无限期减少到 7 天,默认允许任务清理。使用在此之前的版本创建的任务将不具有此新的默认值。

通过 ARM 模板进行部署时,可以使用typeProperties中的retentionTimeInDays配置。

请注意,您应该以 Double 而不是String 提供配置retentionTimeInDays

相关内容

  • 没有找到相关文章