我有一个使用apache flink处理数据流信号的SpringBoot gradle项目。当一个新信号通过数据流时,我想查询查找(即findById(((它的详细信息,使用已经创建的postgres数据库表中的ID来获取有关信号的其他信息并丰富数据。我想避免使用 spring 依赖项来执行查找(即 Autowire 存储库(,并希望坚持使用 flink 实现进行查找。
我在哪里可以指定如何添加 postgres 连接配置信息,例如端口、数据库、URL、用户名、密码等......(为简单起见,可以假设 Postgres DB 在我的机器中是本地的(。它是否像将配置添加到应用程序属性文件一样简单?如果是这样,我如何编写查询方法以在按非主键值搜索时查找 Postgres 表中的记录?
一些在线资源建议使用此骨架代码,但我不确定它如何/id适合我的用例。(我创建了一个事件实体模型,其中包含我正在查找的表中的所有参数/列(。
like so
public class DatabaseMapper extends RichFlatMapFunction<String, EventEntity> {
// Declare DB connection & query statements
public void open(Configuration parameters) throws Exception {
//Initialize DB connection
//prepare query statements
}
@Override
public void flatMap(String value, Collector<EventEntity> out) throws Exception {
}
}
您的示例代码是正确的。您可以在open()
方法中为 PostgreSQL 设置所有自定义初始化和准备代码。然后,您可以在flatMap()
函数中使用预配置的字段。
下面是 Redis 操作的一个示例
- 我在这里使用了RichAsyncFunction,我建议你按照建议的最佳实践做同样的事情。阅读此处了解更多信息: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html(
-
可以在构造函数方法中传递配置参数,并在初始化过程中使用它
public static class AsyncRedisOperations extends RichAsyncFunction<Object,Object> { private JedisPool jedisPool; private Configuration redisConf; public AsyncRedisOperations(Configuration redisConf) { this.redisConf = redisConf; } @Override public void open(Configuration parameters) { JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxTotal(this.redisConf.getInteger("pool", 8)); jedisPoolConfig.setMaxIdle(this.redisConf.getInteger("pool", 8)); jedisPoolConfig.setMaxWaitMillis(this.redisConf.getInteger("maxWait", 0)); JedisPool jedisPool = new JedisPool(jedisPoolConfig, this.redisConf.getString("host", "192.168.10.10"), this.redisConf.getInteger("port", 6379), 5000); try { this.jedisPool = jedisPool; this.logger.info("Redis connected: " + jedisPool.getResource().isConnected()); } catch (Exception e) { this.logger.error(BaseUtil.append("Exception while connecting Redis")); } } @Override public void asyncInvoke(Object in, ResultFuture<Object> out) { try (Jedis jedis = this.jedisPool.getResource()) { String key = jedis.get(key); this.logger.info("Redis Key: " + key); } } }