如何并行而不是按顺序执行多个查询



我正在查询我的所有10个表,从中获取用户id,并将所有用户id加载到HashSet中,这样我就可以拥有唯一的用户id。

截至目前,它是按顺序排列的。我们转到一个表,从中提取所有user_id,并将其加载到哈希集中,然后加载到第二个和第三个表中,然后继续。

    private Set<String> getRandomUsers() {
        Set<String> userList = new HashSet<String>();
        // is there any way to make this parallel?
        for (int table = 0; table < 10; table++) {
            String sql = "select * from testkeyspace.test_table_" + table + ";";
            try {
                SimpleStatement query = new SimpleStatement(sql);
                query.setConsistencyLevel(ConsistencyLevel.QUORUM);
                ResultSet res = session.execute(query);
                Iterator<Row> rows = res.iterator();
                while (rows.hasNext()) {
                    Row r = rows.next();
                    String user_id = r.getString("user_id");
                    userList.add(user_id);
                }
            } catch (Exception e) {
                System.out.println("error= " + ExceptionUtils.getStackTrace(e));
            }
        }
        return userList;
    }

有没有什么方法可以使它成为多线程的,这样对于每个表,它们都可以并行地从我的表中获取数据?最后,我需要userList哈希集,它应该具有所有10个表中的所有唯一用户id。

我正在使用Cassandra数据库,并且只建立一次连接,所以我不需要创建多个连接。

如果您能够使用Java 8,那么您可能可以对表列表使用parallelStream,并使用lambda将表名扩展到每个表的唯一ID的相应列表中,然后将结果连接到一个哈希中。

如果没有Java8,我会使用GoogleGuava的listenablefutures和类似于以下的执行器服务:

public static Set<String> fetchFromTable(int table) {
    String sql = "select * from testkeyspace.test_table_" + table + ";";
    Set<String> result = new HashSet<String>();
    // populate result with your SQL statements
    // ...
    return result;
}
public static Set<String> fetchFromAllTables() throws InterruptedException, ExecutionException {
    // Create a ListeningExecutorService (Guava) by wrapping a 
    // normal ExecutorService (Java) 
    ListeningExecutorService executor = 
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    List<ListenableFuture<Set<String>>> list = 
            new ArrayList<ListenableFuture<Set<String>>>(); 
    // For each table, create an independent thread that will 
    // query just that table and return a set of user IDs from it
    for (int i = 0; i < 10; i++) {
        final int table = i;
        ListenableFuture<Set<String>> future = executor.submit(new Callable<Set<String>>() {
            public Set<String> call() throws Exception {
                return fetchFromTable(table);
            }
        });
        // Add the future to the list
        list.add(future);
    }
    // We want to know when ALL the threads have completed, 
    // so we use a Guava function to turn a list of ListenableFutures
    // into a single ListenableFuture
    ListenableFuture<List<Set<String>>> combinedFutures = Futures.allAsList(list);
    // The get on the combined ListenableFuture will now block until 
    // ALL the individual threads have completed work.
    List<Set<String>> tableSets = combinedFutures.get();
    // Now all we have to do is combine the individual sets into a
    // single result
    Set<String> userList = new HashSet<String>();
    for (Set<String> tableSet: tableSets) {
        userList.addAll(tableSet);
    }
    return userList;
}

Executors和Futures的使用都是Java的核心。Guava做的唯一一件事就是让我把Futures变成ListableFutures。有关为什么后者更好的讨论,请参见此处。

可能还有一些方法可以提高这种方法的并行性,但如果您的大部分时间都花在等待DB响应或处理网络流量上,那么这种方法可能会有所帮助。

您可以使它成为多线程的,但由于线程创建和多个连接的开销,您可能不会获得显著的好处。相反,在mysql中使用UNION语句,并一次获取所有这些语句。让数据库引擎找出如何有效地获取所有这些信息:

String sql = "select user_id from testkeyspace.test_table_1 UNION select  user_id from testkeyspace.test_table_2 UNION select user_id from testkeyspace.test_table_3 ...."

当然,您必须以编程方式创建sql查询字符串。不要把"…"放在你的查询中。

相关内容

  • 没有找到相关文章

最新更新