无法使用Flink Table API打印CSV文件



我正在尝试在Netbeans中读取一个包含35个字段的CSV文件,并将其内容打印到控制台。然而,我只能打印出表的模式(schema),因为我所用的这个版本的Flink在配合CsvReader使用时没有提供打印选项。

请查看代码,帮我找出需要修改的地方。我原本打算使用内置的CsvReader API,但发现它不支持超过22个字段,因此不得不改用Table API。我也尝试过Flink 1.5.1版本中CsvTableSource的构建器写法,但没能写对:.field("%CPU", Types.FLOAT())一直报"找不到符号"的错误,无法识别FLOAT类型。我的最终目标是读取CSV文件并将其发送到Kafka主题,但在此之前,我想先确认文件是否被正确读取,目前还没有成功。

import java.util.Arrays;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.Slide;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;

/**
 * Reads a 35-column CSV file with the Flink Table API and prints every row to stdout.
 *
 * <p>A {@link Table} is only a logical plan: printing it, or calling {@code printSchema()}, shows
 * the schema but never the data. To see rows, the table is converted to a {@link DataStream}, a
 * {@code print()} sink is attached, and the job is started with {@code env.execute()}.
 */
public class CsvReader {

    /** Absolute path of the CSV file to read. */
    private static final String CSV_PATH =
            "/home/merlin/Experiments/input_container/container_data1.csv";

    /** Column names in file order; every column is declared as FLOAT (see {@code main}). */
    private static final String[] FIELD_NAMES = {
        "%CPU", "MEM", "VSZ", "RSS", "timestamp",
        "OOM_Score", "io_read_count", "io_write_count", "io_read_bytes", "io_write_bytes",
        "io_read_chars", "io_write_chars", "num_fds", "num_ctx_switches_voluntary",
        "num_ctx_switches_involuntary",
        "mem_rss", "mem_vms", "mem_shared", "mem_text", "mem_lib", "mem_data", "mem_dirty",
        "mem_uss", "mem_pss",
        "mem_swap", "num_threads", "cpu_time_user", "cpu_time_system", "cpu_time_children_user",
        "cpu_time_children_system", "container_nr_sleeping", "container_nr_running",
        "container_nr_stopped", "container_nr_uninterruptible", "container_nr_iowait"
    };

    /**
     * Registers the CSV file as table {@code container}, prints its schema, then prints its rows.
     *
     * @param args unused
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // For a streaming environment the factory returns the Java StreamTableEnvironment; only
        // that type (not the generic TableEnvironment) offers toAppendStream().
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        // One FLOAT per column, sized from FIELD_NAMES so names and types cannot get out of sync.
        TypeInformation<?>[] fieldTypes = new TypeInformation<?>[FIELD_NAMES.length];
        Arrays.fill(fieldTypes, Types.FLOAT());

        // NOTE(review): this constructor uses Flink's default delimiters and does not skip a header
        // line. If the file starts with a header row, use CsvTableSource.builder() with
        // ignoreFirstLine() instead, otherwise parsing the header as FLOAT will fail.
        CsvTableSource csvTableSource = new CsvTableSource(CSV_PATH, FIELD_NAMES, fieldTypes);
        tEnv.registerTableSource("container", csvTableSource);

        Table result = tEnv.scan("container");
        result.printSchema();

        // Convert the logical table into a stream, attach a console sink, and run the job.
        DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
        stream.print();
        env.execute();
    }
}

这是输出

root
|-- %CPU: Float
|-- MEM: Float
|-- VSZ: Float
|-- RSS: Float
|-- timestamp: Float
|-- OOM_Score: Float
|-- io_read_count: Float
|-- io_write_count: Float
|-- io_read_bytes: Float
|-- io_write_bytes: Float
|-- io_read_chars: Float
|-- io_write_chars: Float
|-- num_fds: Float
|-- num_ctx_switches_voluntary: Float
|-- num_ctx_switches_involuntary: Float
|-- mem_rss: Float
|-- mem_vms: Float
|-- mem_shared: Float
|-- mem_text: Float
|-- mem_lib: Float
|-- mem_data: Float
|-- mem_dirty: Float
|-- mem_uss: Float
|-- mem_pss: Float
|-- mem_swap: Float
|-- num_threads: Float
|-- cpu_time_user: Float
|-- cpu_time_system: Float
|-- cpu_time_children_user: Float
|-- cpu_time_children_system: Float
|-- container_nr_sleeping: Float
|-- container_nr_running: Float
|-- container_nr_stopped: Float
|-- container_nr_uninterruptible: Float
|-- container_nr_iowait: Float

这是另一个版本的代码,同样不起作用:

package wikiedits;
import static com.sun.xml.internal.fastinfoset.alphabet.BuiltInRestrictedAlphabets.table;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableEnvironment;
public class Csv {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
//TableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("home/merlin/Experiments/input_container/container_data1.csv")
.field("%CPU", Types.FLOAT)
.field("MEM", Types.FLOAT)
.field("VSZ", Types.FLOAT)
.field("RSS", Types.FLOAT)
.field("timestamp", Types.FLOAT)
.field("OOM_Score", Types.FLOAT)
.field("io_read_count", Types.FLOAT)
.field("io_write_count", Types.FLOAT)
.field("io_read_bytes", Types.FLOAT)
.field("io_write_bytes", Types.FLOAT)
.field("io_read_chars", Types.FLOAT)
.field("io_write_chars", Types.FLOAT)
.field("num_fds", Types.FLOAT)
.field("num_ctx_switches_voluntary", Types.FLOAT)
.field("num_ctx_switches_involuntary", Types.FLOAT)
.field("mem_rss", Types.FLOAT)
.field("mem_vms", Types.FLOAT)
.field("mem_shared", Types.FLOAT)
.field("mem_text", Types.FLOAT)
.field("mem_lib", Types.FLOAT)
.field("mem_data", Types.FLOAT)
.field("mem_dirty", Types.FLOAT)
.field("mem_uss", Types.FLOAT)
.field("mem_pss", Types.FLOAT)
.field("mem_swap", Types.FLOAT)
.field("num_threads", Types.FLOAT)
.field("cpu_time_user", Types.FLOAT)
.field("cpu_time_system", Types.FLOAT)
.field("cpu_time_children_user", Types.FLOAT)
.field("cpu_time_children_system", Types.FLOAT)
.field("container_nr_sleeping", Types.FLOAT)
.field("container_nr_running", Types.FLOAT)
.field("container_nr_stopped", Types.FLOAT)
.field("container_nr_uninterruptible", Types.FLOAT)
.field("container_nr_iowait", Types.FLOAT)
.fieldDelimiter(",")
.lineDelimiter("n")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%")
.build();
// name your table source
tEnv.registerTableSource("container", csvTableSource);
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
// define the sink as common print on console here
stream.print();
// env.execute();
}
}

新编辑:如果我需要把数据发送到Kafka主题,然后再传递给一个函数调用,该怎么做?下面是我的尝试:

DataStreamSink<Row> stream = addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties));
//DataStreamSink<Row> stream = tEnv.toAppendStream(table, Row.class).print();
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
public String map(String value) throws Exception {
SendToRestAPI sendrest= new SendToRestAPI(value);
String String1= sendrest.converttoJson();  
return "Stream Value: " + String1;
}
})
.addSink(new FlinkKafkaProducer09<>( "my-second-topic", new SimpleStringSchema(), properties)); /*.print();*/
env.execute();
}
}

stream.map行抛出错误:

Cannot find symbol:method Map

您必须先将Table转换为DataStream才能打印。最简单的方法是将其转换为DataStream<Row>,如下所示(注意:tEnv必须声明为StreamTableEnvironment而不是TableEnvironment,否则无法调用toAppendStream):

DataStream<Row> stream = tEnv.toAppendStream(result, Row.class);
// print the stream & execute the program
stream.print();
env.execute();

有关更多详细信息,请参阅文档。

此代码:

package example.flink;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.sources.CsvTableSource;
import org.apache.flink.types.Row;
public class TestFlink {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment (env);
CsvTableSource csvTableSource = CsvTableSource
.builder()
.path("container_data.csv")
.field("%CPU", Types.FLOAT)
.field("MEM", Types.FLOAT)
.field("VSZ", Types.FLOAT)
.field("RSS", Types.FLOAT)
.field("timestamp", Types.FLOAT)
.field("OOM_Score", Types.FLOAT)
.field("io_read_count", Types.FLOAT)
.field("io_write_count", Types.FLOAT)
.field("io_read_bytes", Types.FLOAT)
.field("io_write_bytes", Types.FLOAT)
.field("io_read_chars", Types.FLOAT)
.field("io_write_chars", Types.FLOAT)
.field("num_fds", Types.FLOAT)
.field("num_ctx_switches_voluntary", Types.FLOAT)
.field("num_ctx_switches_involuntary", Types.FLOAT)
.field("mem_rss", Types.FLOAT)
.field("mem_vms", Types.FLOAT)
.field("mem_shared", Types.FLOAT)
.field("mem_text", Types.FLOAT)
.field("mem_lib", Types.FLOAT)
.field("mem_data", Types.FLOAT)
.field("mem_dirty", Types.FLOAT)
.field("mem_uss", Types.FLOAT)
.field("mem_pss", Types.FLOAT)
.field("mem_swap", Types.FLOAT)
.field("num_threads", Types.FLOAT)
.field("cpu_time_user", Types.FLOAT)
.field("cpu_time_system", Types.FLOAT)
.field("cpu_time_children_user", Types.FLOAT)
.field("cpu_time_children_system", Types.FLOAT)
.field("container_nr_sleeping", Types.FLOAT)
.field("container_nr_running", Types.FLOAT)
.field("container_nr_stopped", Types.FLOAT)
.field("container_nr_uninterruptible", Types.FLOAT)
.field("container_nr_iowait", Types.FLOAT)
.fieldDelimiter(",")
.lineDelimiter("n")
.ignoreFirstLine()
.ignoreParseErrors()
.commentPrefix("%")
.build();
// name your table source
tEnv.registerTableSource("container", csvTableSource);
Table table = tEnv.scan("container");
DataStream<Row> stream = tEnv.toAppendStream(table, Row.class);
// define the sink as common print on console here
stream.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
}

使用以下依赖库(其中有些可能是多余的):

compile group: 'org.slf4j', name: 'slf4j-simple', version: '1.7.25'
compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-table_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-core', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-java', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-streaming-scala_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.5.1'
compile group: 'org.apache.flink', name: 'flink-runtime_2.11', version: '1.5.1'

它至少可以运行。我不确定输出是否符合您的需要,但它与您最新编辑中的代码几乎完全相同,区别在于它能在IDE中正常运行。这对您有帮助吗?

如果您的字段分隔符仍然是空格,请记得相应地修改.fieldDelimiter(",")。另外,.lineDelimiter("n")中的行分隔符应为换行符"\n",否则记录会按字母n被拆分。

相关内容

  • 没有找到相关文章

最新更新