Spring Batch ItemReader使用PostgreSQL函数游标错误



我需要一个Spring Batch ItemReader,使用PostgreSQL函数作为数据源,但我没有成功(PostgreSQL数据库在Greenplum设备中)。我创建了一个ItemReader调用PostgreSQL函数的例子,该函数返回一个游标,但ItemReader失败,声明游标不存在。以下是我设置的所有组件:

-数据库表和数据:

CREATE TABLE test_user
(
    test_user_sys_id numeric NOT NULL,
    ssn character varying(9) NOT NULL,
    create_user_id character varying(30) NOT NULL,
    create_ts timestamp(6) without time zone NOT NULL DEFAULT now()
)
WITH (
OIDS=FALSE
)
DISTRIBUTED BY (ssn);
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id)  VALUES (1,'111111111','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id)  VALUES (2,'222222222','DataAdmin');
INSERT INTO test_user (test_user_sys_id, ssn, create_user_id)  VALUES (3,'333333333','DataAdmin');

-数据库功能:

CREATE or REPLACE FUNCTION get_user_func_no_arg(
    p_id_min NUMERIC        -- Minimum ID value
)
RETURNS REFCURSOR 
AS
$BODY$
DECLARE
    resultCursor REFCURSOR := 'resultCursor';
BEGIN
    OPEN resultCursor FOR
    SELECT test_user_sys_id, ssn
    FROM test_user
    WHERE test_user_sys_id > p_id_min;
    RETURN resultCursor;
END;
$BODY$
LANGUAGE plpgsql VOLATILE;

-localLaunchContext.xml(环境特定信息):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="
               http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd">
    <!-- Import the Spring Batch config files -->
    <import resource="springBatchConfig.xml" />
    <!-- Define the PostgreSQL source -->
    <bean id="postgresql_dataSource"
        class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <property name="driverClassName" value="org.postgresql.Driver"/>
        <property name="url" value="THE URL"/>
        <property name="username" value="THE USER"/>
        <property name="password" value="THE PASSWORD"/>
    </bean>
    <!-- Define a resourceless transaction manager for the in-memory job repository -->
    <bean id="repositoryTransactionManager"
    class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
    <!-- Define a transaction manager for PostgreSQL to hopefully make the cursor work -->
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="postgresql_dataSource"/> 
    </bean>
    <!-- Define in-memory job repository  -->
    <bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
        <property name="transactionManager" ref="repositoryTransactionManager"/>
    </bean>
    <!-- Define the synchronous job launcher -->
    <bean id="syncJobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
</beans>

-springBatchConfig.xml(作业信息):

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
                    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
                    http://www.springframework.org/schema/batch 
                    http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
                    http://www.springframework.org/schema/util 
                    http://www.springframework.org/schema/util/spring-util-3.0.xsd">
    <!-- ================================================== -->
    <!-- Components for TestUser Job           -->
    <!-- ================================================== -->
    <!-- Test User Stored Procedure ItemReader -->
    <bean id="testUserItemReader"
        class="org.springframework.batch.item.database.StoredProcedureItemReader"
        scope="step">
        <property name="dataSource" ref="postgresql_dataSource" />
        <property name="procedureName" value="get_user_func_no_arg" />
        <property name="parameters">
            <list>
                <bean class="org.springframework.jdbc.core.SqlParameter">
                    <constructor-arg index="0" value="p_id_min " />
                    <constructor-arg index="1">
                        <util:constant static-field="java.sql.Types.NUMERIC" />
                    </constructor-arg>
                </bean>
                <bean class="org.springframework.jdbc.core.SqlOutParameter">
                    <constructor-arg index="0" value="resultCursor" />
                    <constructor-arg index="1">
                        <util:constant static-field="java.sql.Types.OTHER" />
                    </constructor-arg>
                </bean>
            </list>
        </property>
        <property name="refCursorPosition" value="2" />
        <property name="rowMapper">
            <bean class="dao.mapper.TestUserRowMapper" />
        </property>
        <property name="preparedStatementSetter" ref="preparedStatementSetter" />
    </bean>
    <!-- ItemProcessor is not needed, since the stored procedure provides the results -->
    <!-- TestUser ItemWriter -->
    <bean id="testUserItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
        <property name="resource" value="file:output.txt" />
        <property name="lineAggregator">
            <bean class="org.springframework.batch.item.file.transform.PassThroughLineAggregator"/>
        </property>
    </bean>
    <!-- TestUser Job definition -->
    <batch:job id="TestUserJob" incrementer="jobParametersIncrementer">
        <batch:step id="TestUser_step1">
            <batch:tasklet>
            <batch:transaction-attributes isolation="READ_COMMITTED" propagation="MANDATORY" timeout="200"/>
                <batch:chunk reader="testUserItemReader"
                    writer="testUserItemWriter" commit-interval="2" />
            </batch:tasklet>
        </batch:step>
    </batch:job>
    <!-- ================================================== -->
    <!-- Common Beans that are used in multiple scenarios -->
    <!-- ================================================== -->
    <!-- Increments the Job ID -->
    <bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />
    <!-- Prepared statement setter to provide minimum user ID input to the PostgreSQl function. -->
    <bean id="preparedStatementSetter" class="dao.setter.TestUserPreparedStatementSetter"
        scope="step">
        <property name="minId">
            <value>#{jobParameters['minId']}</value>
        </property>
    </bean>
</beans>

-TestUser.java(ItemReader数据对象):

package domain;
// Data object to support a user
public class TestUser {
    private int id;
    private String ssn;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getSsn() {
        return ssn;
    }
    public void setSsn(String ssn) {
        this.ssn = ssn;
    }
    @Override
    public String toString() {
        return "TestUser [id=" + id + ", ssn=" + ssn + "]";
    }
}

-TestUserPreparedStatementSetter.java(设置PostgreSQL函数的输入参数):

package dao.setter;
import java.sql.CallableStatement;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import org.springframework.jdbc.core.PreparedStatementSetter;
// Spring Batch PreparedStatementSetter to set parameters for stored procedures
public class TestUserPreparedStatementSetter implements PreparedStatementSetter {
    private int minId;
    // Setters
    public void setMinId(int minId) {
        this.minId = minId;
    }
    // Set the parameters for the stored procedure
    public void setValues(PreparedStatement ps) throws SQLException {
        CallableStatement cs = (CallableStatement) ps;
        cs.setInt(1, this.minId);
        cs.registerOutParameter(2, java.sql.Types.OTHER);
    }
}

-TestUserRowMapper.java(将ItemReader光标行映射到TestUser数据对象):

package dao.mapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import domain.TestUser;
//Spring RowMapper to support stored procedure results
public class TestUserRowMapper  implements RowMapper<TestUser>{
    // Map the results to the DAO
    public TestUser mapRow(ResultSet resultSet, int rowNum) throws SQLException
    {
        TestUser resultData = new TestUser();
        resultData.setId(resultSet.getInt(1));
        resultData.setSsn(resultSet.getString(2));
        return resultData;
    }
}

-调用命令:

java -classpath ".;lib*;bin" org.springframework.batch.core.launch.support.CommandLineJobRunner localLaunchContext.xml TestUserJob minId=1

当我运行这个时,我得到以下错误:

Caused by: org.postgresql.util.PSQLException: ERROR: cursor "resultCursor" does not exist
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2198)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1927)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:255)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:561)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:405)
    at org.postgresql.jdbc2.AbstractJdbc2Connection.execSQLQuery(AbstractJdbc2Connection.java:363)
    at org.postgresql.jdbc2.AbstractJdbc2ResultSet.internalGetObject(AbstractJdbc2ResultSet.java:207)
    at org.postgresql.jdbc3.AbstractJdbc3ResultSet.internalGetObject(AbstractJdbc3ResultSet.java:36)
    at org.postgresql.jdbc4.AbstractJdbc4ResultSet.internalGetObject(AbstractJdbc4ResultSet.java:300)
    at org.postgresql.jdbc2.AbstractJdbc2ResultSet.getObject(AbstractJdbc2ResultSet.java:2704)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:456)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:412)
    at org.springframework.batch.item.database.StoredProcedureItemReader.openCursor(StoredProcedureItemReader.java:205)
    ... 29 more

我反复尝试将REFCURSOR指定为函数的输出参数和许多其他内容,但都没有成功。使用JDK 1.7、PostgreSQL JDBC驱动程序JAR PostgreSQL-9.3-1102.jdbc4和PostgreSQL 8.2.15(在Greenplum 4.2.8.1 build 2下)会出现这种情况。

欢迎提出任何建议。提前谢谢。

经过大量调查,我找到了修复方法-将数据源更改为支持禁用自动提交的数据源:

<!-- Define the PostgreSQL source -->
<bean id="postgresql_dataSource"
    class="org.apache.commons.dbcp.BasicDataSource">
    <property name="driverClassName" value="org.postgresql.Driver"/>
    <property name="url" value="THE URL"/>
    <property name="username" value="THE USER"/>
    <property name="password" value="THE PASSWORD"/>
    <property name="defaultAutoCommit" value="false"/>
</bean>

最新更新