假设我用apache spark这样写HDFS:
var df = spark.readStream
.format("kafka")
//.option("kafka.bootstrap.servers", "kafka1:19092")
.option("kafka.bootstrap.servers", "localhost:29092")
.option("subscribe", "my_event")
.option("includeHeaders", "true")
.option("startingOffsets", "earliest")
.load()
df = df.selectExpr("CAST(topic AS STRING)", "CAST(partition AS STRING)", "CAST(offset AS STRING)", "CAST(value AS STRING)")
val emp_schema = new StructType()
.add("id", StringType, true)
.add("timestamp", TimestampType, true)
df = df.select(
functions.col("topic"),
functions.col("partition"),
functions.col("offset"),
functions.from_json(functions.col("value"), emp_schema).alias("data"))
df = df.select("topic", "partition", "offset", "data.*")
val query = df.writeStream
.format("csv")
.option("path", "hdfs://172.30.0.5:8020/test")
.option("checkpointLocation", "checkpoint")
.start()
query.awaitTermination()
这里hdfs://172.30.0.5:8020
是命名节点。似乎这火花程序写入数据成功nameode。
如何从hive中查询这些数据?我是否必须将数据写入hive可以看到的特殊文件夹中?我必须为此文件夹定义数据库吗?这是怎么做到的呢?那么test
在文件系统中的位置是什么?
test then在文件系统中的位置是什么?
在/test
注意:如果您在core-site.xml中正确配置了fs.defaultFS
,那么您不需要指定完整的namenode地址。
我是否必须将数据写入hive可以看到的特殊文件夹?
你可以,这将是最简单的,但文档涵盖了"managed"(一个专用的HDFS位置)和"外部";(任何其他目录,有其他限制)Hive表
https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html
如何从hive中查询这些数据?
见上面的链接。
就其价值而言,汇合的卡夫卡的连接器,可以写入数据HDFS和创建蜂巢表