I am trying to execute a PySpark statement that writes to BigTable within a Python for loop, which leads to the following error (job submitted with Dataproc). It seems no client is ever properly closed (as suggested here) — if so, is there any way to close it from PySpark?
Note that manually re-executing the script each time with a new Dataproc job works fine, so the job itself is correct.
Thanks for your support!
PySpark script
from pyspark import SparkContext
from pyspark.sql import SQLContext
import json

sc = SparkContext()
sqlc = SQLContext(sc)

def create_df(n_start, n_stop):
    # Data
    row_1 = ['a'] + ['{}'.format(i) for i in range(n_start, n_stop)]
    row_2 = ['b'] + ['{}'.format(i) for i in range(n_start, n_stop)]
    # Spark schema
    ls = [row_1, row_2]
    schema = ['col0'] + ['col{}'.format(i) for i in range(n_start, n_stop)]
    # Catalog
    first_col = {"col0": {"cf": "rowkey", "col": "key", "type": "string"}}
    other_cols = {"col{}".format(i): {"cf": "cf", "col": "col{}".format(i), "type": "string"}
                  for i in range(n_start, n_stop)}
    first_col.update(other_cols)
    columns = first_col
    d_catalogue = {}
    d_catalogue["table"] = {"namespace": "default", "name": "testtable"}
    d_catalogue["rowkey"] = "key"
    d_catalogue["columns"] = columns
    catalog = json.dumps(d_catalogue)
    # Dataframe
    df = sc.parallelize(ls, numSlices=1000).toDF(schema=schema)
    return df, catalog
N_step = 100
N_start = 1
N_stop = N_start + N_step
data_source_format = "org.apache.spark.sql.execution.datasources.hbase"

for i in range(0, 2):
    df, catalog = create_df(N_start, N_stop)
    df.write \
        .options(catalog=catalog, newTable="5") \
        .format(data_source_format) \
        .save()
    N_start += N_step
    N_stop += N_step
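For reference, the catalog passed to the shc connector is just a JSON string. A minimal, Spark-free sketch of what create_df builds (here for n_start=1, n_stop=3, using the same "testtable", "cf" and "key" names as the script):

```python
import json

# Rebuild the shc catalog exactly as create_df() does, on a tiny range.
n_start, n_stop = 1, 3
columns = {"col0": {"cf": "rowkey", "col": "key", "type": "string"}}
columns.update({
    "col{}".format(i): {"cf": "cf", "col": "col{}".format(i), "type": "string"}
    for i in range(n_start, n_stop)
})
catalog = json.dumps({
    "table": {"namespace": "default", "name": "testtable"},
    "rowkey": "key",
    "columns": columns,
})
print(catalog)
```

Each loop iteration serializes a fresh catalog like this one, with a new set of column names, and hands it to a new writer — which is where the unclosed gRPC channels pile up.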
Dataproc job
gcloud dataproc jobs submit pyspark <my_script>.py \
    --cluster $SPARK_CLUSTER \
    --jars <path_to_jar>/bigtable-dataproc-spark-shc-assembly-0.1.jar \
    --region=us-east1
Error
...
ERROR com.google.bigtable.repackaged.io.grpc.internal.ManagedChannelOrphanWrapper: *~*~*~ Channel ManagedChannelImpl{logId=41, target=bigtable.googleapis.com:443} was not shutdown properly!!! ~*~*~*
Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
...
If you are not using the latest version, try updating to it. This looks similar to this issue, which was fixed recently. I would imagine the error message still appears but the job now completes, which means the support team is still working on it; hopefully they will fix it in the next release.
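In the meantime, one thing worth trying on the PySpark side is stopping the SparkContext explicitly once the loop finishes (sc.stop()), so Spark tears down its JVM-side resources deliberately rather than at process exit. This is only a suggestion, not a confirmed fix for the gRPC warning. The try/finally shape, sketched here with a stand-in class since a real SparkContext needs a cluster:

```python
class FakeContext:
    """Stand-in for SparkContext, just to illustrate the shutdown pattern."""
    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True

sc = FakeContext()  # in the real script: sc = SparkContext()
try:
    for i in range(2):
        pass  # df.write ... .save() happens here on each iteration
finally:
    sc.stop()  # always release the context, even if a write raises

print(sc.stopped)  # → True
```

The finally block guarantees cleanup runs whether the writes succeed or one of them throws, which is the same guarantee you want around the BigTable writes in the loop.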