NiFi上的Python 2.X:json.loads中的Ñ(和其他)问题



我使用Jython InvokeScriptedProcessor将数据从json结构结构到sql结构。我在一个特定的函数上遇到了麻烦。json.loads。json。加载不识别特殊字符,如ñ,, í…

以奇数形式写。而我还没有达到拥有它的任何形式。

。(非常简单的)

{"id":"ÑUECO","value":3.141592,"datetime":"....","location":"ÑUECO"}

如果我们试着在sql中写成

INSERT INTO .... (id, value) VALUES ("...",3.141592);

它会失败。这让我很失望。我不能用任何返回选项返回数据,成功或失败,这与NiFi的版本无关。这是我的代码

def process(self, inputStream, outputStream):
# read input json data from flowfile content
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
data = json.loads(text) 

既不

data = json.loads(text.encode("utf-8"))

正常工作。文本以unicode格式出现。

def __generate_sql_transaction(input_data):
""" Generate SQL statement """
sql = """
BEGIN;"""
_id = input_data.get("id")
_timestamp = input_data.get("timestamp")
_flowfile_metrics = input_data.get("metrics")
_flowfile_metadata = input_data.get("metadata")
self.valid = __validate_metrics_type(_flowfile_metrics)
if self.valid is True:
self.log.error("generate insert")
sql += """
INSERT INTO
{0}.{1} (id, timestamp, metrics""".format(schema, table)
if _flowfile_metadata:
sql += ", metadata"
sql += """)
VALUES
('{0}', '{1}', '{2}'""".format(_id.encode("utf-8"), _timestamp, json.dumps(_flowfile_metrics))
self.log.error("generate metadata")
if _flowfile_metadata:
sql += ", '{}'".format(json.dumps(_flowfile_metadata).encode("utf-8"))
sql += """)
ON CONFLICT ({})""".format(on_conflict)
if not bool(int(self.update)):
sql += """
DO NOTHING;"""
else:
sql += """
DO UPDATE
SET"""
if bool(int(self.preference)):
sql += """
metrics = '{2}' || {0}.{1}.metrics;""".format(schema, table, json.dumps(_flowfile_metrics))
else:
sql += """
metrics = {0}.{1}.metrics || '{2}';""".format(schema, table, json.dumps(_flowfile_metrics))
else:
return ""
sql += """
COMMIT;"""
return sql

我将数据再次发送到NiFi:

output = __generate_sql_transaction(data)
self.log.error("post generate_sql_transaction")
self.log.error(output.encode("utf-8"))
# If no sql_transaction is generated because requisites weren't met,
# set the processor output with the original flowfile input.
if output == "":
output = text
# write new content to flowfile
outputStream.write(
output.encode("utf-8")
)

输出看起来像

INSERT INTO .... VALUES ("ÃUECO","2020-01-01T10:00:00",'{"value":3.1415}','{"location":"u00d1UECO"}');

我有"Ñueco"在元数据中也是如此,它不能很好地使用id或元数据

注意:似乎InvokeScriptedProcessor使用Groove而不是Python工作得很好。但是我的问题是我对Groovy一无所知…

有人发现类似的问题吗?你是怎么解决的?

更新:

输入例子:

{"id":"ÑUECO",
"metrics":{
"value":3.1415
},
"metadata":{
"location":"ÑUECO"
},
"timestamp":"2020-01-01 00:00:00+01:00"
}

所需输出:

BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÑUECO', 
'2020-01-01T00:00:00+01:00',
'{"value":3.1415}',
'{"location":"ÑUECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;

实际产出:

BEGIN;
INSERT INTO Table (id, timestamp, metrics, metadata)
VALUES ('ÃUECO', 
'2020-01-01T00:00:00+01:00',
'{"value":3.1415}',
'{"location":"u00d1UECO"}')
ON CONFLICT (id, timestamp)
DO UPDATE
SET
metrics='{"value":3.1415}' || Table.metrics;
COMMIT;

UPD

  1. jython不能正确处理字节串-所以,不要使用.encode('utf-8')

  2. 使用java方法用特定编码将内容写回流文件

下面是一个正确读写非ascii字符的示例,包括Ñ

用jython代替ExecuteScript处理器,替换_transform(text)函数体:

import traceback
from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class FlowWriter(StreamCallback):
def _transform(self, text):
# transform incoming text here
return '@@@@' + text + '****'
def process(self, inputStream, outputStream):
text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
new_text = self._transform(text)
IOUtils.write(new_text, outputStream, StandardCharsets.UTF_8)
flowFile = session.get()
if flowFile != None:
try:
flowFile = session.write(flowFile, FlowWriter())
flowFile = session.putAttribute(flowFile, "filename", 'headerfile.xml')
session.transfer(flowFile, REL_SUCCESS)
session.commit()
except Exception as e:
log.error("{}n{}".format(e,traceback.format_exc()))
session.rollback(True)  # put file back and penalize it

我最近找到了这个答案。

https://stackoverflow.com/a/35882335/7634711

这不是NiFi的问题。这是Python2的问题,以及它如何与json库一起工作。如果特殊字符出现在字典键中,Python3也会出现问题。

最新更新