I'm using Spark to process my data, like this:
dataframe_mysql = spark.read.format('jdbc').options(
    url='jdbc:mysql://xxxxxxx',
    driver='com.mysql.cj.jdbc.Driver',
    dbtable='(select * from test_table where id > 100) t',
    user='xxxxxx',
    password='xxxxxx'
).load()
result = spark.sparkContext.parallelize(dataframe_mysql, 1)
But I get this error from Spark:
Traceback (most recent call last):
  File "/private/var/www/http/hawk-scripts/hawk_etl/scripts/spark_rds_to_parquet.py", line 36, in process
    result = spark.sparkContext.parallelize(dataframe_mysql, 1).map(function)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 574, in parallelize
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/context.py", line 611, in _serialize_to_jvm
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 211, in dump_stream
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 133, in dump_stream
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 143, in _write_with_length
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 427, in dumps
TypeError: cannot pickle '_thread.RLock' object
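The root cause of this error can be reproduced without Spark at all. A DataFrame wraps a live connection to the JVM (the py4j gateway), which holds thread locks, and `parallelize()` tries to pickle whatever it is given. A minimal sketch of the same failure, using only the standard library:

```python
import pickle
import threading

# Locks (like those held inside a Spark DataFrame's JVM gateway) are
# inherently tied to a running process and cannot be serialized.
lock = threading.RLock()
try:
    pickle.dumps(lock)
    failed = False
    message = ""
except TypeError as exc:
    failed = True
    message = str(exc)

print(failed, message)
```

This is why `parallelize` works on plain Python collections (lists, tuples) but not on a DataFrame: the former are picklable, the latter is not.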
Am I using it wrong? How should I use SparkContext.parallelize to process a DataFrame?
I figured it out: dataframe_mysql is already a DataFrame. If you want an RDD, use dataframe.rdd instead of parallelize.