How to read messages from Kafka using the Spark Scala API



I am not receiving any messages in msgItr, yet from the command prompt with the Kafka console commands I can see messages in the partition. Please let me know what is going on here and what I should do to receive the messages.

I tried printing the records, but nothing was printed. That may be because it is an RDD, so the println runs on the executor nodes.
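A minimal sketch of one way to check on the driver side whether any records arrive at all (it assumes the kafkaStream defined below; take() pulls a few records back to the driver, so the println output shows up in the driver log instead of on the executors):

kafkaStream.foreachRDD { kafkaRdd =>
  // take() returns records to the driver, unlike a println inside foreachPartition,
  // which would only appear in the executor logs
  kafkaRdd.take(5).foreach(record => println(s"key=${record.key}, value=${record.value}"))
}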

val ssc = new StreamingContext(conf, Seconds(props.getProperty("spark.streaming.batchDuration").toInt))
val topics = Set(props.getProperty("kafkaConf.topic"))
// TODO: Externalize StorageLevel to props file
val storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
//"zookeeper.connect" -> "fepp-cdhmn-d2.fepoc.com"
val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "zookeeper.connect" -> props.getProperty("kafkaConf.zookeeper.connect"),
  "bootstrap.servers" -> props.getProperty("kafkaConf.bootstrap.servers"),
  "group.id" -> props.getProperty("kafkaConf.group.id"),
  "zookeeper.connection.timeout.ms" -> props.getProperty("kafkaConf.zookeeper.connection.timeout.ms"),
  "security.protocol" -> props.getProperty("kafkaConf.security.protocol"),
  "ssl.protocol" -> props.getProperty("kafkaConf.ssl.protocol"),
  "ssl.keymanager.algorithm" -> props.getProperty("kafkaConf.ssl.keymanager.algorithm"),
  "ssl.enabled.protocols" -> props.getProperty("kafkaConf.ssl.enabled.protocols"),
  "ssl.truststore.type" -> props.getProperty("kafkaConf.ssl.truststore.type"),
  "ssl.keystore.type" -> props.getProperty("kafkaConf.ssl.keystore.type"),
  "ssl.truststore.location" -> props.getProperty("kafkaConf.ssl.truststore.location"),
  "ssl.truststore.password" -> props.getProperty("kafkaConf.ssl.truststore.password"),
  "ssl.keystore.location" -> props.getProperty("kafkaConf.ssl.keystore.location"),
  "ssl.keystore.password" -> props.getProperty("kafkaConf.ssl.keystore.password"),
  "ssl.key.password" -> props.getProperty("kafkaConf.ssl.key.password"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> props.getProperty("kafkaConf.auto.offset.reset"),
  "enable.auto.commit" -> (props.getProperty("kafkaConf.enable.auto.commit").toBoolean: java.lang.Boolean),
  "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
  "value.serializer" -> "org.apache.kafka.common.serialization.StringSerializer"
  //"heartbeat.interval.ms" -> props.getProperty("kafkaConf.heartbeat.interval.ms"),
  //"session.timeout.ms" -> props.getProperty("kafkaConf.session.timeout.ms")
)
// Must use the direct api as the old api does not support SSL
log.debug("Creating direct kafka stream")
val kafkaStream = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent,
  Subscribe[String, String](topics, kafkaParams))

val res = kafkaStream.foreachRDD((kafkaRdd: RDD[ConsumerRecord[String, String]]) => {
  val numPartitions = kafkaRdd.getNumPartitions
  log.info(s"Processing RDD with '$numPartitions' partitions.")
  // Only one partition for the kafka topic is supported at this time
  if (numPartitions != 1) {
    throw new RuntimeException("Kafka topic must have 1 partition")
  }
  val offsetRanges = kafkaRdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaRdd.foreachPartition((msgItr: Iterator[ConsumerRecord[String, String]]) => {
    val log = LogManager.getRootLogger()

    msgItr.foreach((kafkaMsg: ConsumerRecord[String, String]) => {
      // The HBase connection fails here because of authentication, with the error below:

2018-09-19 15:28:01 INFO  ZooKeeper:100 - Client environment:user.home=/home/service_account
2018-09-19 15:28:01 INFO  ZooKeeper:100 - Client environment:user.dir=/data/09/yarn/nm/usercache/service_account/appcache/application_1536891989660_9297/container_e208_1536891989660_9297_01_000002
2018-09-19 15:28:01 INFO  ZooKeeper:438 - Initiating client connection, connectString=depp-cdhmn-d1.domnnremvd.com:2181,depp-cdhmn-d2.domnnremvd.com:2181,depp-cdhmn-d3.domnnremvd.com:2181 sessionTimeout=90000 watcher=hconnection-0x16648f570x0, quorum=depp-cdhmn-d1.domnnremvd.com:2181,depp-cdhmn-d2.domnnremvd.com:2181,depp-cdhmn-d3.domnnremvd.com:2181, baseZNode=/hbase
2018-09-19 15:28:01 INFO  ClientCnxn:975 - Opening socket connection to server depp-cdhmn-d3.domnnremvd.com/999.99.999.777:2181. Will not attempt to authenticate using SASL (unknown error)
2018-09-19 15:28:01 INFO  ClientCnxn:852 - Socket connection established, initiating session, client: /999.99.999.999:33314, server: depp-cdhmn-d3.domnnremvd.com/999.99.999.777:2181
2018-09-19 15:28:01 INFO  ClientCnxn:1235 - Session establishment complete on server depp-cdhmn-d3.domnnremvd.com/999.99.999.777:2181, sessionid = 0x365cb965ff33958, negotiated timeout = 60000
false
false
2018-09-19 15:28:02 WARN  UserGroupInformation:1923 - PriviledgedActionException as:service_account (auth:SIMPLE) cause:javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2018-09-19 15:28:02 WARN  RpcClientImpl:675 - Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
2018-09-19 15:28:02 ERROR RpcClientImpl:685 - SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211)
at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:181)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:618)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$700(RpcClientImpl.java:163)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:744)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:741)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:741)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:907)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:874)
at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1243)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:227)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:336)
at org.apache.hadoop.hbase.protobuf.generated.MasterProtos$MasterService$BlockingStub.isMasterRunning(MasterProtos.java:58383)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$MasterServiceStubMaker.isMasterRunning(ConnectionManager.java:1712)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$StubMaker.makeStubNoRetries(ConnectionManager.java:1650)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$StubMaker.makeStub(ConnectionManager.java:1672)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation$MasterServiceStubMaker.makeStub(ConnectionManager.java:1701)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.getKeepAliveMasterService(ConnectionManager.java:1858)
at org.apache.hadoop.hbase.client.MasterCallable.prepare(MasterCallable.java:38)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:134)
at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:4313)
at org.apache.hadoop.hbase.client.HBaseAdmin.executeCallable(HBaseAdmin.java:4305)
at org.apache.hadoop.hbase.client.HBaseAdmin.listTableNames(HBaseAdmin.java:533)
at org.apache.hadoop.hbase.client.HBaseAdmin.listTableNames(HBaseAdmin.java:517)
at com.company.etl.HbaseConnect.mainMethod(HbaseConnect.scala:39)
at com.company.etl.App$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(App.scala:205)
at com.company.etl.App$$anonfun$1$$anonfun$apply$2$$anonfun$apply$3.apply(App.scala:178)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.foreach(KafkaRDD.scala:189)
at com.company.etl.App$$anonfun$1$$anonfun$apply$2.apply(App.scala:178)
at com.company.etl.App$$anonfun$1$$anonfun$apply$2.apply(App.scala:161)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122)
at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224)
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
... 43 more

This is because of Kerberos authentication.

Set the system properties:

System.setProperty("java.security.auth.login.config","/your/conf/directory/kafkajaas.conf");
System.setProperty("sun.security.jgss.debug","true");
System.setProperty("javax.security.auth.useSubjectCredsOnly","false");
System.setProperty("java.security.krb5.conf", "/your/krb5/conf/directory/krb5.conf");

You can read data from Cloudera Kafka (as a consumer):

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092")
  .option("subscribe", "test")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .load()

You can write data to a Cloudera Kafka topic (as a producer):

val query = blacklistControl.select(to_json(struct("Column1", "Column2")).alias("value"))
  .writeStream
  .format("kafka")
  .option("checkpointLocation", "/your/empty/directory")
  .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092")
  .option("kafka.security.protocol", "SASL_PLAINTEXT")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("topic", "topic_xdr")
  .start()

I ran into exactly the same problem. What is happening is that the executor nodes are trying to write to HBase and have no credentials. What you need to do is ship the keytab file to the executors and explicitly authenticate against the KDC inside the executor block:

UserGroupInformation.loginUserFromKeytab("hdfs-user@MYCORP.NET", "/home/hdfs-user/hdfs-user.keytab");
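A minimal sketch of how that login could sit inside the per-partition block, assuming the keytab has been shipped to the executors (for example with spark-submit --files); the principal, keytab name, and HBase write are placeholders:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.security.UserGroupInformation

kafkaRdd.foreachPartition { msgItr =>
  val hbaseConf = HBaseConfiguration.create()
  // Authenticate on the executor itself; the driver's Kerberos ticket does not travel with the task.
  UserGroupInformation.setConfiguration(hbaseConf)
  UserGroupInformation.loginUserFromKeytab("service_account@YOUR.REALM", "service_account.keytab")

  val connection = ConnectionFactory.createConnection(hbaseConf)
  try {
    msgItr.foreach { record =>
      // ... put the record into HBase here ...
    }
  } finally {
    connection.close()
  }
}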

From the stack trace, it looks like Kafka is authenticated via SASL. The supported SASL mechanisms include:

  1. GSSAPI (Kerberos)
  2. OAUTHBEARER
  3. PLAIN

From the stack trace, Kafka is configured with GSSAPI, so you need to authenticate accordingly. You are authenticating for SSL, not SASL. See this link for the steps to authenticate.
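A minimal sketch of the consumer-side settings that direction implies (standard Kafka client property names; the values are placeholders for your cluster, and the JAAS/keytab setup above still has to be in place):

val saslKafkaParams = kafkaParams ++ Map[String, Object](
  "security.protocol" -> "SASL_SSL",            // SASL over TLS instead of plain SSL
  "sasl.mechanism" -> "GSSAPI",                 // Kerberos
  "sasl.kerberos.service.name" -> "kafka"
)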
