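I am using a QueryBatcher to query documents and an ApplyTransformListener to apply a transform to them. After all batches have completed, I want to know whether any batch failed. JobReport seems to be the way to do that. My problem is that the job report always reports every batch as successful, even when there are failures. For testing purposes the batch size is 1, so that each document is processed in its own batch.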
final ApplyTransformListener transformListener = new ApplyTransformListener()
    .withApplyResult(ApplyTransformListener.ApplyResult.REPLACE)
    .withTransform(new ServerTransform(transformName))
    .onSuccess(batch -> {
        if (log.isTraceEnabled()) {
            for (String item : batch.getItems()) {
                log.trace("Batch #{}: item {} successfully executed.", batch.getForestBatchNumber(), item);
            }
        }
        log.debug("Batch #{}: finished executed.", batch.getForestBatchNumber());
    })
    .onFailure((batch, throwable) -> {
        log.error("Batch #{}: failed.", batch.getForestBatchNumber(), throwable);
    })
    .onSkipped(batch -> Arrays.stream(batch.getItems())
        .forEach(it -> log.warn("Skipped processing document {}.", it))
    );
final QueryBatcher batcher = dmm.newQueryBatcher(queryDef)
    .withBatchSize(batchSize)
    .withConsistentSnapshot()
    .onUrisReady(transformListener);
try {
    final JobTicket jobTicket = dmm.startJob(batcher);
    batcher.awaitCompletion();
    final JobReport jobReport = dmm.getJobReport(jobTicket);
    if (jobReport.getFailureBatchesCount() > 0) {
        // expected to be at least 1
        throw new MagicException(String.format("%d batches failed to executed.", jobReport.getFailureBatchesCount()));
    }
    dmm.stopJob(jobTicket);
    log.debug("Successfully executed {} batches.", jobReport.getSuccessBatchesCount());
} catch (final Exception ex) {
    System.out.println(ex);
}
These are the resulting logs:
11:04:10.950 [pool-2-thread-2] TRACE - Batch #1: item test/Cat.xml successfully executed.
11:04:10.950 [pool-2-thread-2] DEBUG - Batch #1: finished executed.
11:04:10.952 [pool-2-thread-1] TRACE - Batch #1: item test/Cat3.xml successfully executed.
11:04:10.952 [pool-2-thread-1] DEBUG - Batch #1: finished executed.
11:04:10.971 [pool-2-thread-3] ERROR - Batch #2: failed.
com.marklogic.client.FailedRequestException: Local message: failed to apply resource at internal/apply-transform: Internal Server Error. Server Message: error (err:FOER0000): . See the MarkLogic server error log for further detail.
at com.marklogic.client.impl.OkHttpServices.checkStatus(OkHttpServices.java:4395) ~[marklogic-client-api-4.2.0.jar:?]
at com.marklogic.client.impl.OkHttpServices.postResource(OkHttpServices.java:3377) ~[marklogic-client-api-4.2.0.jar:?]
at com.marklogic.client.impl.OkHttpServices.postResource(OkHttpServices.java:3323) ~[marklogic-client-api-4.2.0.jar:?]
at com.marklogic.client.impl.OkHttpServices.postResource(OkHttpServices.java:3314) ~[marklogic-client-api-4.2.0.jar:?]
at com.marklogic.client.datamovement.ApplyTransformListener.processEvent(ApplyTransformListener.java:144) [marklogic-client-api-4.2.0.jar:?]
at com.marklogic.client.datamovement.impl.QueryBatcherImpl$QueryTask.run(QueryBatcherImpl.java:674) [marklogic-client-api-4.2.0.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_222]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_222]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
11:04:10.974 [main] DEBUG - Successfully executed 3/3 batches.
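As you can see, an error does occur and is logged by my onFailure listener.
The transform is very simple and exists only for testing purposes. It raises an error if the age value is not equal to 1: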
xquery version "1.0-ml";
module namespace transform = "http://marklogic.com/rest-api/transform/magic-test/cat.xml";
declare function transform:transform($context as map:map, $params as map:map, $content as document-node()) as document-node() {
    if (xs:integer($content/cats/age) eq 1) then
        document {
            <cats>
                {$content/cats/uri}
                {$content/cats/age}
                <name>Tiger</name>
            </cats>
        }
    else fn:error()
};
This is what my data looks like:
<cats>
    <uri>test/Cat</uri>
    <name>cat</name>
    <age>1</age>
</cats>
<cats>
    <uri>test/Cat2</uri>
    <name>cat two</name>
    <age>2</age>
</cats>
<cats>
    <uri>test/Cat3</uri>
    <name>cat three</name>
    <age>1</age>
</cats>
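To make the expectation concrete, here is a small sketch that mirrors the transform's check in plain Java and counts how many of the three documents above should fail. It does not call MarkLogic at all; the class name ExpectedOutcome and its methods are hypothetical and exist only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: mirrors the age check of the test transform
// to show which of the sample documents are expected to fail.
public class ExpectedOutcome {

    /** Mirrors the transform: only documents with age == 1 are accepted. */
    static boolean transformSucceeds(int age) {
        return age == 1;
    }

    /** Counts the documents for which the transform would raise an error. */
    static long countExpectedFailures(Map<String, Integer> agesByUri) {
        return agesByUri.values().stream()
                .filter(age -> !transformSucceeds(age))
                .count();
    }

    public static void main(String[] args) {
        // The <uri> and <age> values of the three sample documents.
        Map<String, Integer> agesByUri = new LinkedHashMap<>();
        agesByUri.put("test/Cat", 1);
        agesByUri.put("test/Cat2", 2);
        agesByUri.put("test/Cat3", 1);

        long failures = countExpectedFailures(agesByUri);
        long successes = agesByUri.size() - failures;
        // prints "expected failures: 1, expected successes: 2"
        System.out.println("expected failures: " + failures
                + ", expected successes: " + successes);
    }
}
```

With a batch size of 1, that is one failed batch and two successful ones, which is what I expected the job report to show.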
I am using java-client-api:4.2.0. Why does jobReport.getFailureBatchesCount() not equal 1 even though one batch failed? Do I need another listener that I am not aware of?
A bug has been filed in the marklogic-java-api GitHub repo.
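Until that is resolved, one possible workaround is to count the outcomes yourself in the listener callbacks that already fire correctly, instead of relying on JobReport. The following is only a minimal sketch under that assumption: BatchOutcomeCounter is a hypothetical helper, not part of the MarkLogic client API, and the wiring into onSuccess and onFailure is shown only in comments so that the example runs without the library.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not part of the MarkLogic API): counts batch outcomes
// independently of JobReport. AtomicLong is used because the listener
// callbacks run on several batcher threads.
public class BatchOutcomeCounter {
    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    public void recordSuccess() { succeeded.incrementAndGet(); }
    public void recordFailure() { failed.incrementAndGet(); }

    public long getSucceeded() { return succeeded.get(); }
    public long getFailed() { return failed.get(); }

    public static void main(String[] args) {
        BatchOutcomeCounter counter = new BatchOutcomeCounter();

        // In the real job these calls would sit in the listener callbacks:
        //   .onSuccess(batch -> counter.recordSuccess())
        //   .onFailure((batch, throwable) -> counter.recordFailure())
        // Here the three batches from the logs above are simulated by hand.
        counter.recordSuccess();
        counter.recordSuccess();
        counter.recordFailure();

        // After batcher.awaitCompletion() the counters can be checked
        // in place of jobReport.getFailureBatchesCount().
        // prints "1 failed, 2 succeeded"
        System.out.println(counter.getFailed() + " failed, "
                + counter.getSucceeded() + " succeeded");
    }
}
```

Checking counter.getFailed() after awaitCompletion() would then take the place of the JobReport check in the try block above.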