Spring XD on YARN: cannot stream from a Kafka source to an HDFS sink
I have HDFS and the YARN ResourceManager up and running, and I can successfully start the admin server and containers in YARN.
However, I cannot stream from Kafka (source) to HDFS (sink).
I configured the custom modules provided for kafka (source) and hdfs (sink), but when I produce Kafka messages to a topic, nothing happens in the YARN cluster.
Setup details:
HDFS/YARN Apache version 2.6.0
Spring XD on YARN — spring-xd-1.2.0.RELEASE-yarn.zip
I just deployed an XD cluster with Ambari and then XD on YARN, and both work fine. For YARN I used spring-xd-1.2.1.RELEASE-yarn.zip and only the xd-shell deployed by Ambari. I simply took the settings from the cluster and used Ambari's PostgreSQL database, where I created the xdjob database and the springxd user.
My config/servers.yml before running bin/xd-yarn push looks like this:
xd:
  appmasterMemory: 512M
  adminServers: 1
  adminMemory: 512M
  adminJavaOpts: -XX:MaxPermSize=128m
  adminLocality: false
  containers: 3
  containerMemory: 512M
  containerJavaOpts: -XX:MaxPermSize=128m
  containerLocality: false
spring:
  yarn:
    applicationBaseDir: /xd/yarn/
---
xd:
  container:
    groups: yarn
---
spring:
  yarn:
    siteYarnAppClasspath: "/etc/hadoop/conf,/usr/hdp/current/hadoop-client/*,/usr/hdp/current/hadoop-client/lib/*,/usr/hdp/current/hadoop-hdfs-client/*,/usr/hdp/current/hadoop-hdfs-client/lib/*,/usr/hdp/current/hadoop-yarn-client/*,/usr/hdp/current/hadoop-yarn-client/lib/*"
    siteMapreduceAppClasspath: "/usr/hdp/current/hadoop-mapreduce-client/*,/usr/hdp/current/hadoop-mapreduce-client/lib/*"
    config:
      mapreduce.application.framework.path: '/hdp/apps/2.2.6.0-2800/mapreduce/mapreduce.tar.gz#mr-framework'
      mapreduce.application.classpath: '$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/hdp/2.2.6.0-2800/hadoop/lib/hadoop-lzo-0.6.0.2.2.4.2-2.jar:/etc/hadoop/conf/secure'
---
spring:
  hadoop:
    fsUri: hdfs://ambari-2.localdomain:8020
    resourceManagerHost: ambari-3.localdomain
    resourceManagerPort: 8050
    resourceManagerSchedulerAddress: ambari-3.localdomain:8030
    jobHistoryAddress: ambari-3.localdomain:10020
---
zk:
  namespace: xd
  client:
    connect: ambari-3.localdomain:2181
    sessionTimeout: 60000
    connectionTimeout: 30000
    initialRetryWait: 1000
    retryMaxAttempts: 3
---
xd:
  customModule:
    home: ${spring.hadoop.fsUri}/xd/yarn/custom-modules
---
xd:
  transport: kafka
  messagebus:
    kafka:
      brokers: ambari-2.localdomain:6667
      zkAddress: ambari-3.localdomain:2181
---
spring:
  datasource:
    url: jdbc:postgresql://ambari-1.localdomain/xdjob
    username: springxd
    password: springxd
    driverClassName: org.postgresql.Driver
    validationQuery: select 1
---
server:
  port: 0
---
spring:
  profiles: admin
management:
  port: 0
You could use the time source, but I used the http source. To find the right address, use runtime containers and runtime modules to see where the http source is running. In the xd-yarn shell you can use admininfo to query ZooKeeper and see where the XD admin is running. This is needed so that you can connect the xd-shell to the admin running on YARN.
xd:>admin config server http://ambari-2.localdomain:50254
Successfully targeted http://ambari-2.localdomain:50254
Let's create the Kafka streams using the kafka sink and source.
xd:>stream create httpToKafkaStream --definition "http | kafka --topic=mytopic --brokerList=ambari-2.localdomain:6667" --deploy
xd:>stream create kafkaToHdfsStream --definition "kafka --zkconnect=ambari-3.localdomain:2181 --topic=mytopic --outputType=text/plain | hdfs" --deploy
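If nothing shows up on the sink side, it helps to verify, independently of XD, that messages are actually reaching the topic. A sketch using the stock Kafka console consumer — the script path is an assumption based on a typical HDP install, and the ZooKeeper address comes from the config above:

```shell
# Tail mytopic from the beginning via the same ZooKeeper ensemble the
# kafka source module uses; messages posted through the http stream
# should appear here. (Path assumes an HDP Kafka install; adjust for
# your distribution.)
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh \
  --zookeeper ambari-3.localdomain:2181 \
  --topic mytopic \
  --from-beginning
```

If the consumer prints the posted messages but the hdfs sink stays empty, the problem is on the XD container side rather than in Kafka itself.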
xd:>http post --target http://ambari-5.localdomain:9000 --data "message1"
xd:>http post --target http://ambari-5.localdomain:9000 --data "message2"
xd:>http post --target http://ambari-5.localdomain:9000 --data "message3"
xd:>http post --target http://ambari-5.localdomain:9000 --data "message4"
xd:>http post --target http://ambari-5.localdomain:9000 --data "message5"
xd:>hadoop fs cat /xd/kafkaToHdfsStream/kafkaToHdfsStream-0.txt
message1
message2
message3
message4
message5
NOTE: you need to create mytopic up front; currently the kafka source will fail to start if the topic does not exist.
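The topic can be created ahead of time with the CLI that ships with the Kafka broker — a sketch where the script path, partition count, and replication factor are assumptions for a single-broker HDP setup:

```shell
# Create mytopic before deploying the streams so the kafka source
# module can start. (Path assumes an HDP Kafka install; the partition
# and replication counts are illustrative.)
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create \
  --zookeeper ambari-3.localdomain:2181 \
  --topic mytopic \
  --partitions 1 \
  --replication-factor 1
```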