How to bulk load data into Apache Phoenix 5.1.2 using Apache Spark 3.2.1



I am trying to bulk load CSV files (30-300 GB each) into Apache Phoenix tables. I am trying to do this with the Apache Spark plugin (https://phoenix.apache.org/phoenix_spark.html). However, when I spark-submit my code:

import sys
from pyspark.sql import SparkSession


def main():
    spark = SparkSession.builder.appName('From CSV to Phoenix Loader').getOrCreate()
    csv_name = sys.argv[1]
    table_name = sys.argv[2]
    csv_file = spark.read \
        .option("header", True) \
        .option("delimiter", ",") \
        .csv(f"hdfs://open1:9000/csv_files/{csv_name}")
    csv_file.printSchema()
    csv_file.write \
        .format("phoenix") \
        .mode("overwrite") \
        .option("table", table_name) \
        .option("zkUrl", "open1,open2,open3,open4,open5,open6,open7,open8,open9,open10,open11,open12:2181") \
        .save()
    spark.stop()


if __name__ == "__main__":
    main()

I get this error:

Traceback (most recent call last):
  File "load_from_csv_to_table.py", line 30, in <module>
    main()
  File "load_from_csv_to_table.py", line 19, in main
    csv_file.write \
  File "/home/hadoopuser/.local/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 738, in save
    self._jwrite.save()
  File "/home/hadoopuser/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/home/hadoopuser/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/hadoopuser/.local/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o48.save.
: java.lang.ClassNotFoundException:
Failed to find data source: phoenix. Please find packages at
http://spark.apache.org/third-party-projects.html
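Spark resolves the short name passed to `format()` by loading every class listed in `META-INF/services/org.apache.spark.sql.sources.DataSourceRegister` files on the classpath and comparing each class's `shortName()`; if none matches, it also tries classes literally named `phoenix` and `phoenix.DefaultSource` before raising this error. The standard-library sketch below (the function name is hypothetical, not part of Spark or Phoenix) lists the classes each JAR registers, as one way to check whether any JAR passed via `--jars` provides a data source at all. It only reads the service files; it does not load the classes or check Spark/Scala compatibility.

```python
import sys
import zipfile

# Service file that java.util.ServiceLoader reads when Spark looks up
# data source short names such as "phoenix".
SERVICE_FILE = "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister"


def registered_data_sources(jar_path):
    """Return the DataSourceRegister class names declared in one JAR."""
    with zipfile.ZipFile(jar_path) as jar:
        if SERVICE_FILE not in jar.namelist():
            return []
        text = jar.read(SERVICE_FILE).decode("utf-8")
    names = []
    for line in text.splitlines():
        # One class name per line; '#' starts a comment in service files.
        entry = line.split("#", 1)[0].strip()
        if entry:
            names.append(entry)
    return names


if __name__ == "__main__":
    # Usage: python list_datasources.py /usr/local/phoenix/*.jar
    for path in sys.argv[1:]:
        print(path)
        for cls in registered_data_sources(path) or ["(no DataSourceRegister entries)"]:
            print("   ", cls)
```

A JAR that prints no entries can still be usable through its fully qualified class name in `format()`, since Spark falls back to loading that name directly.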

My spark-submit command:

spark-submit --master yarn --deploy-mode cluster --jars /usr/local/phoenix/phoenix-spark-5.0.0-HBase-2.0.jar,/usr/local/phoenix/phoenix-client-hbase-2.4-5.1.2.jar hdfs://open1:9000/apps/python/load_from_csv_to_table.py data.csv TABLE.TABLE

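The problem is that I don't know which JARs I should pass to spark-submit. When I look at https://mvnrepository.com/artifact/org.apache.phoenix/phoenix-spark, I don't see a JAR version that matches Apache Phoenix 5.1.2. The latest version there is 5.0.0 for HBase 2.0.0, from 2018. How can I bulk load data into Apache Phoenix 5.1.2 with PySpark 3.2.1? Which JARs do I need?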

I also set the following in spark-defaults.conf:

spark.executor.extraClassPath=/usr/local/phoenix/phoenix-client-hbase-2.4-5.1.2.jar:/usr/local/phoenix/phoenix-spark-5.0.0-HBase-2.0.jar
spark.driver.extraClassPath=/usr/local/phoenix/phoenix-client-hbase-2.4-5.1.2.jar:/usr/local/phoenix/phoenix-spark-5.0.0-HBase-2.0.jar

But I don't think these are the right JARs.
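Entries in `spark.driver.extraClassPath` and `spark.executor.extraClassPath` are prepended to the JVM classpath as-is; spark-submit does not upload them, so each path must already exist on every node that runs a driver or executor (with `--deploy-mode cluster`, the driver also runs on a YARN node). A minimal sketch, assuming a simplified `key value` / `key=value` file format and hypothetical helper names, that reports which of these entries are missing on the host where it runs:

```python
import os
import re
import sys

CLASSPATH_KEYS = ("spark.driver.extraClassPath", "spark.executor.extraClassPath")


def parse_spark_defaults(text):
    """Parse 'key value' / 'key=value' lines of spark-defaults.conf.

    Simplified: Java-properties escapes and line continuations are ignored.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # blank line or comment
        # The key ends at the first '=', ':' or whitespace character.
        parts = re.split(r"\s*[=:\s]\s*", line, maxsplit=1)
        props[parts[0]] = parts[1] if len(parts) > 1 else ""
    return props


def missing_classpath_entries(props, exists=os.path.exists):
    """Return (key, entry) pairs whose path does not exist on this host."""
    missing = []
    for key in CLASSPATH_KEYS:
        # ':' is the JVM classpath separator on Linux nodes.
        for entry in props.get(key, "").split(":"):
            if not entry:
                continue
            # A trailing '/*' wildcard means "all JARs in this directory".
            target = entry[:-1] if entry.endswith("/*") else entry
            if not exists(target):
                missing.append((key, entry))
    return missing


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python check_classpath.py $SPARK_HOME/conf/spark-defaults.conf
    with open(sys.argv[1]) as conf:
        problems = missing_classpath_entries(parse_spark_defaults(conf.read()))
    for key, entry in problems:
        print(f"{key}: {entry} not found on this host")
    if not problems:
        print("All extraClassPath entries exist on this host.")
```

Running it on each node only confirms the files are present; it says nothing about whether the JARs are compatible with Spark 3.2.1.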

I also tried adding it to the SparkSession:

spark = SparkSession.builder.appName('From CSV to Phoenix Loader').config("spark.driver.extraClassPath", "/usr/local/phoenix/phoenix-spark-5.0.0-HBase-2.0.jar").getOrCreate()
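Spark's configuration documentation notes that in client mode `spark.driver.extraClassPath` must not be set through `SparkConf` inside the application, because the driver JVM has already started by then; `--driver-class-path` or the defaults file is the documented alternative. In cluster mode the driver is likewise already running when `getOrCreate()` executes. The sketch below (hypothetical helper name; JAR paths copied from the question) only builds the `spark-submit` argument list with the classpath supplied at submit time. It does not settle which Phoenix connector JAR works with Spark 3.2.1.

```python
import shlex

# JAR paths copied from the question; whether they are the right
# connector versions for Spark 3.2.1 is exactly what is still open.
PHOENIX_JARS = [
    "/usr/local/phoenix/phoenix-client-hbase-2.4-5.1.2.jar",
    "/usr/local/phoenix/phoenix-spark-5.0.0-HBase-2.0.jar",
]


def build_submit_command(app, app_args, jars=PHOENIX_JARS):
    """Return a spark-submit argv that sets the classpath at launch time."""
    classpath = ":".join(jars)  # JVM classpath separator on Linux
    return [
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "--jars", ",".join(jars),  # --jars takes a comma-separated list
        "--driver-class-path", classpath,
        "--conf", f"spark.executor.extraClassPath={classpath}",
        app,
        *app_args,
    ]


if __name__ == "__main__":
    cmd = build_submit_command(
        "hdfs://open1:9000/apps/python/load_from_csv_to_table.py",
        ["data.csv", "TABLE.TABLE"],
    )
    print(shlex.join(cmd))  # copy-pasteable shell command (Python 3.8+)
```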
