在线程中遍历resultset会导致异常.使用JDBC连接到Hive的查询集合



对Hive及其特性进行评估。有一个用例,我需要在一个单独的线程中迭代结果集。我可以有许多resultset并生成一个线程来处理它们中的每一个。下面是我为这个用例编写的代码:

public class ConcurrentRSIteration2 {
private static String[] tableNames = 
    { 
        "random_data1",
        "random_data2",
        "random_data3",
        "random_data4"
    };
public static void main(String args[]) throws Exception {
    String driverName = "org.apache.hive.jdbc.HiveDriver";
    Class.forName(driverName);
    Connection con = DriverManager.getConnection(
            "jdbc:hive2://127.0.0.1:10000/default", "hive", "");
    int length = tableNames.length;
    StringBuilder[] sql = new StringBuilder[length];
    PreparedStatement[] stmt = new PreparedStatement[length];
    Thread[] rsIterators = new Thread[length];
    for (int i = 0; i < length; i++) {
        sql[i] = new StringBuilder().
                    append("select * from ").
                    append(tableNames[i]);
        stmt[i] = con.prepareStatement(sql[i].toString());
        RSIterator2 rsIterator = new RSIterator2(stmt[i].executeQuery());
        rsIterators[i] = new Thread(rsIterator);
    }
    for (int i = 0; i < length; i++) {
        rsIterators[i].start();
    }
}
}
class RSIterator2 implements Runnable {
private ResultSet rs;
public RSIterator2(ResultSet rs) {
    this.rs = rs;
}
@Override
public void run() {
    try {
        System.out.println(this.hashCode() + " : " + rs);
        System.out.println(this.hashCode() + " : RS iteration started.");
        int i = 0;
        while (rs.next()) {
            i++;
        }
        System.out.println(this.hashCode() + " : RS iteration done.");
    } catch (SQLException e) {
        e.printStackTrace();
    } finally {
        try {
            rs.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
}

下面是异常的堆栈跟踪。

org.apache.thrift.transport.TTransportException
    at org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.transport.TSaslTransport.readLength(TSaslTransport.java:376)
    at org.apache.thrift.transport.TSaslTransport.readFrame(TSaslTransport.java:453)
    at org.apache.thrift.transport.TSaslTransport.read(TSaslTransport.java:435)
    at org.apache.thrift.transport.TSaslClientTransport.read(TSaslClientTransport.java:37)
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:69)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.recv_FetchResults(TCLIService.java:501)
    at org.apache.hive.service.cli.thrift.TCLIService$Client.FetchResults(TCLIService.java:488)
    at org.apache.hive.jdbc.HiveQueryResultSet.next(HiveQueryResultSet.java:360)
    at hivetrial.RSIterator2.run(ConcurrentRSIteration2.java:60)
    at java.lang.Thread.run(Unknown Source)`

我是Hive的新手,可能忽略了一些事情。

你的整个方法都是建立在一个谬误之上的。您正在使用单个连接来执行多个查询。因此,数据库服务器将按照执行查询的顺序对返回的所有数据进行排序。使用多个线程来处理单个流是没有意义的。

您也永远不会关闭语句或连接。

java.sql包中的类不是线程安全的。

ResultSet与其同伴Statement分离是一个坏主意。您应该查询,将行加载到对象或数据结构中,然后在finally块中以单独的try/catch块关闭它们。

如果我没有指出连接池,那就是疏忽了。为什么要限制自己只做一件事呢?

最新更新