How to aggregate CSV lines with Apache Camel



I have a CSV like this:

County  City  Area  Street
county1 city1 area1 street1
county1 city1 area2 street2
county1 city1 area3 street7
county1 city2 area2 street2
county1 city2 area6 street1
county2 city1 area3 street3
county2 city1 area3 street2
...

While parsing the CSV, I need to aggregate rows that share the same county/city into a final structure like this:

county1/city1: [ [area1, street1], [area2, street2], [area3, street7] ]
county1/city2: [ [area2, street2], [area6, street1] ]
county2/city1: [ [area3, street3], [area3, street2] ]

Basically, I want to group by county/city.
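To make the target structure concrete, here is a minimal plain-Java sketch (no Camel involved) of grouping already-parsed rows under a "county/city" key. It assumes each row is a String[] of {county, city, area, street}, and the class name CountyCityGrouping is hypothetical. A LinkedHashMap keeps the groups in the order they first appear in the file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Camel: groups parsed rows by "county/city".
class CountyCityGrouping {
    // rows: {county, city, area, street}; result: "county/city" -> [[area, street], ...]
    static Map<String, List<List<String>>> group(List<String[]> rows) {
        // LinkedHashMap keeps keys in first-seen order
        Map<String, List<List<String>>> groups = new LinkedHashMap<>();
        for (String[] row : rows) {
            String key = row[0] + "/" + row[1];
            groups.computeIfAbsent(key, k -> new ArrayList<>())
                  .add(Arrays.asList(row[2], row[3]));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
                new String[] {"county1", "city1", "area1", "street1"},
                new String[] {"county1", "city1", "area2", "street2"},
                new String[] {"county1", "city1", "area3", "street7"},
                new String[] {"county1", "city2", "area2", "street2"},
                new String[] {"county1", "city2", "area6", "street1"},
                new String[] {"county2", "city1", "area3", "street3"},
                new String[] {"county2", "city1", "area3", "street2"});
        // Prints one line per group, e.g.
        // county1/city1: [[area1, street1], [area2, street2], [area3, street7]]
        group(rows).forEach((key, pairs) -> System.out.println(key + ": " + pairs));
    }
}
```

Because the whole file is grouped in memory, this does not depend on the rows being sorted.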

I've tried various approaches with Camel; this is the latest one:

class CsvAppender {
    CsvRow append(CsvRow existing, CsvRow next) {
        next.previous = existing
        next
    }
}
@CsvRecord(separator = "\t")
class CsvRow {
    @DataField(pos = 1)
    private String county
    @DataField(pos = 2)
    private String city
    @DataField(pos = 3)
    private String area
    @DataField(pos = 4)
    private String street
    CsvRow previous
    boolean sameAggregateWithPrevious() {
        previous?.county == county && previous?.city == city
    }
    public String toString() {
        "${county} ${city} ${area} ${street}"
    }
}
class CsvRouteBuilder extends RouteBuilder {
    void configure() {
        CsvAppender appender = new CsvAppender()
        Closure predicate = { exchange ->
            def body = exchange.getIn().getBody(CsvRow.class)
            def currentAggregate = exchange.getIn().getHeader('CurrentAggregate')
            def nextAggregate = exchange.getIn().getHeader('NextAggregate')
            if (!currentAggregate) {
                currentAggregate = body.previous ? [ body.previous ] : []
                nextAggregate = []
            } else if (exchange.getIn().getHeader('AggregateComplete')) {
                currentAggregate = nextAggregate
                nextAggregate = []
            }
            def aggregateComplete = body.sameAggregateWithPrevious()
            if (aggregateComplete) {
                nextAggregate << body
            } else {
                currentAggregate << body
            }
            exchange.getIn().setHeaders(['CurrentAggregate': currentAggregate,
                                         'NextAggregate': nextAggregate,
                                         'AggregateComplete': aggregateComplete])
            aggregateComplete
        }
        from("file:/tmp/folder?noop=true")
            .split(body().tokenize('\n')).streaming()
            .unmarshal().bindy(BindyType.Csv, CsvRow.class)
                .aggregate(constant(true), AggregationStrategies.bean(appender, "append")).completionPredicate(predicate)
                .process({
                    it.getOut().setBody(it.getIn().getHeader('CurrentAggregate')) })
                .convertBodyTo(String.class)
            .to("jms:myCsvSplitter")
    }
}

Either way, my solution doesn't fully work: sometimes the "previous" element is null, and the code looks far too verbose.
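The "previous" link in the code above is an attempt to close a group whenever the county/city key changes. That only works if the file is already sorted by county and city, and the first row of each aggregation has no predecessor (with the bean-based strategy, the first exchange of a new group is likely passed through without calling append), which would explain the null. A minimal plain-Java sketch of the same idea, assuming sorted input and a hypothetical ConsecutiveGrouper class, remembers the previous key instead of linking rows and needs one final flush after the last line:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: groups consecutive rows that share the same county/city.
// Assumes the input is sorted by county and city; unsorted input yields split groups.
class ConsecutiveGrouper {
    private final Consumer<List<String[]>> onGroupComplete;
    private String currentKey;                  // null until the first row arrives
    private List<String[]> current = new ArrayList<>();

    ConsecutiveGrouper(Consumer<List<String[]>> onGroupComplete) {
        this.onGroupComplete = onGroupComplete;
    }

    // Feed rows {county, city, area, street} in file order.
    void accept(String[] row) {
        String key = row[0] + "/" + row[1];
        if (currentKey != null && !currentKey.equals(key)) {
            flush();                            // key changed: the previous group is complete
        }
        currentKey = key;
        current.add(row);
    }

    // Emits the group in progress; call once more after the last row.
    void flush() {
        if (!current.isEmpty()) {
            onGroupComplete.accept(current);
            current = new ArrayList<>();
        }
    }
}
```

Keeping the key as plain state avoids the null check on a previous row entirely; the only special case left is the final flush.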

Any ideas on how to aggregate the CSV file correctly?

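I have some rough working code that should be good enough to help you. It's written in Java rather than Groovy, because my Groovy isn't great, but it should be easy to translate.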

First, the aggregator:

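import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;
// Camel 2.x package; in Camel 3 this interface is org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class MyAgregationStrategy implements AggregationStrategy {
    @Override
    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        CsvRow newBody = newExchange.getIn().getBody(CsvRow.class);
        if (oldExchange == null) {
            // First row: start a new map and use the incoming exchange as the aggregate
            Map<String, List<CsvRow>> map = new HashMap<String, List<CsvRow>>();
            List<CsvRow> list = new ArrayList<CsvRow>();
            list.add(newBody);
            map.put(newBody.getCounty(), list);
            newExchange.getIn().setBody(map);
            return newExchange;
        } else {
            // Later rows: append the row to the list for its county
            Map<String, List<CsvRow>> map = oldExchange.getIn().getBody(Map.class);
            List<CsvRow> list = map.get(newBody.getCounty());
            if (list == null) {
                list = new ArrayList<CsvRow>();
                map.put(newBody.getCounty(), list);
            }
            list.add(newBody);
            // Copy the splitter's "last line" flag onto the aggregate, because the
            // completion predicate is evaluated on the aggregated exchange
            oldExchange.setProperty("CamelSplitComplete", newExchange.getProperty("CamelSplitComplete"));
            return oldExchange;
        }
    }
}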

This stores the rows in lists inside a map, keyed by county.
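The question groups by county/city, not by county alone. Here is a minimal plain-Java sketch of the per-row merge step with a composite key, shaped like aggregate(): the map is null on the first call (like oldExchange == null) and is returned to be passed back in on the next call. Rows are String[] of {county, city, area, street} so the sketch runs without Camel; CountyCityMerge is a hypothetical name. Inside the real aggregator you would build the key from the CsvRow instead, assuming it exposes a getCity() getter alongside getCounty().

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper mirroring the aggregator's per-row step with a county/city key.
class CountyCityMerge {
    // 'aggregated' is null on the first call, otherwise the map built so far.
    // 'row' is {county, city, area, street}.
    static Map<String, List<String[]>> merge(Map<String, List<String[]>> aggregated, String[] row) {
        Map<String, List<String[]>> map =
                (aggregated == null) ? new LinkedHashMap<String, List<String[]>>() : aggregated;
        String key = row[0] + "/" + row[1];     // county/city instead of county alone
        map.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        return map;
    }
}
```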

Then the route:

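import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.dataformat.BindyType;

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("file:/c:/dev/test?noop=true")
        // Split the file on newlines ("\n", not the letter n)
        .split(body().tokenize("\n"))
        .log("Read line ${body}")
        .unmarshal()
        .bindy(BindyType.Csv, CsvRow.class)
            // One correlation group for the whole file; complete when the splitter
            // marks the last line (${property.*} is Camel 2.x Simple syntax)
            .aggregate(constant(true), new MyAgregationStrategy()).completionPredicate(simple("${property.CamelSplitComplete} == true"))
        .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                Map results = (Map) exchange.getIn().getBody();
                System.out.println("Got results for " + results.size() + " counties");
            }
        });
    }
}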

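It uses the CamelSplitComplete property to detect when the split has finished. In the final processor you can do whatever you like with the map. Alternatively, you can change the aggregation strategy so that it aggregates the results the way you need.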
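As one example of what the final processor could do, here is a minimal plain-Java sketch that renders a grouped map in the format from the question. It assumes a map of "county/city" keys to String[] rows of {county, city, area, street}, as in a county/city-keyed variant of the aggregator; GroupFormatter is a hypothetical name, and with the CsvRow-based map you would read the area and street through its getters instead.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: formats grouped rows like the question's target output.
class GroupFormatter {
    // One line per key, e.g. "county1/city2: [ [area2, street2], [area6, street1] ]"
    static String format(Map<String, List<String[]>> groups) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, List<String[]>> e : groups.entrySet()) {
            StringJoiner pairs = new StringJoiner(", ", "[ ", " ]");
            for (String[] row : e.getValue()) {
                pairs.add("[" + row[2] + ", " + row[3] + "]");   // [area, street]
            }
            out.append(e.getKey()).append(": ").append(pairs).append('\n');
        }
        return out.toString();
    }
}
```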
