我正试图用Azure数据工厂(ADF(解决一个数据问题,我几乎做到了,但我一直陷入困境。
我们正在尝试处理EventHub捕获文件,将它们分组在几个字段中,并将每个分组存储在一个单独的文件中。
因此,假设这是我们Avro文件中的数据(我跳过默认的EventHub捕获列,如EnqueuedTimeUtc和SequenceNumber(:
[
{"Body": "{"DataType": "Type1", "DataVersion": 1, "id": 1}"},
{"Body": "{"DataType": "Type1", "DataVersion": 1, "id": 2}"},
{"Body": "{"DataType": "Type1", "DataVersion": 2, "id": 3}"}
]
我想将这些数据拆分成单独的文件,根据数据类型和版本、数据日期和每行1个json对象创建一个路径。所以,像这样:
/Type1/1/2021-04-22.json
{"DataType": "Type1", "DataVersion": 1, "id": 1}
{"DataType": "Type1", "DataVersion": 1, "id": 2}
到目前为止,我有以下步骤:
- 读取Avro文件的源
- 解析JSON步骤,从Body中读取DataType和DataVersion,并将它们附加为附加列ParsedBody。DataType和ParsedBody。数据版本
- 按步骤分组,按ParsedBody分组。DataType和ParsedBody。DataVersion,并使用
collect(Body)
聚合 - 一个派生列步骤,它添加一个具有文件名(ParsedBody.DataType+"/"+ParsedBody.DataVersion+"/"+$year+">"+$month+">"+$day+".json"(的列
- 存储到blob的接收器,以filename列作为文件名
这将按文件存储组,但不是我想要的格式。它是这样存储的:
{
"DataType": "Type1",
"DataVersion": 1,
"Body": [
"{"DataType": "Type1", "DataVersion": 1, "id": 1}",
"{"DataType": "Type1", "DataVersion": 1, "id": 2}"
]
}
而且,正如我所说,我希望每行有一个json对象(而不是字符串(。
这在数据工厂中可行吗?还是我使用了错误的工具?如果是,怎么做?
所有这些原语(包括从avro读取(都在ADF数据流中。使用Parse-Tx解析json,使用flatten进行扁平化。使用接收器分区可以写入多个文件。