Flink: how to use FasterXML/jackson-dataformats-text to convert CSV to a POJO



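My class receives a CSV record, and I need to extract its values to build a POJO. I don't have to open a "file.csv": Flink passes the comma-separated elements to the EventDeserializationSchema, and the "Event" class handles each event.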

Here is an example:

IN: "Adam,Smith,66,12:01:00.000" -> OUT: pojo

For this I am using: https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv

This is my Event class, which is supposed to do the trick but currently does nothing.

import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class Event implements Serializable {
    CsvSchema schema = CsvSchema.builder()
        .addColumn("firstName")
        .addColumn("lastName")
        .addColumn("age", CsvSchema.ColumnType.NUMBER)
        .addColumn("time")
        .build();

    CsvSchema schema = CsvSchema.emptySchema().withHeader();
    CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
    ObjectMapper mapper = new CsvMapper();
    mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);

    return Pojo
}

This is my Pojo class:

public class Pojo {

    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Pojo(String firstName, String lastName, int age, String time) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
        this.time = time;
    }
}

Any help getting this class to return the Pojo would be much appreciated.

Here is a JSON example: https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java

Event class: https://github.com/apache/flink/blob/9dd04a25bd300a725486ff08560920f548f3b1d9/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaEvent.java#L27

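For this to work, the class needs a default (no-argument) constructor and getters/setters for its fields. I don't understand what you are trying to do in Event, or why there is also a Pojo, but assuming you want to deserialize the incoming string into an Event, something like this should work: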

  1. The Event POJO class:
public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}
  2. The EventDeserializationSchema for this question, with deserialize() implemented:
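// Imports use Flink's shaded Jackson packages, as in the question.
import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class EventDeserializationSchema implements DeserializationSchema<Event> {
    private static final long serialVersionUID = 1L;

    // Column order must match the order of the values in each incoming record.
    private static final CsvSchema schema = CsvSchema.builder()
        .addColumn("firstName")
        .addColumn("lastName")
        .addColumn("age", CsvSchema.ColumnType.NUMBER)
        .addColumn("time")
        .build();

    private static final ObjectMapper mapper = new CsvMapper();

    // Parses one CSV record (the raw message bytes) into an Event.
    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    // The stream is unbounded, so no element ever marks its end.
    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}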
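
To see why the default constructor and the setters matter, here is a rough, dependency-free sketch of bean-style binding. It is not Jackson's actual implementation, only a simplified illustration of the idea: create an empty instance through the no-argument constructor, then push each column into the matching setter. The class name BeanBindingSketch, the helper bind() and the COLUMNS array are hypothetical names made up for this example; the column order is assumed to mirror the CsvSchema above.

```java
import java.lang.reflect.Method;

// Hypothetical, simplified stand-in for what a data-binding library does.
// It is NOT how Jackson is implemented; it only illustrates the mechanism.
final class BeanBindingSketch {

    /** Same shape as the Event POJO above: no-arg constructor plus setters. */
    public static class Event {
        private String firstName;
        private String lastName;
        private int age;
        private String time;

        public Event() {
        }

        public String getFirstName() { return firstName; }
        public void setFirstName(String firstName) { this.firstName = firstName; }
        public String getLastName() { return lastName; }
        public void setLastName(String lastName) { this.lastName = lastName; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
        public String getTime() { return time; }
        public void setTime(String time) { this.time = time; }
    }

    // Assumed column order, mirroring the CsvSchema in EventDeserializationSchema.
    private static final String[] COLUMNS = {"firstName", "lastName", "age", "time"};

    /** Binds one comma-separated line to a new instance of the given bean type. */
    static <T> T bind(String line, Class<T> type) {
        try {
            // Step 1: needs a no-arg constructor; fails here if there is none.
            T target = type.getDeclaredConstructor().newInstance();
            String[] cells = line.split(",", -1);
            for (int i = 0; i < COLUMNS.length && i < cells.length; i++) {
                String setter = "set"
                    + Character.toUpperCase(COLUMNS[i].charAt(0))
                    + COLUMNS[i].substring(1);
                // Step 2: find the public setter for this column and call it.
                for (Method m : type.getMethods()) {
                    if (m.getName().equals(setter) && m.getParameterCount() == 1) {
                        Class<?> param = m.getParameterTypes()[0];
                        Object value = (param == int.class)
                            ? (Object) Integer.parseInt(cells[i].trim())
                            : cells[i];
                        m.invoke(target, value);
                    }
                }
            }
            return target;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot bind line to " + type.getName(), e);
        }
    }

    public static void main(String[] args) {
        Event e = bind("Adam,Smith,66,12:01:00.000", Event.class);
        // prints: Adam Smith, 66, 12:01:00.000
        System.out.println(e.getFirstName() + " " + e.getLastName()
            + ", " + e.getAge() + ", " + e.getTime());
    }
}
```

With the original Pojo class, which only has a four-argument constructor, step 1 of this sketch would fail because there is no no-argument constructor to call. That is the same kind of problem the default constructor in Event avoids.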
