使用Spring Batch将文件解密到流中,转换并加密为文件



Aim-解密.pgp加密的文件,将数据作为流读取,根据供应商要求执行转换,作为流加密并写入文件。

逻辑-自定义读写器和任务集,将解密/加密的数据存储到ExecutionContext中并传递到不同的步骤。

适用于-小文件(~1MB)

面临问题-尝试使用(约10MB-10K记录)-读取步骤成功,但当开始将数据作为加密文件写入时-内存问题-java.lang.OutOfMemoryError:java堆空间

代码片段-

<job id="testJob" xmlns="http://www.springframework.org/schema/batch">
    <!-- Read Encrypted file and Decrypt -->
    <batch:step id="decryptFile" next="readAndWriteData">
        <batch:tasklet ref="fileDecryptionTasklet">
            <batch:listeners>
                <batch:listener ref="decryptFileListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
    <!-- Read data from decryption step and write to Stream -->
    <batch:step id="readAndWriteData" next="encryptFile">
        <batch:tasklet>
            <batch:chunk reader="hrdsCustomReader" processor="Processor"
                writer="CustomWriter" commit-interval="${.ftp.comit.interval}" />
            <batch:listeners>
                <batch:listener ref="encryptFileListener" />
            </batch:listeners>
        </batch:tasklet>
    </batch:step>
    <!-- Write to vendor specific file -->
    <batch:step id="encryptFile">
        <batch:tasklet ref="fileEncryptionTasklet" />
    </batch:step>
</job>

Tasklet和自定义编写器代码片段-


@Override
public String read() throws Exception, UnexpectedInputException,
        ParseException {
    decryptedData = (String) stepExecution.getJobExecution()
            .getExecutionContext().get("DecryptedData");
    if (decryptedData != null)
        //logger.info("decryptedData in Custom Reader - n" + decryptedData);
    stepExecution.getJobExecution().getExecutionContext()
            .put("DecryptedData", null);
    return decryptedData;
}
 
public void write(List items) throws Exception {
    logger.info("Begin writing data as an Encrypted File");
      Iterator itr = items.iterator();
      while(itr.hasNext()) {
             String element =  itr.next();
             lineBuffer.append(element+LINE_SEPARATOR);
          }
      ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("EncryptedData", lineBuffer);
}

public RepeatStatus execute(StepContribution step, ChunkContext chunk)
        throws Exception {
    InputStream inputstream = new FileInputStream(inputdirectory);
    Message encryptMessage = MessageBuilder
            .withPayload(inputstream)
            .setHeader(
                    FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
                    "decryptAndVerify")
            .setHeader(
                    FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,
                    EncryptionUtil.DECRYPT_STREAM_OPERATION)
            .setHeader(FileEncryptionTransformer.SOURCE_FILE_NAME_HEADER,
                    filename).build();
    InputStream inputStream = pgptransformer
            .doTransformStream(encryptMessage);
    String strData = IOUtils.toString(inputStream, "UTF-8");
    inputstream.close();

    chunk.getStepContext().getStepExecution().getExecutionContext().put("DecryptedData", strData);
    return null;
}
public RepeatStatus execute(StepContribution步骤,ChunkContext区块)throws异常{lineBuffer=(StringBuffer)chunk.getStepContext().getJobExecutionContext().get("加密数据");byte[]字节=lineBuffer.toString().getBytes();InputStream InputStream=新ByteArrayInputStream(字节);Message encryptMessage=消息生成器.带有效载荷(inputStream).setHeader(PGPFileTransformer.OUTPUT_FILE_FOLDER,输出目录).setHeader(FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,"签名和加密").setHeader(FileEncryptionTransformer.ENCRYPTION_OPERATION_HEADER,EncryptionUtil.ENCRYPT_STREAM_OPERATION).setHeader(FileEncryptionTransformer.SOURCE_FILE_NAME_HEADER,filename).build();pgptransformer.doTransform(encryptMessage);inputStream.close();chunk.getStepContext().getStepExecution().geExecutionContext().put("EncryptedData",null);返回null;}

如果有人能帮助解决问题,不胜感激。

能够解密并处理<2分钟。

逻辑-以块为单位进行处理-将200条记录分成一行。

张贴下面的代码-

批次配置-

<job id="aic-batch-xxx-ftp" xmlns="http://www.springframework.org/schema/batch">

    <!-- Read data , decrypt , process and write to encrypted file -->
    <batch:step id="readAndWriteData">
        <batch:tasklet>
            <batch:chunk reader="xxxCustomReader" processor="xxxFileProccessor"
                writer="xxxCustomWriter" commit-interval="${aic.batch.xxx.ftp.comit.interval}" />
        </batch:tasklet>
    </batch:step>

</job>

读卡器逻辑-

StringBuffer decryptedData = new StringBuffer();
    String strLine = "";
    PGPLib pgp = new PGPLib();
    KeyStore keyStore = new KeyStore("xxx.keystore", "xxx");
    long startTime = System.currentTimeMillis();
    // Read & decrypt File Line By Line
    if ((strLine = bufferedReader.readLine()) != null) {
        strLine = strLine.replace("NEW_LINE", "rn");
        decryptedData.append((pgp.decryptString(strLine, keyStore,
                "xxx")));
        long endTime = System.currentTimeMillis();
        logger.debug("Total time taken = " + (endTime - startTime) + " msec");
        return decryptedData;
    }
    else
        return null;

写入器逻辑-

public void write(List<? extends StringBuffer> items) throws Exception {
    logger.debug("Begin writing data as an Encrypted File");
    @SuppressWarnings("unchecked")
    Iterator<StringBuffer> itr = (Iterator<StringBuffer>) items.iterator();
    while (itr.hasNext()) {
        StringBuffer element = itr.next();
        encrypt(element);
        count++;
    }
}
public void encrypt(StringBuffer element) throws PGPException, IOException {
    PGPLib pgp = new PGPLib();
    KeyStore keyStore = new KeyStore("xxx.keystore", "xxx");
    String strLine = element.toString();
    StringBuffer buffer = new StringBuffer("");
    int i = 0;
    long startTime = System.currentTimeMillis();
    if (i % 200 == 0) {
        if (i != 0) {
            String encryptString = pgp.encryptString(buffer.toString(),
                    keyStore,
                    "xxx");
            encryptString = encryptString.replace("rn", "NEW_LINE");
            bufferedWriter.write(encryptString);
            bufferedWriter.newLine();
        }
        buffer = new StringBuffer(strLine);
    } else {
        buffer.append("rn").append(strLine);
    }
    i++;
    if (buffer != null && buffer.length() > 0) {
        String encryptString = pgp.encryptString(buffer.toString(),
                keyStore, "xxx");
        encryptString = encryptString.replace("rn", "NEW_LINE");
        bufferedWriter.write(encryptString);
        bufferedWriter.newLine();
    }
    long endTime = System.currentTimeMillis();
    logger.debug("Total time taken = " + (endTime - startTime) + " msec");
}

最新更新