我在sftp服务器上有一个很大的XML文件(可能有大约一百万条记录)。我不想把整个文件加载到内存中。这样做的目的是,我的路由获取文件,拆分它,并使用堆栈构建器遍历元素,将其映射到JAXB对象,并将其发送到队列(或spring批处理),以便稍后持久化到数据库。
input.xml(仅2条记录为例)
<data>
<PRODUCTNUMBER>
<PRODUCTNUMBER>8D0201075E</PRODUCTNUMBER>
<CURRGROSSPRICE>427.90</CURRGROSSPRICE>
<NEXTGROSSPRICE>0.00</NEXTGROSSPRICE>
<NEXTPRICEDATE>1900-01-01 00:00:00</NEXTPRICEDATE>
<PRODUCTNAME_FR>Some description</PRODUCTNAME_FR>
<PRODUCTNAME_NL>Some description</PRODUCTNAME_NL>
</PRODUCTNUMBER>
<PRODUCTNUMBER>
<PRODUCTNUMBER>99630211802</PRODUCTNUMBER>
<CURRGROSSPRICE>3.78</CURRGROSSPRICE>
<NEXTGROSSPRICE>0.00</NEXTGROSSPRICE>
<NEXTPRICEDATE>1900-01-01 00:00:00</NEXTPRICEDATE>
<PRODUCTNAME_FR>Some description</PRODUCTNAME_FR>
</PRODUCTNUMBER>
</data>
<<p>骆驼路线/strong>from("sftp:localhost:22/in")
.split(stax(PartRecords.class)).streaming()
.marshal().json(JsonLibrary.Jackson, true)
.to("rabbitmq://rabbitmq:5672/myExchange?queue=partQueue&routingKey=queue.part")
.end();
PartRecord.java
@XmlRootElement(name = "PRODUCTNUMBER")
@XmlAccessorType(XmlAccessType.FIELD)
@Getter
@Setter
@ToString
public class PartRecord implements Serializable {
@XmlElement(name = "PRODUCTNUMBER")
private String productNumber;
@XmlElement(name = "CURRGROSSPRICE")
private BigDecimal currentPrice;
@XmlElement(name = "PRODUCTNAME_NL")
private String partDescriptionNL;
@XmlElement(name = "PRODUCTNAME_FR")
private String partDescriptionFR;
}
PartRecords.java
@XmlRootElement(name = "data")
@XmlAccessorType(XmlAccessType.FIELD)
@ToString
public class PartRecords implements Serializable {
@XmlElement(name = "PRODUCTNUMBER")
private List<PartRecord> partRecords;
public List<PartRecord> getPartRecords() {
if (partRecords == null) {
partRecords = new ArrayList<>();
}
return partRecords;
}
}
路由工作正常,消息被放在队列中,但不是每条记录有1条消息,而是将json中的整个文件放在队列中。我想这是正常行为,所以我需要额外的东西。我不知道每条记录有1条消息是否是个好主意,但我想象有1条消息包含整个文件也不性能。
当前行为输出
{
"partRecords" : [ {
"productNumber" : "8D0201075E",
"currentPrice" : 427.90,
"partDescriptionNL" : "Some description",
"partDescriptionFR" : "Some description"
}, {
"productNumber" : "99630211802",
"currentPrice" : 3.78,
"partDescriptionNL" : null,
"partDescriptionFR" : "Some description"
}]
}
我做错了什么?我使用的是Spring Boot v2.5.4, Apache Camel v3.11.1。提前谢谢。
在路由器中使用PartRecord而不是PartRecords:
from("sftp:localhost:22/in")
.split(stax(PartRecord.class)).streaming()
.marshal().json(JsonLibrary.Jackson, true)
.to("rabbitmq://rabbitmq:5672/myExchange?queue=partQueue&routingKey=queue.part")
.end();