前言
我在spring-boot
应用程序中使用spring batch
。Spring Boot 版本是2.3.3.RELEASE
.
我有一个multi-step
的工作,它在第一步中验证xml文件头,然后在chunk oriented step
中read transaction
,对每个事务进行一些business logic
,然后将其write
回xml
文件。在第三步也是最后一步中,我delete the input file
.我的Chunk size
当前设置为500
交易。
问题
一切都很好,除了RUNNABLE
状态下的thread is locked
几乎10 minutes
between write and next read
。我拿了thread dump
,但无法从中得到太多,尽管我观察到thread stack trace
一直在same
。
"europeanGateway-3" #90 prio=5 os_prio=0 tid=0x0000023b7d347800 nid=0x6bd0 runnable [0x000000c228cfb000]
java.lang.Thread.State: RUNNABLE
at java.util.zip.ZipFile.read(Native Method)
at java.util.zip.ZipFile.access$1400(ZipFile.java:60)
at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:735)
- locked <0x00000005c037d9e8> (a sun.net.www.protocol.jar.URLJarFile)
at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:750)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at com.sun.xml.internal.bind.v2.bytecode.ClassTailor.tailor(ClassTailor.java:119)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.tailor(AccessorInjector.java:107)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.prepare(AccessorInjector.java:68)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.OptimizedAccessorFactory.get(OptimizedAccessorFactory.java:164)
at com.sun.xml.internal.bind.v2.runtime.reflect.Accessor$FieldReflection.optimize(Accessor.java:271)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementLeafProperty.<init>(SingleElementLeafProperty.java:77)
at sun.reflect.GeneratedConstructorAccessor73.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:305)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:124)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl$JAXBContextBuilder.build(JAXBContextImpl.java:1123)
at com.sun.xml.internal.bind.v2.ContextFactory.createContext(ContextFactory.java:147)
at sun.reflect.GeneratedMethodAccessor375.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:247)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:234)
at javax.xml.bind.ContextFinder.find(ContextFinder.java:462)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshalStaxResult(OmegaXmlJaxb2Marshaller.java:50)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshal(OmegaXmlJaxb2Marshaller.java:29)
at org.springframework.oxm.jaxb.Jaxb2Marshaller.marshal(Jaxb2Marshaller.java:695)
at org.springframework.batch.item.xml.StaxEventItemWriter.write(StaxEventItemWriter.java:770)
at org.springframework.batch.item.xml.StaxEventItemWriter$$FastClassBySpringCGLIB$$d105dd1.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.xml.StaxEventItemWriter$$EnhancerBySpringCGLIB$$71be73fd.write(<generated>)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter.write(OmegaXmlFileWriter.java:39)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$FastClassBySpringCGLIB$$48e7f3da.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$EnhancerBySpringCGLIB$$f046e646.write(<generated>)
at org.springframework.batch.item.support.CompositeItemWriter.write(CompositeItemWriter.java:59)
at org.springframework.batch.item.support.CompositeItemWriter$$FastClassBySpringCGLIB$$fd4a76ae.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.support.CompositeItemWriter$$EnhancerBySpringCGLIB$$35f2661a.write(<generated>)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:193)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:159)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:294)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- <0x00000005c2106ac8> (a java.util.concurrent.ThreadPoolExecutor$Worker)
StaxEventItemWriter config
@JobScope
@Bean(name = "staxTransactionWriter", destroyMethod="")
public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack,
@Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) {
Resource exportFileResource = new FileSystemResource(fileUtils.getFilePath(path, submissionAccount, Directory.TEMP, true));
OmegaXmlJaxb2Marshaller marshaller = new OmegaXmlJaxb2Marshaller();
marshaller.setContextPath("com.trax.europeangateway.model.dto.xsd.omega");
marshaller.setSupportJaxbElementClass(true);
marshaller.setCheckForXmlRootElement(false);
return new StaxEventItemWriterBuilder<TransactionPositionReport>()
.name("transactionWriter")
.resource(exportFileResource)
.marshaller(marshaller)
.rootTagName("{http://deutsche-boerse.com/DBRegHub}reportFile")
.headerCallback(omegaXmlHeaderCallBack)
.footerCallback(getOmegaXmlFooterCallBack())
.build();
}
Jaxb2Marshaller 配置
import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.Result;
import org.springframework.lang.Nullable;
import org.springframework.oxm.XmlMappingException;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.oxm.mime.MimeContainer;
import org.springframework.util.xml.StaxUtils;
import com.trax.europeangateway.model.dto.xsd.omega.ObjectFactory;
import com.trax.europeangateway.model.dto.xsd.omega.TransactionPositionReport;
public class OmegaXmlJaxb2Marshaller extends Jaxb2Marshaller {
@Override
public void marshal(Object graph, Result result, @Nullable MimeContainer mimeContainer) throws XmlMappingException {
try {
Marshaller marshaller = createMarshaller();
if (StaxUtils.isStaxResult(result)) {
marshalStaxResult(marshaller, graph, result);
}
}
catch (JAXBException ex) {
throw convertJaxbException(ex);
}
}
private void marshalStaxResult(Marshaller jaxbMarshaller, Object graph, Result staxResult) throws JAXBException {
XMLStreamWriter streamWriter = StaxUtils.getXMLStreamWriter(staxResult);
if (streamWriter != null) {
jaxbMarshaller.marshal(graph, streamWriter);
}
else {
XMLEventWriter eventWriter = StaxUtils.getXMLEventWriter(staxResult);
if (eventWriter != null) {
ObjectFactory fact = new ObjectFactory();
TransactionPositionReport transaction = fact.createTransactionPositionReport();
transaction.setConfigurableFields(((TransactionPositionReport)graph).getConfigurableFields());
transaction.setMifir(((TransactionPositionReport)graph).getMifir());
transaction.setProcessingDetails(((TransactionPositionReport)graph).getProcessingDetails());
JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
QName qName = new QName(XMLConstants.NULL_NS_URI, "transaction", XMLConstants.DEFAULT_NS_PREFIX);
JAXBElement<TransactionPositionReport> element = new JAXBElement(qName, TransactionPositionReport.class, TransactionPositionReport.class, transaction);
marshaller.marshal(element, eventWriter);
}
else {
throw new IllegalArgumentException("StAX Result contains neither XMLStreamWriter nor XMLEventConsumer");
}
}
}
}
观察
如果我reduce
从500
到100
的chunk batch size
,等待是reduced to 2 minutes(approx)
编写器需要的时间与块大小成比例,这表明编写器在编写每个项目时效率非常低。
如果每次检查时堆栈跟踪都相似,则其根本原因是OmegaXmlJaxb2Marshaller::marshalStaxResult
中的JAXBContext::newInstance
调用。为每个要写入的项目创建一个新的编组器开销太大。
你应该移动代码块
JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
到OmegaXmlJaxb2Marshaller
的构造函数,并marshaller
实例变量。
然后,编组器将只创建一次,编写器应该快得多。