在Apache Nifi ExecuteStreamCommand中运行Python代码



我正在尝试在nifi executeStreamCommand处理器中运行Python代码。

代码包括诸如pandas和numpy之类的非纯Python模块,因此使用nifi executescript不是一个选项。

问题涉及在流文件中读取和修改流文件内容。

显然可以使用stdin读取传入的流量文件并用stdout书写,请参阅以下问题:使用executestreamCommand

的Python脚本

,但我无法实现此功能。

1。尝试简单地从stdin中读取CSV并修改它,但是当发送到PutFile处理器时,文件是相同的。

import sys
import pandas as pd
import io
df = pd.read_csv(io.StringIO(sys.stdin.read(1)))
df2 = pd.DataFrame([[5, 6], [7, 8]], columns=list('AB'))
df2 = df.append(df2)

2。尝试将其他代码包装在函数中,并在假设函数输出将返回stdout中,但结果相同。

def convert_csv_dataframe():
    a = pd.read_csv(io.StringIO(sys.stdin.read(1)))
    a.replace(["ABC", "AB"], "A", inplace=True)
    return a
convert_csv_dataframe()

如果任何人可以提供帮助,这将不胜感激。

编辑:

此代码有效。这个问题是在Nifi中。我是从"原始"关系而不是"输出流"关系中读到的。请注意,Stdin正在阅读一行,但不要认为这应该有所作为。我唯一的问题是:我可以从executestreamCommand中引用流量文件本身(不是它的内容)?

import sys
a = sys.stdin.readline()
a = a.upper()
sys.stdout.write(a)

我认为您需要在脚本中的某个地方写入stdout。我对Python的了解不多,但是两个示例看起来都像您从stdin阅读,然后在内存中修改数据,但切勿将其写回。

最新更新