我正在使用Apache Flink在scala中运行一个简单的脚本。 我从Apache Kafka制作人那里读到了数据。这是我的代码。
import java.util.Properties
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{read, write}
object App {
def main(args : Array[String]) {
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data: String, stt: Int)
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties)
val stream1 = env
.addSource(consumer1)
.flatMap(raw => JsonMethods.parse(raw).toOption)
env.execute()
}
}
我在flatMap上收到"缺少参数类型"错误(但与尝试使用其他函数(如map或过滤器(时遇到的错误相同(。 我不知道如何解决这个问题。 有什么帮助吗?
如果
你应该使用StreamExecutionEnvironment
的 scala API 版本。
将导入更改为:
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods