Ideally, the following snippet would just work:
import kudu
from kudu.client import Partitioning
df = …  # some Spark dataframe
# Connect to Kudu master server
client = kudu.connect(host='…', port=7051)
# infer schema from spark dataframe
schema = df.schema
# Define partitioning schema
partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)
# Create new table
client.create_table('dev.some_example', schema, partitioning)
But client.create_table expects a kudu.schema.Schema, not the StructType that comes from a DataFrame. In Scala, however, you can do this (from https://kudu.apache.org/docs/developing.html):
kuduContext.createTable(
  "dev.some_example", df.schema, Seq("key"),
  new CreateTableOptions()
    .setNumReplicas(1)
    .addHashPartitions(List("key").asJava, 3))
Now I am wondering whether I can do the same with PySpark, without manually defining every column with the Kudu schema builder?
So I wrote myself a helper function that converts a PySpark DataFrame schema into a kudu.schema.Schema. I hope this helps someone; feedback is welcome!
As a side note: you may need to add to or edit the data type mapping for your use case.
import kudu
from kudu.client import Partitioning

def convert_to_kudu_schema(df_schema, primary_keys):
    builder = kudu.schema.SchemaBuilder()
    # Maps the string form of a Spark data type to the Kudu column type
    data_type_map = {
        "StringType": kudu.string,
        "LongType": kudu.int64,
        "IntegerType": kudu.int32,
        "FloatType": kudu.float,
        "DoubleType": kudu.double,
        "BooleanType": kudu.bool,
        "TimestampType": kudu.unixtime_micros,
    }
    for sf in df_schema:
        # Kudu requires primary key columns to be non-nullable
        nullable = sf.nullable and sf.name not in primary_keys
        builder.add_column(
            name=sf.name,
            nullable=nullable,
            type_=data_type_map[str(sf.dataType)],
        )
    builder.set_primary_keys(primary_keys)
    return builder.build()
You can call it like this:
kudu_schema = convert_to_kudu_schema(df.schema, primary_keys=["key1", "key2"])
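To sanity-check the conversion logic without a Kudu or Spark installation, here is a pure-Python sketch of the same dispatch. The Kudu type constants are stood in for by strings, and the `(name, type_name, nullable)` field tuples are hypothetical stand-ins for Spark's StructField objects:

```python
# Pure-Python sketch of the helper's core logic; strings stand in for the
# kudu type constants so this runs without kudu-python installed.
DATA_TYPE_MAP = {
    "StringType": "kudu.string",
    "LongType": "kudu.int64",
    "IntegerType": "kudu.int32",
    "FloatType": "kudu.float",
    "DoubleType": "kudu.double",
    "BooleanType": "kudu.bool",
    "TimestampType": "kudu.unixtime_micros",
}

def plan_columns(fields, primary_keys):
    """fields: iterable of (name, spark_type_name, nullable) tuples.
    Returns the (name, kudu_type, nullable) triples the builder would add."""
    plan = []
    for name, type_name, nullable in fields:
        # Kudu rejects nullable primary key columns, so force them NOT NULL
        if name in primary_keys:
            nullable = False
        plan.append((name, DATA_TYPE_MAP[type_name], nullable))
    return plan
```

The only non-obvious step is the nullability override: a Spark schema often marks every column nullable, but Kudu will refuse to build a schema whose primary key columns are nullable.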
I am still open to more elegant solutions.
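One rough edge of the exact-match lookup above: parameterized Spark types render with their parameters (e.g. `str(DecimalType(10,2))` gives `"DecimalType(10,2)"`), so they never hit the map and raise a bare KeyError. A possible refinement, sketched here with strings standing in for the kudu constants, is to match on the base type name and fail with a readable error (note that an actual decimal column would also need precision and scale passed to the builder, which this name lookup does not cover):

```python
# Sketch: resolve parameterized Spark type names by their base name, and
# turn unmapped types into a readable error instead of a bare KeyError.
DATA_TYPE_MAP = {
    "StringType": "kudu.string",
    "LongType": "kudu.int64",
    "DecimalType": "kudu.decimal",  # matched by base name below
}

def lookup_kudu_type(spark_type_name):
    # "DecimalType(10,2)" -> "DecimalType"; plain names pass through unchanged
    base = spark_type_name.split("(", 1)[0]
    try:
        return DATA_TYPE_MAP[base]
    except KeyError:
        raise ValueError("no Kudu mapping for Spark type %r" % spark_type_name)
```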