我正在运行一个spark作业,我需要从HDFS表中读取,让我们说HadoopCluster-1。现在,我希望将聚合数据框放入另一个HadoopCluster-2中的表中。最好的方法是什么?
- 我正在考虑以下方法:在将数据写入目标表之前,使用addResource读取hdfs-site.xml和core-site.xml。然后将所有配置值复制到Map<String,String>然后在我的dataset.sparkSession.SparkContext.hadoopConfiguration()中设置这些配置值。
这是实现我的目标的好方法吗?
如果您想从cluster1读取hive表作为数据框,并在转换数据框后将其作为hive表写入cluster2,您可以尝试以下方法:
- 确保在两个集群上都运行hiveserver2。命令:
hive——service hiveserever2
hive——service metastore
-
确保hive正确配置了用户名/密码。你可以把用户名/密码都标记为空,但你会得到一个错误,你可以通过引用这个链接来解决这个问题。
-
现在从cluster1中读取hive table作为spark dataframe,并在转换后写入cluster2的hive table
// spark-scala code val sourceJdbcMap = Map( "url"->"jdbc:hive2://<source_host>:<port>", //default port is 10000 "driver"->"org.apache.hive.jdbc.HiveDriver", "user"->"<username>", "password"->"<password>", "dbtable"->"<source_table>") val targetJdbcMap = Map( "url"->"jdbc:hive2://<target_host>:<port>", //default port is 10000 "driver"->"org.apache.hive.jdbc.HiveDriver", "user"->"<username>", "password"->"<password>", "dbtable"->"<target_table>") val sourceDF = spark.read.format("jdbc").options(sourceJdbcMap).load() val transformedDF = //transformation goes here... transformedDF.write.options(targetJdbcMap).format("jdbc").save()
我能够从一个启用HA的Hadoop集群hdfs位置读取数据,并使用Spark按照以下步骤写入另一个启用HA的Hadoop集群hdfs位置:
1)检查两个服务器中的KDC是否属于相同或不同的领域。如果相同,则跳过此步骤,否则在两个KDC之间设置跨域身份验证。可以这样写:https://community.cloudera.com/t5/Community-Articles/Setup-cross-realm-trust-between-two-MIT-KDC/ta-p/247026
场景1:这是一个重复的读写操作
2)按照下面的步骤编辑源集群的hdfs-site.xml文件:https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.6.4/bk_administration/content/distcp_between_ha_clusters.html
3)在应用程序启动时,在spark conf中添加以下属性:spark.kerberos.access.hadoopFileSystems=hdfs://targetCluster-01.xyz.com:8020
基本上,该值应该是活动命名节点的InetSocketAddress。
4)在代码中,给出目标hdfs位置的绝对路径。例如:df.write.mode(SaveMode.Append).save("hdfs://targetCluster-01.xyz.com/usr/tmp/targetFolder")
注意:在步骤4中,您也可以提供逻辑路径,如hdfs://targetCluster/usr/tmp/targetFolder
因为我们已经在hdfs-site.xml中添加了目标namservice .
场景2:这是一个特殊的请求,您只需要执行一次读写操作
跳过上面提到的第2步。
按照步骤#3和步骤#4执行。
p :作业的用户应该能够访问这两个集群。