Apache Beam如何根据日期值过滤数据



我试图从CSV文件中读取记录,并根据日期过滤记录。我以以下方式实现了这一点。但这是正确的方法吗?

步骤如下:

创建管道
  1. 从文件
  2. 读取数据
  3. 执行必要的过滤
  4. 创建MapElement对象并将OrderRequest转换为字符串
  5. 将OrderRequest实体映射到字符串
  6. 将输出写入文件
  7. 代码:

// Creating pipeline
Pipeline pipeline = Pipeline.create();
// For transformations Reading from a file
PCollection<String> orderRequest = pipeline
.apply(TextIO.read().from("src/main/resources/ST/STCheck/OrderRequest.csv"));
PCollection<OrderRequest> pCollectionTransformation = orderRequest
.apply(ParDo.of(new DoFn<String, OrderRequest>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
String rowString = c.element();
if (!rowString.contains("order_id")) {
String[] strArr = rowString.split(",");
OrderRequest orderRequest = new OrderRequest();
orderRequest.setOrder_id(strArr[0]);
// Condition to check if the
String source1 = strArr[1];
DateTimeFormatter fmt1 = DateTimeFormat.forPattern("mm/dd/yyyy");
DateTime d1 = fmt1.parseDateTime(source1);
System.out.println(d1);
String source2 = "4/24/2017";
DateTimeFormatter fmt2 = DateTimeFormat.forPattern("mm/dd/yyyy");
DateTime d2 = fmt2.parseDateTime(source2);
System.out.println(d2);
orderRequest.setOrder_date(strArr[1]);
System.out.println(strArr[1]);
orderRequest.setAmount(Double.valueOf(strArr[2]));
orderRequest.setCounter_id(strArr[3]);
if (DateTimeComparator.getInstance().compare(d1, d2) > -1) {
c.output(orderRequest);
}
}
}
}));
// Create a MapElement Object and convert the OrderRequest to String
MapElements<OrderRequest, String> mapElements = MapElements.into(TypeDescriptors.strings())
.via((OrderRequest orderRequestType) -> orderRequestType.getOrder_id() + " "
+ orderRequestType.getOrder_date() + " " + orderRequestType.getAmount() + " "
+ orderRequestType.getCounter_id());
// Mapping the OrderRequest Entity to String
PCollection<String> pStringList = pCollectionTransformation.apply(mapElements);
// Now Writing the elements to a file
pStringList.apply(TextIO.write().to("src/main/resources/ST/STCheck/OrderRequestOut.csv").withNumShards(1)
.withSuffix(".csv"));
// To run pipeline
pipeline.run();
System.out.println("We are done!!");

Pojo类:

public class OrderRequest  implements Serializable{
String order_id;
String order_date;
double amount;
String counter_id;
}

虽然我得到正确的结果,这是一个正确的方式吗?我的两个主要问题是

1) How to i access individual columns? So that, I can specify conditions based on that column value.
2) Can we specify headers when reading the data?

是的,您可以使用TextIO.read()处理这样的CSV文件,只要它们不包含嵌入换行符的字段,并且您可以识别/跳过标题行。您的管道看起来不错,但是作为一个小的样式问题,我可能会让第一个ParDo只执行解析,然后是一个查看日期以过滤掉内容的Filter。

如果您想自动推断标题行,您可以在主程序中打开读取第一行(使用标准java库或Beams FileSystems类)并手动提取它,将其传递到解析DoFn中。

我同意更列的方法会更自然。我们在Python中将其作为Dataframes API,现在可用于一般用途。你可以这样写

with beam.Pipeline() as p:
df = p | beam.dataframe.io.read_csv("src/main/resources/ST/STCheck/OrderRequest.csv")
filtered = df[df.order_date > limit]
filtered.write_csv("src/main/resources/ST/STCheck/OrderRequestOut.csv")

最新更新