使用 Postgres 实现 Spring + Apache Flink 项目



我有一个使用apache flink处理数据流信号的SpringBoot gradle项目。当一个新信号通过数据流时,我想查询查找(即findById(((它的详细信息,使用已经创建的postgres数据库表中的ID来获取有关信号的其他信息并丰富数据。我想避免使用 spring 依赖项来执行查找(即 Autowire 存储库(,并希望坚持使用 flink 实现进行查找。

我在哪里可以指定如何添加 postgres 连接配置信息,例如端口、数据库、URL、用户名、密码等......(为简单起见,可以假设 Postgres DB 在我的机器中是本地的(。它是否像将配置添加到应用程序属性文件一样简单?如果是这样,我如何编写查询方法以在按非主键值搜索时查找 Postgres 表中的记录?

一些在线资源建议使用此骨架代码,但我不确定它如何/id适合我的用例。(我创建了一个事件实体模型,其中包含我正在查找的表中的所有参数/列(。

like so

public class DatabaseMapper extends RichFlatMapFunction<String, EventEntity> {
// Declare DB connection & query statements
public void open(Configuration parameters) throws Exception {
//Initialize DB connection
//prepare query statements
}
@Override
public void flatMap(String value, Collector<EventEntity> out) throws Exception {
}
}

您的示例代码是正确的。您可以在open()方法中为 PostgreSQL 设置所有自定义初始化和准备代码。然后,您可以在flatMap()函数中使用预配置的字段。

下面是 Redis 操作的一个示例

  • 我在这里使用了RichAsyncFunction,我建议你按照建议的最佳实践做同样的事情。阅读此处了解更多信息: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html(
  • 可以在构造函数方法中传递配置参数,并在初始化过程中使用它

    public static class AsyncRedisOperations extends RichAsyncFunction<Object,Object> {
    private JedisPool jedisPool;
    private Configuration redisConf;
    public AsyncRedisOperations(Configuration redisConf) {
    this.redisConf = redisConf;
    } 
    @Override
    public void open(Configuration parameters) {
    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
    jedisPoolConfig.setMaxTotal(this.redisConf.getInteger("pool", 8));
    jedisPoolConfig.setMaxIdle(this.redisConf.getInteger("pool", 8));
    jedisPoolConfig.setMaxWaitMillis(this.redisConf.getInteger("maxWait", 0));
    JedisPool jedisPool = new JedisPool(jedisPoolConfig,
    this.redisConf.getString("host", "192.168.10.10"),
    this.redisConf.getInteger("port", 6379), 5000);
    try {
    this.jedisPool = jedisPool;
    this.logger.info("Redis connected: " + jedisPool.getResource().isConnected());
    } catch (Exception e) {
    this.logger.error(BaseUtil.append("Exception while connecting Redis"));
    }
    }
    @Override
    public void asyncInvoke(Object in, ResultFuture<Object> out) {
    try (Jedis jedis = this.jedisPool.getResource()) {
    String key = jedis.get(key);
    this.logger.info("Redis Key: " + key);
    } 
    }
    }      
    

相关内容

  • 没有找到相关文章

最新更新