Pyspark读取jdbc给出错误.如何修复?



我正在pyspark中使用JDBC连接RDS MySQL。我已经尝试了几乎所有我在Stackoverflow上发现的调试,但仍然无法使其工作。

spark = SparkSession.builder.config("spark.jars", mysql_jar) 
.master("local[*]").appName("PySpark_MySQL_test").getOrCreate()
df= spark.read.format("jdbc").option("url", "jdbc:mysql://hostname.amazonaws.com:1150/dbname?user=user_name&password=password") 
.option("driver", "com.mysql.cj.jdbc.Driver").option("dbtable", "table_name").load()

我试过在python的pymysql库中使用相同的连接细节,它连接并带回结果。
但是这里我得到下面的错误,我无法解决它。



raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o38.load.
: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at com.mysql.cj.jdbc.exceptions.SQLError.createCommunicationsException(SQLError.java:174)
at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:64)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:827)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:447)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:237)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:199)
at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.create(ConnectionProvider.scala:68)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:62)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:355)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:325)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$3(DataFrameReader.scala:307)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:307)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:225)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure

我也遇到过同样的问题。现在它成功了。其核心原因是spark使用主节点连接mysql,使用工作节点执行任务。所以你可以连接mysql,而引发通信错误。基于这个理论,你可以在mysql上打开安全规则,让所有的spark节点都可以连接到mysql

来这里寻求Docker答案的人可以试试下面的解决方案。使用下面的配置

source_df = spark.read.format('jdbc').options(
url='jdbc:mysql://host.docker.internal:3306/superset?useSSL=false&allowPublicKeyRetrieval=true',
driver='com.mysql.cj.jdbc.Driver',
dbtable='table',
user='root',
password='root').load()

我已经尝试了主机与localhost,127.0.0.1,甚至IPAddress从docker检查,但没有工作,然后将其更改为host.docker.internal,它工作。

相关内容

最新更新