使用cassandra数据库查询作为Flink程序的源



我有一个Cassandra数据库,它必须在Flink程序中从类似套接字的steam接收数据,以便进行流处理。因此,我编写了一个简单的客户端程序,从Cassandra读取数据并将数据发送到套接字;此外,我还在服务器库中编写了Flink程序。事实上,我的客户端程序很简单,不使用任何Flink指令;它只是将字符串格式的Cassandra行发送到套接字,服务器必须接收该行。首先,我运行Flink程序来监听客户端,然后运行客户端程序。客户端从服务器接收到此流(因为服务器发送数据流数据,而客户端无法正确接收):

嗨,客户org.apache.flink.streaming.api.datastream.DataStreamSource@68c72235

之后,两个程序都保持运行,不会发送和接收任何数据,也不会出现错误。

Flink程序如下:公共类WordCount_in_cassandra{

private static int myport=9999;
private static String hostname="localhost";
//static ServerSocket variable
private static ServerSocket server;
private static int count_row=0;
public static void main(String[] args) throws Exception {
// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
// set up the execution environment
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
//create the socket server object
server = new ServerSocket(myport);
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
while (true){
System.out.println("Waiting for client request");
//creating socket and waiting for client connection
Socket socket = server.accept();
DataStream<String> stream = env.socketTextStream(hostname, 
myport);
stream.print();
//write object to Socket
oos.writeObject("Hi Client " + stream.toString());
oos.close();
socket.close();
// parse the data, group it, window it, and aggregate the 
counts
DataStream<Tuple2<String, Long>> counts = stream
.flatMap(new FlatMapFunction<String, Tuple2<String, 
Long>>() {
@Override
public void flatMap(String value, 
Collector<Tuple2<String, Long>> out) {
// normalize and split the line
String[] words = value.toLowerCase().split("\W+");
// emit the pairs
for (String word : words) {
if (!word.isEmpty()) {
out.collect(new Tuple2<String, Long>(word, 1L));
}
}
}
})
.keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
// emit result
if (params.has("output")) {
counts.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use -- 
output to specify output path.");
counts.print();
}
//terminate the server if client sends exit request
if (stream.equals("exit")){
System.out.println("row_count : "+count_row);
break;
}
// execute program
env.execute("Streaming WordCount");
}//while true
System.out.println("Shutting down Socket server!!");
server.close();
}//main
}

客户端程序如下:

public class client_code {
private static Cluster cluster = 
Cluster.builder().addContactPoint("127.0.0.1")
.withPort(9042).build();
private static Session session = cluster.connect("mar1");
public static void main(String[] args) throws UnknownHostException, 
IOException, ClassNotFoundException, InterruptedException {
String serverIP = "localhost";
int port=9999;
Socket socket = null;
ObjectOutputStream oos = null;
ObjectInputStream ois = null;
ResultSet result = session.execute("select * from tlbtest15");
for (Row row : result) {
//establish socket connection to server
socket = new Socket(serverIP, port);
//write to socket using ObjectOutputStream
oos = new ObjectOutputStream(socket.getOutputStream());
System.out.println("Sending request to Socket Server");
if (row==result) oos.writeObject("exit");
else oos.writeObject(""+row+"");
//read the server response message
ois = new ObjectInputStream(socket.getInputStream());
String message = (String) ois.readObject();
System.out.println("Message: " + message);
//close resources
ois.close();
oos.close();
Thread.sleep(100);
}
cluster.close();
}
}

你能告诉我如何解决我的问题吗?

如有任何帮助,我们将不胜感激。

构建Flink应用程序的方式存在一些问题。一些评论:

  • Flink DataStream API用于描述在调用env.execute()时发送到集群执行的数据流图。将其封装在while(true)循环中是没有意义的
  • socketTextStream建立客户端连接。您的服务器似乎没有执行任何有用的操作
  • stream.equals("exit")--流是一个数据流,而不是字符串。如果您想在流元素具有特定值时做一些特殊的事情,则需要通过使用一个进行一次事件处理的流操作来以不同的方式进行。至于关闭Flink作业,流式作业通常被设计为无限期运行,或者运行到有限输入源到达其末端,此时它们会自行关闭

你可以大大简化事情。我会重新开始,首先用这样的命令行替换您的客户端:

cqlsh -e "SELECT * from tlbtest15;" | nc -lk 9999

在这种情况下,nc(netcat)将充当服务器,允许Flink作为客户端。这将使事情变得更容易,因为env.socketTextTream就是这样使用的。

然后,您将能够使用普通的Flink应用程序处理结果。socketTextStream将生成一个流,其中包含作为文本行的查询结果,每行一行。

相关内容

  • 没有找到相关文章

最新更新