我有一个混乱的JSON,我正在尝试使用jq进行清理



我有一些混乱的JSON。

  • 某些节点在行之间不一致。在某些行中,这些节点是数组,在某些行中,这些节点是对象或字符串。
  • 这里的示例只有两个级别,但实际数据嵌套了更多级别。

例:

[
{
"id": 1,
"person": {
"addresses": {
"address": {
"city": "FL"
}
},
"phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"email": [
{
"type": "work",
"email": "john.doe@gmail.com"
},
{
"type": "work",
"email": "john.doe@work.com"
}
]
}
},
{
"id": 2,
"person": {
"addresses": [
{
"type": "home",
"address": {
"city": "FL"
}
}
],
"phones": {
"type": "mobile",
"number": "555-555-5555"
},
"email": {
"type": "work",
"email": "jane.doe@gmail.com"
}
}
}
]

我想使节点保持一致,以便如果任何节点是任何节点中的数组,则其余节点应转换为数组。

一旦数据一致,分析和重组数据就会更容易。

预期成果:

[
{
"id": 1,
"person": {
"addresses": [
{
"address": {
"city": "FL"
}
}
],
"phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"email": [
{
"type": "work",
"email": "john.doe@gmail.com"
},
{
"type": "work",
"email": "john.doe@work.com"
}
]
}
},
{
"id": 2,
"person": {
"addresses": [
{
"type": "home",
"address": {
"city": "FL"
}
}
],
"phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"email": [
{
"type": "work",
"email": "jane.doe@gmail.com"
}
]
}
}
]

在使数组保持一致之后,我想展平数据,以便将对象展平,但数组仍然是数组。这

预期成果

[
{
"id": 1,
"person.addresses": [
{
"address": {
"city": "FL"
}
}
],
"person.phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"person.email": [
{
"type": "work",
"email": "john.doe@gmail.com"
},
{
"type": "work",
"email": "john.doe@work.com"
}
]
},
{
"id": 2,
"person.addresses": [
{
"type": "home",
"address": {
"city": "FL"
}
}
],
"person.phones": [
{
"type": "mobile",
"number": "555-555-5555"
}
],
"person.email": [
{
"type": "work",
"email": "jane.doe@gmail.com"
}
]
}
]

我能够使用 jq 部分做到这一点。当有一条或两条路径需要修复时,它有效,但是当有两条以上的路径时,它似乎会中断。

我采取的方法

  • 确定所有可能的路径
  • 对每个路径的数据类型进行分组和计数
  • 确定存在混合数据类型的情况
  • 按深度递减对路径进行排序
  • 排除没有混合类型的路径
  • 排除其中一个混合类型不是数组的路径
  • 对于每个路径,对原始数据应用修复
  • 这将生成一个包含 N 个副本的流,每个 N 个转换一个副本
  • 提取应包含已清理结果的最后一个副本

到目前为止我的实验

def fix(data; path):
data |= map(. | getpath(path)?=([getpath(path)?]|flatten));
def hist:
length as $l
| group_by (.)
| map( .
| (.|length) as $c
| {(.[0]):{
"count": $c,
"diff": ($l - $c)
}} )
| (length>1) as $mixed
| {
"types": .[],
"count": $l,
"mixed":$mixed
};
def summary:
map( .
| path(..) as $p
| {
path:$p,
type: getpath($p)|type,
key:$p|join(".")
}
)
| flatten
| group_by(.key)
| map( .
| {
key: .[0].key,
path: .[0].path,
depth: (.[0].path|length),
type:([(.[] | .type)]|hist)
}
)
| sort_by(.depth)
| reverse;
. as $data
| .
| summary
| map( . 
| select(.type.mixed)
| select(.type.types| keys| contains(["array"]))
| .path)
| map(. as $path | $data | fix($data;$path))
| length as $l
| .[$l-1]

仅存在最后一次转换。我认为$data没有通过我的修复程序进行更新,这可能是根本原因,或者我只是做错了。

这是 e 这不起作用的地方

以下响应首先解决第一个任务,即:

使节点保持一致,以便如果有的话...节点是任何节点中的数组,那么其余节点应该转换为数组。

以通用方式:

def paths_to_array:
[paths as $path
| select( any(.[]; (getpath($path[1:] )? | type) == "array"))
| $path] ;
# If a path to a value in .[] is an array, 
# then ensure all corresponding values are also arrays
def make_uniform:
reduce (paths_to_array[][1:]) as $path (.;
map( (getpath($path)? // null) as $value
| if $value and ($value|type != "array")
then setpath($path; [$value])
else . end )  ) ;
make_uniform

对于第二个任务,让我们定义一个实用程序函数:

# Input is assumed to be an object:
def flatten_top_level_keys:
[ to_entries[]
| if (.value|type) == "object" 
then .key as $k
| (.value|to_entries)[] as $kv
| {key: ($k + "." + $kv.key), value: $kv.value} 
else .
end ]
| from_entries;

这可以与walk/1结合使用以实现递归 平坦。

换句话说,可以获得组合问题的解 由:

make_uniform
| walk( if type == "object" then flatten_top_level_keys else . end )

效率

上述make_uniform定义在生产线中存在明显的效率问题:

reduce (paths_to_array[][1:]) as $path (.;  

使用jq 的unique是解决它的一种方法,但unique是使用排序实现的,在这种情况下,这引入了另一个低效率。 所以让我们用这个老栗子:

# bag of words
def bow(stream): 
reduce stream as $word ({}; .[$word|tostring] += 1);

现在我们可以更有效地定义make_uniform

def make_uniform:
def uniques(s): bow(s) | keys_unsorted[] | fromjson;
reduce uniques(paths_to_array[][1:]) as $path (.;
map( (getpath($path)? // null) as $value
| if $value and ($value|type != "array") 
then setpath($path; [$value]) 
else . end )  ) ;

使用一些python以及peak在上面的解决方案中给出的JQ脚本,我能够清理我混乱的数据。

我仍然认为,鉴于我提出的问题,peak给出的答案是正确的答案。尽管解决方案非常好且运行良好,但需要花费大量时间才能完成。所花费的时间取决于节点的数量、节点的深度以及它找到的一个或多个数组。

我有两个不同的文件需要修复,两个文件都有大约 5000 行数据。在其中一个上,jq脚本大约需要 6 个小时才能完成,我不得不在 16 小时后终止另一个。

下面的解决方案基于原始解决方案,结合使用 python 和jq并行处理一些步骤。查找数组的路径仍然是最耗时的部分。

设置 我将脚本拆分为以下内容

# paths_to_array.jq
def paths_to_array:
[paths as $path
| select( any(.[]; (getpath($path[1:] )? | type) == "array"))
| $path[1:]]
| unique
| map(. | select([.[]|type]|contains(["number"])|not));
paths_to_array

细微调整以排除介于两者之间的任何路径。我只想要所有以数组结尾的路径。 我还从路径中排除了最顶层的数组索引以减少路径数量

# flatten.jq
def update_array($path):
(getpath($path)? // null) as $value
| (if $value and ($value|type != "array")
then . as $data | (try (setpath($path; [$value]))
catch $data)
else . end);
def make_uniform($paths):
map( .
| reduce($paths[]) as $path (
. ; update_array($path)
)
);
# Input is assumed to be an object:
def flatten_top_level_keys:
[ to_entries[]
| if (.value|type) == "object"
then .key as $k
| (.value|to_entries)[] as $kv
| {key: ($k + "." + $kv.key), value: $kv.value}
else .
end ]
| from_entries;

我不得不从 jq 内置中添加 walk 函数,因为 pythonn 的 jq 库不包含它。 我拆分了make_uniform函数,以便更好地理解脚本,并且由于路径在两者之间包含数组索引时遇到的问题,我添加了 try catch。否则,这与原始解决方案中的代码几乎相同

# apply.jq
make_uniform({path})
| map( .
| walk( if type == "object" then
flatten_top_level_keys
else . end ))

我不得不拆分它,因为我正在使用{path}为路径注入数据,当它在完整脚本中时,我在 python 中使用.format()时出现错误。


import math
import os
import JSON
from jq import jq
import multiprocessing as mp
def get_script(filename):
"""Utility function to read the jq script"""
with open(filename, "r") as f:
script = f.read()
return script
def get_data(filename):
"""Utility function to read json data from file"""
with open(filename, 'r') as f:
data = json.load(f)
return data
def transform(script, data):
"""Wrapper to be used by the parallel processor"""
return jq(script).transform(data)
def parallel_jq(script, data, rows=100, processes=8):
"""Executes the JQ script on data in parallel chuncks specified by rows"""
pool = mp.Pool(processes=processes)
size = math.ceil(len(data) / rows)
segments = [pool.apply_async(transform,
args=(script,
data[index*rows:(index+1)*rows]))
for index in range(size) ]
result = []
for seg in segments:
result.extend(seg.get())
return result
def get_paths_to_arrays(data, dest="data"):
"""Obtain the paths to arrays"""
filename = os.path.join(dest, "paths_to_arrays.json")
if os.path.isfile(filename):
paths = get_data(filename)
else:
script = get_script('jq/paths_to_array.jq')
paths = parallel_jq(script, data)
paths = jq("unique|sort_by(length)|reverse").transform(paths)
with open(filename, 'w') as f:
json.dump(paths, f, indent=2)
return paths
def flatten(data, paths, dest="data"):
"""Make the arrays uniform and flatten the result"""
filename = os.path.join(dest, "uniform_flat.json")
script = get_script('jq/flatten.jq')
script += "n" + get_script('jq/apply.jq').format(path=json.dumps(paths))
data = parallel_jq(script, data)
with open(filename, 'w') as f:
json.dump(data, f, indent=2)
if __name__ == '__main__':
entity = 'messy_data'
sourcefile = os.path.join('data', entity+'.json')
dest = os.path.join('data', entity)
data =  get_data(sourcefile)
# Finding paths with arrays
paths = get_paths_to_arrays(data, dest)
# Fixing array paths and flattening
flatten(data, paths, dest)

正如我之前提到的,即使使用并行处理,get_paths_to_arrays也需要相当长的时间。

  • get_paths_to_arrays花了 3811.834 秒 => 只是一个多小时。
  • 扁平花了 38 秒

相关内容

  • 没有找到相关文章

最新更新