Python Apache Beam schema definition from dictionary rows



I want to get a row schema in Apache Beam (Python) so that I can use it with SQL transforms. However, I am running into the problem explained below.

The schema is defined as follows:

class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]

coders.registry.register_coder(RowSchema, coders.RowCoder)
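Beam infers the schema from the members of the NamedTuple, that is, its annotated fields. As a standard-library-only sketch (no Beam involved), this is what those annotations resolve to for the RowSchema above; the Optional[str] on colB is presumably what lets that field be nullable.

```python
import typing


class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]


# Field names, in declaration order.
print(RowSchema._fields)

# Resolved field types; Optional[str] is shorthand for Union[str, None].
hints = typing.get_type_hints(RowSchema)
print(hints)
```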

The following example infers the schema correctly:

with beam.Pipeline(options=pipeline_options) as p:
    pcol = (p
            | "Create" >> beam.Create(
                [
                    RowSchema(colA='a1', colB='b1'),
                    RowSchema(colA='a2', colB=None)])
            .with_output_types(RowSchema)
            | beam.Map(print)
            )

然而,下面的尝试提出了";ValueError:类型名称和字段名称必须是有效的标识符:'run<当地人>。RowSchema";

with beam.Pipeline(options=pipeline_options) as p:
    pcol = (p
            | "Create" >> beam.Create(
                [
                    {'colA': 'a1', 'colB': 'b1'},
                    {'colA': 'a2', 'colB': None}])
            | 'ToRow' >> beam.Map(
                lambda x: RowSchema(**x))
            .with_output_types(RowSchema)
            | beam.Map(print)
            )
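The ToRow conversion on its own is not what fails. In plain Python, with no pickling involved, unpacking each dict into the NamedTuple works even when the class is local to a function, as it is inside the question's run(). A minimal sketch; the function name run only mirrors the question:

```python
import typing


def run():
    # Defined inside the function, as in the failing pipeline.
    class RowSchema(typing.NamedTuple):
        colA: str
        colB: typing.Optional[str]

    dicts = [{'colA': 'a1', 'colB': 'b1'}, {'colA': 'a2', 'colB': None}]
    # Same conversion as the 'ToRow' step: lambda x: RowSchema(**x)
    return [RowSchema(**d) for d in dicts]


rows = run()
print(rows)
```

Since this succeeds, the error has to come from how Beam serializes the Map function, which the traceback below confirms.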

Full stack trace:

Traceback (most recent call last):
  File "/usr/lib/python3.9/runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/usr/lib/python3.9/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "home/src/main.py", line 326, in <module>
    run()
  File "home/src/main.py", line 267, in run
    | 'ToRow' >> beam.Map(lambda x: RowSchema(**x)).with_output_types(RowSchema)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1661, in Map
    pardo = FlatMap(wrapper, *args, **kwargs)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1606, in FlatMap
    pardo = ParDo(CallableWrapperDoFn(fn), *args, **kwargs)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/core.py", line 1217, in __init__
    super().__init__(fn, *args, **kwargs)
  File "home/lib/python3.9/site-packages/apache_beam/transforms/ptransform.py", line 861, in __init__
    self.fn = pickler.loads(pickler.dumps(self.fn))
  File "home/lib/python3.9/site-packages/apache_beam/internal/pickler.py", line 51, in loads
    return desired_pickle_lib.loads(
  File "home/lib/python3.9/site-packages/apache_beam/internal/dill_pickler.py", line 289, in loads
    return dill.loads(s)
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 275, in loads
    return load(file, ignore, **kwds)
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 270, in load
    return Unpickler(file, ignore=ignore, **kwds).load()
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "home/lib/python3.9/site-packages/dill/_dill.py", line 788, in _create_namedtuple
    t = collections.namedtuple(name, fieldnames)
  File "/usr/lib/python3.9/collections/__init__.py", line 390, in namedtuple
    raise ValueError('Type names and field names must be valid '
ValueError: Type names and field names must be valid identifiers: 'run.<locals>.RowSchema'
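The last frames show that the error is raised by collections.namedtuple, which dill calls while unpickling the Map function. That final step can be reproduced with the standard library alone: collections.namedtuple requires the type name to be a valid Python identifier, and the qualified name 'run.<locals>.RowSchema' is not one.

```python
import collections

message = None
try:
    # The name dill passed in the traceback above.
    collections.namedtuple('run.<locals>.RowSchema', ['colA', 'colB'])
except ValueError as exc:
    message = str(exc)
print(message)

# The plain class name is a valid identifier, so this succeeds.
Plain = collections.namedtuple('RowSchema', ['colA', 'colB'])
print(Plain('a1', None))  # RowSchema(colA='a1', colB=None)
```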

The failing attempt works if I change the schema definition to:

RowSchema = typing.NamedTuple('RowSchema', [('colA', str), ('colB', typing.Optional[str])])
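A likely reason the functional form helps: when a class-syntax typing.NamedTuple is defined inside a function, its __qualname__ includes the enclosing function (run.<locals>.RowSchema), whereas the functional form builds the class through collections.namedtuple and gets the plain name. The traceback suggests dill uses that qualified name when it recreates the class. In this stdlib sketch, rebuild is a hypothetical helper that only imitates the collections.namedtuple call from the traceback; it is not dill's code.

```python
import collections
import typing


def run():
    # Class syntax: __qualname__ records the enclosing function.
    class RowSchema(typing.NamedTuple):
        colA: str
        colB: typing.Optional[str]

    # Functional syntax: the name comes from the first argument.
    RowSchemaFn = typing.NamedTuple(
        'RowSchema', [('colA', str), ('colB', typing.Optional[str])])
    return RowSchema, RowSchemaFn


class_form, functional_form = run()
print(class_form.__qualname__)       # run.<locals>.RowSchema
print(functional_form.__qualname__)  # RowSchema


def rebuild(cls):
    """Hypothetical helper: recreate a namedtuple from its qualified name,
    like the collections.namedtuple call in the traceback's last frames."""
    return collections.namedtuple(cls.__qualname__, cls._fields)
```

rebuild(functional_form) succeeds, while rebuild(class_form) raises the same ValueError as the pipeline.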

Based on several of the references below, the failing snippet seems to follow the correct format.

References:

  • Apache Beam: infer schema using NamedTuple (Python)
  • https://beam.apache.org/documentation/programming-guide/#inferring-schemas
  • https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang_sql.py
  • https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/sql_taxi.py

Tested on Python 3.9 and Beam 2.37.0 with several runners, including DirectRunner, DataflowRunner and PortableRunner.

Simply moving the schema definition outside the run function solves the problem.

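import typing

import apache_beam as beam
from apache_beam import coders


# Defined at module level, not inside run().
class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]

coders.registry.register_coder(RowSchema, coders.RowCoder)


def run(argv=None, save_main_session=True):
    ...
    with beam.Pipeline(options=pipeline_options) as p:
        ...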
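Moving the class to module level plausibly fixes it because its qualified name no longer contains run.<locals>, so it is a valid identifier that collections.namedtuple accepts. A standard-library sketch of the same check applied to a module-level class-syntax definition:

```python
import collections
import typing


class RowSchema(typing.NamedTuple):
    colA: str
    colB: typing.Optional[str]


# At module level, the qualified name is just the class name.
print(RowSchema.__qualname__)  # RowSchema

# Rebuilding by that name, as in the traceback's last frames, now succeeds.
Rebuilt = collections.namedtuple(RowSchema.__qualname__, RowSchema._fields)
print(Rebuilt('a2', None))  # RowSchema(colA='a2', colB=None)
```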
