Thread stuck in ZipFileInputStream read method when using StaxEventItemWriter in Spring Batch



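Background

I am using Spring Batch in a Spring Boot application. The Spring Boot version is 2.3.3.RELEASE.

I have a multi-step job. The first step validates the XML file header. The second step is chunk-oriented: it reads transactions, applies some business logic to each one, and writes them to an XML file. The third and final step deletes the input file. My chunk size is currently set to 500 transactions.

Problem

Everything works fine, except that a thread in the RUNNABLE state is stuck for almost 10 minutes between a write and the next read. I took a thread dump but could not get much out of it, although I did notice that the thread's stack trace was always the same: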

"europeanGateway-3" #90 prio=5 os_prio=0 tid=0x0000023b7d347800 nid=0x6bd0 runnable [0x000000c228cfb000]
java.lang.Thread.State: RUNNABLE
at java.util.zip.ZipFile.read(Native Method)
at java.util.zip.ZipFile.access$1400(ZipFile.java:60)
at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:735)
- locked <0x00000005c037d9e8> (a sun.net.www.protocol.jar.URLJarFile)
at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:750)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at com.sun.xml.internal.bind.v2.bytecode.ClassTailor.tailor(ClassTailor.java:119)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.tailor(AccessorInjector.java:107)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.prepare(AccessorInjector.java:68)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.OptimizedAccessorFactory.get(OptimizedAccessorFactory.java:164)
at com.sun.xml.internal.bind.v2.runtime.reflect.Accessor$FieldReflection.optimize(Accessor.java:271)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementLeafProperty.<init>(SingleElementLeafProperty.java:77)
at sun.reflect.GeneratedConstructorAccessor73.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:305)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:124)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl$JAXBContextBuilder.build(JAXBContextImpl.java:1123)
at com.sun.xml.internal.bind.v2.ContextFactory.createContext(ContextFactory.java:147)
at sun.reflect.GeneratedMethodAccessor375.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:247)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:234)
at javax.xml.bind.ContextFinder.find(ContextFinder.java:462)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshalStaxResult(OmegaXmlJaxb2Marshaller.java:50)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshal(OmegaXmlJaxb2Marshaller.java:29)
at org.springframework.oxm.jaxb.Jaxb2Marshaller.marshal(Jaxb2Marshaller.java:695)
at org.springframework.batch.item.xml.StaxEventItemWriter.write(StaxEventItemWriter.java:770)
at org.springframework.batch.item.xml.StaxEventItemWriter$$FastClassBySpringCGLIB$$d105dd1.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.xml.StaxEventItemWriter$$EnhancerBySpringCGLIB$$71be73fd.write(<generated>)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter.write(OmegaXmlFileWriter.java:39)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$FastClassBySpringCGLIB$$48e7f3da.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$EnhancerBySpringCGLIB$$f046e646.write(<generated>)
at org.springframework.batch.item.support.CompositeItemWriter.write(CompositeItemWriter.java:59)
at org.springframework.batch.item.support.CompositeItemWriter$$FastClassBySpringCGLIB$$fd4a76ae.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.support.CompositeItemWriter$$EnhancerBySpringCGLIB$$35f2661a.write(<generated>)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:193)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:159)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:294)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- <0x00000005c2106ac8> (a java.util.concurrent.ThreadPoolExecutor$Worker)

StaxEventItemWriter config

@JobScope
@Bean(name = "staxTransactionWriter", destroyMethod="")
public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack,
        @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) {
    Resource exportFileResource = new FileSystemResource(fileUtils.getFilePath(path, submissionAccount, Directory.TEMP, true));

    OmegaXmlJaxb2Marshaller marshaller = new OmegaXmlJaxb2Marshaller();
    marshaller.setContextPath("com.trax.europeangateway.model.dto.xsd.omega");
    marshaller.setSupportJaxbElementClass(true);
    marshaller.setCheckForXmlRootElement(false);

    return new StaxEventItemWriterBuilder<TransactionPositionReport>()
            .name("transactionWriter")
            .resource(exportFileResource)
            .marshaller(marshaller)
            .rootTagName("{http://deutsche-boerse.com/DBRegHub}reportFile")
            .headerCallback(omegaXmlHeaderCallBack)
            .footerCallback(getOmegaXmlFooterCallBack())
            .build();
}

Jaxb2Marshaller config

import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.Result;
import org.springframework.lang.Nullable;
import org.springframework.oxm.XmlMappingException;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.oxm.mime.MimeContainer;
import org.springframework.util.xml.StaxUtils;
import com.trax.europeangateway.model.dto.xsd.omega.ObjectFactory;
import com.trax.europeangateway.model.dto.xsd.omega.TransactionPositionReport;
public class OmegaXmlJaxb2Marshaller extends Jaxb2Marshaller {

    @Override
    public void marshal(Object graph, Result result, @Nullable MimeContainer mimeContainer) throws XmlMappingException {
        try {
            Marshaller marshaller = createMarshaller();
            if (StaxUtils.isStaxResult(result)) {
                marshalStaxResult(marshaller, graph, result);
            }
        }
        catch (JAXBException ex) {
            throw convertJaxbException(ex);
        }
    }

    private void marshalStaxResult(Marshaller jaxbMarshaller, Object graph, Result staxResult) throws JAXBException {
        XMLStreamWriter streamWriter = StaxUtils.getXMLStreamWriter(staxResult);
        if (streamWriter != null) {
            jaxbMarshaller.marshal(graph, streamWriter);
        }
        else {
            XMLEventWriter eventWriter = StaxUtils.getXMLEventWriter(staxResult);
            if (eventWriter != null) {
                ObjectFactory fact = new ObjectFactory();
                TransactionPositionReport transaction = fact.createTransactionPositionReport();
                transaction.setConfigurableFields(((TransactionPositionReport)graph).getConfigurableFields());
                transaction.setMifir(((TransactionPositionReport)graph).getMifir());
                transaction.setProcessingDetails(((TransactionPositionReport)graph).getProcessingDetails());
                JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
                Marshaller marshaller = jaxbContext.createMarshaller();
                marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);

                QName qName = new QName(XMLConstants.NULL_NS_URI, "transaction", XMLConstants.DEFAULT_NS_PREFIX);
                JAXBElement<TransactionPositionReport> element = new JAXBElement(qName, TransactionPositionReport.class, TransactionPositionReport.class, transaction);
                marshaller.marshal(element, eventWriter);
            }
            else {
                throw new IllegalArgumentException("StAX Result contains neither XMLStreamWriter nor XMLEventConsumer");
            }
        }
    }
}

Observation

If I reduce the chunk size from 500 to 100, the wait drops to approximately 2 minutes.

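Answer

The time the writer takes is proportional to the chunk size (about 10 minutes for 500 items, about 2 minutes for 100), which suggests that the writer is very inefficient at writing each individual item.

If the stack trace looks similar every time you check, the root cause is the JAXBContext::newInstance call in OmegaXmlJaxb2Marshaller::marshalStaxResult. Creating a new JAXB context and marshaller for every item to be written is far too expensive.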

You should move the code block

JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);

into the constructor of OmegaXmlJaxb2Marshaller and make marshaller an instance variable.

The marshaller will then be created only once, and the writer should be much faster.
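
To illustrate why moving the creation out of the per-item path helps, here is a minimal, self-contained sketch. It does not use JAXB at all: CostlyContext is a hypothetical stand-in for JAXBContext whose constructor represents the expensive setup, and PerItemWriter and CachedWriter are illustrative names, not classes from the question. It only counts how many contexts each approach builds for one chunk of 500 items.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class ContextReuseDemo {

    /** Hypothetical stand-in for JAXBContext: the constructor models the expensive setup. */
    static final class CostlyContext {
        CostlyContext(AtomicInteger creations) {
            creations.incrementAndGet(); // count every construction
        }

        String marshal(String item) {
            return "<transaction>" + item + "</transaction>";
        }
    }

    /** Mirrors the code in the question: a new context for every item written. */
    static final class PerItemWriter {
        final AtomicInteger creations = new AtomicInteger();

        void write(List<String> chunk, StringBuilder out) {
            for (String item : chunk) {
                out.append(new CostlyContext(creations).marshal(item));
            }
        }
    }

    /** Mirrors the suggested fix: the context is an instance field, built once. */
    static final class CachedWriter {
        final AtomicInteger creations = new AtomicInteger();
        private final CostlyContext context = new CostlyContext(creations);

        void write(List<String> chunk, StringBuilder out) {
            for (String item : chunk) {
                out.append(context.marshal(item));
            }
        }
    }

    public static void main(String[] args) {
        List<String> chunk = Collections.nCopies(500, "tx"); // one chunk of 500 items

        PerItemWriter perItem = new PerItemWriter();
        perItem.write(chunk, new StringBuilder());

        CachedWriter cached = new CachedWriter();
        cached.write(chunk, new StringBuilder());

        System.out.println("per-item contexts: " + perItem.creations.get()); // prints 500
        System.out.println("cached contexts: " + cached.creations.get());    // prints 1
    }
}
```

Both writers produce the same output; only the number of context constructions differs. One caveat when applying this to the real class: as far as I know, JAXBContext is safe to share between threads while Marshaller is not, so if the step is ever run multi-threaded it may be safer to cache only the JAXBContext and call createMarshaller() per write.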
