我有一些混乱的JSON。
- 某些节点在行之间不一致。在某些行中,这些节点是数组,在某些行中,这些节点是对象或字符串。
- 这里的示例只有两个级别,但实际数据嵌套了更多级别。
例:
[
{
"id": 1,
"person": {
"addresses": {
"address": {
"city": "FL"
}
},
"phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"email": [
{
"type": "work",
"email": "john.doe@gmail.com"
},
{
"type": "work",
"email": "john.doe@work.com"
}
]
}
},
{
"id": 2,
"person": {
"addresses": [
{
"type": "home",
"address": {
"city": "FL"
}
}
],
"phones": {
"type": "mobile",
"number": "555-555-5555"
},
"email": {
"type": "work",
"email": "jane.doe@gmail.com"
}
}
}
]
我想使节点保持一致,以便如果任何节点是任何节点中的数组,则其余节点应转换为数组。
一旦数据一致,分析和重组数据就会更容易。
预期成果:
[
{
"id": 1,
"person": {
"addresses": [
{
"address": {
"city": "FL"
}
}
],
"phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"email": [
{
"type": "work",
"email": "john.doe@gmail.com"
},
{
"type": "work",
"email": "john.doe@work.com"
}
]
}
},
{
"id": 2,
"person": {
"addresses": [
{
"type": "home",
"address": {
"city": "FL"
}
}
],
"phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"email": [
{
"type": "work",
"email": "jane.doe@gmail.com"
}
]
}
}
]
在使数组保持一致之后,我想展平数据,以便将对象展平,但数组仍然是数组。这
预期成果
[
{
"id": 1,
"person.addresses": [
{
"address": {
"city": "FL"
}
}
],
"person.phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"person.email": [
{
"type": "work",
"email": "john.doe@gmail.com"
},
{
"type": "work",
"email": "john.doe@work.com"
}
]
},
{
"id": 2,
"person.addresses": [
{
"type": "home",
"address": {
"city": "FL"
}
}
],
"person.phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"person.email": [
{
"type": "work",
"email": "jane.doe@gmail.com"
}
]
}
]
我能够使用 jq 部分做到这一点。当有一条或两条路径需要修复时,它有效,但是当有两条以上的路径时,它似乎会中断。
我采取的方法
- 确定所有可能的路径
- 对每个路径的数据类型进行分组和计数
- 确定存在混合数据类型的情况
- 按深度递减对路径进行排序
- 排除没有混合类型的路径
- 排除其中一个混合类型不是数组的路径
- 对于每个路径,对原始数据应用修复
- 这将生成一个包含 N 个副本的流,每个 N 个转换一个副本
- 提取应包含已清理结果的最后一个副本
到目前为止我的实验
def fix(data; path):
data |= map(. | getpath(path)?=([getpath(path)?]|flatten));
def hist:
length as $l
| group_by (.)
| map( .
| (.|length) as $c
| {(.[0]):{
"count": $c,
"diff": ($l - $c)
}} )
| (length>1) as $mixed
| {
"types": .[],
"count": $l,
"mixed":$mixed
};
def summary:
map( .
| path(..) as $p
| {
path:$p,
type: getpath($p)|type,
key:$p|join(".")
}
)
| flatten
| group_by(.key)
| map( .
| {
key: .[0].key,
path: .[0].path,
depth: (.[0].path|length),
type:([(.[] | .type)]|hist)
}
)
| sort_by(.depth)
| reverse;
. as $data
| .
| summary
| map( .
| select(.type.mixed)
| select(.type.types| keys| contains(["array"]))
| .path)
| map(. as $path | $data | fix($data;$path))
| length as $l
| .[$l-1]
仅存在最后一次转换。我认为$data没有通过我的修复程序进行更新,这可能是根本原因,或者我只是做错了。
这是 e 这不起作用的地方
以下响应首先解决第一个任务,即:
使节点保持一致,以便如果有的话...节点是任何节点中的数组,那么其余节点应该转换为数组。
以通用方式:
def paths_to_array:
[paths as $path
| select( any(.[]; (getpath($path[1:] )? | type) == "array"))
| $path] ;
# If a path to a value in .[] is an array,
# then ensure all corresponding values are also arrays
def make_uniform:
reduce (paths_to_array[][1:]) as $path (.;
map( (getpath($path)? // null) as $value
| if $value and ($value|type != "array")
then setpath($path; [$value])
else . end ) ) ;
make_uniform
对于第二个任务,让我们定义一个实用程序函数:
# Input is assumed to be an object:
def flatten_top_level_keys:
[ to_entries[]
| if (.value|type) == "object"
then .key as $k
| (.value|to_entries)[] as $kv
| {key: ($k + "." + $kv.key), value: $kv.value}
else .
end ]
| from_entries;
这可以与walk/1
结合使用以实现递归 平坦。
换句话说,可以获得组合问题的解 由:
make_uniform
| walk( if type == "object" then flatten_top_level_keys else . end )
效率
上述make_uniform
定义在生产线中存在明显的效率问题:
reduce (paths_to_array[][1:]) as $path (.;
使用jq 的unique
是解决它的一种方法,但unique
是使用排序实现的,在这种情况下,这引入了另一个低效率。 所以让我们用这个老栗子:
# bag of words
def bow(stream):
reduce stream as $word ({}; .[$word|tostring] += 1);
现在我们可以更有效地定义make_uniform
:
def make_uniform:
def uniques(s): bow(s) | keys_unsorted[] | fromjson;
reduce uniques(paths_to_array[][1:]) as $path (.;
map( (getpath($path)? // null) as $value
| if $value and ($value|type != "array")
then setpath($path; [$value])
else . end ) ) ;
使用一些python以及peak在上面的解决方案中给出的JQ脚本,我能够清理我混乱的数据。
我仍然认为,鉴于我提出的问题,peak给出的答案是正确的答案。尽管解决方案非常好且运行良好,但需要花费大量时间才能完成。所花费的时间取决于节点的数量、节点的深度以及它找到的一个或多个数组。
我有两个不同的文件需要修复,两个文件都有大约 5000 行数据。在其中一个上,jq脚本大约需要 6 个小时才能完成,我不得不在 16 小时后终止另一个。
下面的解决方案基于原始解决方案,结合使用 python 和jq并行处理一些步骤。查找数组的路径仍然是最耗时的部分。
设置 我将脚本拆分为以下内容
# paths_to_array.jq
def paths_to_array:
[paths as $path
| select( any(.[]; (getpath($path[1:] )? | type) == "array"))
| $path[1:]]
| unique
| map(. | select([.[]|type]|contains(["number"])|not));
paths_to_array
细微调整以排除介于两者之间的任何路径。我只想要所有以数组结尾的路径。 我还从路径中排除了最顶层的数组索引以减少路径数量
# flatten.jq
def update_array($path):
(getpath($path)? // null) as $value
| (if $value and ($value|type != "array")
then . as $data | (try (setpath($path; [$value]))
catch $data)
else . end);
def make_uniform($paths):
map( .
| reduce($paths[]) as $path (
. ; update_array($path)
)
);
# Input is assumed to be an object:
def flatten_top_level_keys:
[ to_entries[]
| if (.value|type) == "object"
then .key as $k
| (.value|to_entries)[] as $kv
| {key: ($k + "." + $kv.key), value: $kv.value}
else .
end ]
| from_entries;
我不得不从 jq 内置中添加 walk 函数,因为 pythonn 的 jq 库不包含它。 我拆分了make_uniform函数,以便更好地理解脚本,并且由于路径在两者之间包含数组索引时遇到的问题,我添加了 try catch。否则,这与原始解决方案中的代码几乎相同
# apply.jq
make_uniform({path})
| map( .
| walk( if type == "object" then
flatten_top_level_keys
else . end ))
我不得不拆分它,因为我正在使用
{path}
为路径注入数据,当它在完整脚本中时,我在 python 中使用.format()
时出现错误。
import math
import os
import JSON
from jq import jq
import multiprocessing as mp
def get_script(filename):
"""Utility function to read the jq script"""
with open(filename, "r") as f:
script = f.read()
return script
def get_data(filename):
"""Utility function to read json data from file"""
with open(filename, 'r') as f:
data = json.load(f)
return data
def transform(script, data):
"""Wrapper to be used by the parallel processor"""
return jq(script).transform(data)
def parallel_jq(script, data, rows=100, processes=8):
"""Executes the JQ script on data in parallel chuncks specified by rows"""
pool = mp.Pool(processes=processes)
size = math.ceil(len(data) / rows)
segments = [pool.apply_async(transform,
args=(script,
data[index*rows:(index+1)*rows]))
for index in range(size) ]
result = []
for seg in segments:
result.extend(seg.get())
return result
def get_paths_to_arrays(data, dest="data"):
"""Obtain the paths to arrays"""
filename = os.path.join(dest, "paths_to_arrays.json")
if os.path.isfile(filename):
paths = get_data(filename)
else:
script = get_script('jq/paths_to_array.jq')
paths = parallel_jq(script, data)
paths = jq("unique|sort_by(length)|reverse").transform(paths)
with open(filename, 'w') as f:
json.dump(paths, f, indent=2)
return paths
def flatten(data, paths, dest="data"):
"""Make the arrays uniform and flatten the result"""
filename = os.path.join(dest, "uniform_flat.json")
script = get_script('jq/flatten.jq')
script += "n" + get_script('jq/apply.jq').format(path=json.dumps(paths))
data = parallel_jq(script, data)
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
if __name__ == '__main__':
entity = 'messy_data'
sourcefile = os.path.join('data', entity+'.json')
dest = os.path.join('data', entity)
data = get_data(sourcefile)
# Finding paths with arrays
paths = get_paths_to_arrays(data, dest)
# Fixing array paths and flattening
flatten(data, paths, dest)
正如我之前提到的,即使使用并行处理,get_paths_to_arrays也需要相当长的时间。
- get_paths_to_arrays花了 3811.834 秒 => 只是一个多小时。
- 扁平花了 38 秒