如何在 Java 中使用 Jedis 客户端监听 Redis 集群的键空间通知(keyspace notifications)



我正在搭建一个服务器,用来监听 Redis 数据库中发生的各种事件并发送相应的消息。在通过单个 Redis 主机和端口(单节点)连接时,我已经能够成功收到新事件的通知,但在 Redis 集群上却无法做到。

// Pool settings shared by both cluster clients created below.
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(30);
config.setMaxWaitMillis(2000);

// Bootstrap client: connects through a single known seed node so the
// remaining cluster nodes can be discovered from it.
Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>();
jedisClusterNode.add(new HostAndPort("127.0.0.1", 7001));
// NOTE(review): cluster1 is only used for discovery and is never closed in this snippet.
JedisCluster cluster1 = new JedisCluster(jedisClusterNode, config);

// The node map is keyed by "host:port", so read the keys directly instead of
// stripping braces from Map.toString() and splitting on ',' and '='.
Set<HostAndPort> nodes = new HashSet<>();
for (String nodeKey : cluster1.getClusterNodes().keySet()) {
    // Split on the last ':' so only the trailing port is separated from the host.
    int separator = nodeKey.lastIndexOf(':');
    String host = nodeKey.substring(0, separator).trim();
    int port = Integer.parseInt(nodeKey.substring(separator + 1).trim());
    nodes.add(new HostAndPort(host, port));
}

// Second client built from every discovered node.
// NOTE(review): 10000 / 1000 / 1 are presumably connection timeout, socket timeout
// and max attempts -- confirm against the Jedis version in use.
JedisCluster cluster = new JedisCluster(nodes, 10000, 1000, 1, config);

// NOTE(review): `jedis` is not declared in this snippet; presumably a single-node
// Jedis connection created elsewhere -- confirm.
jedis.configSet("notify-keyspace-events", "AKE"); // For all kinds of events
jedis.psubscribe(new KeyListenerCluster(), "__keyevent@0__:*");

使用 Redis 集群时,其他所有操作我都能正常执行,唯独下面这两个调用无法完成:

cluster.configSet("notify-keyspace-events", "AKE"); // For all kind of events
cluster.psubscribe(new KeyListenerCluster(), "__keyevent@0__:*");

我遇到了同样的问题:键空间通知只能间歇性地工作(10 次中大约只有 6~7 次能收到通知)。我了解到需要订阅所有主节点才能收到全部通知。我把自己的配置文件贴在下面,希望能有所帮助:

// Listener wrapped by messageListener(); assigned through
// setKeySpaceNotificationMessageListener(...). Presumably handles the keyspace
// notification messages -- confirm against its implementation.
KeySpaceNotificationMessageListener keySpaceNotificationMessageListener;
    // Value of the spring.redis.cluster.nodes property; redisClusterConfiguration()
    // splits it on "," to obtain the individual node entries.
    @Value("${spring.redis.cluster.nodes}")
    private String hostsAndPorts;
    /**
     * Builds the Jedis connection factory from the supplied cluster configuration.
     *
     * @param redisClusterConfiguration cluster configuration handed to the factory constructor
     * @return a new {@code JedisConnectionFactory} created from that configuration
     */
    @Bean
    JedisConnectionFactory jedisConnectionFactory(RedisClusterConfiguration redisClusterConfiguration) {
        final JedisConnectionFactory connectionFactory = new JedisConnectionFactory(redisClusterConfiguration);
        return connectionFactory;
    }
    /**
     * Wraps the configured {@code keySpaceNotificationMessageListener} in a
     * {@code MessageListenerAdapter}.
     *
     * @return a new adapter delegating to the listener held by this class
     */
    @Bean
    MessageListenerAdapter messageListener() {
        final MessageListenerAdapter adapter = new MessageListenerAdapter(keySpaceNotificationMessageListener);
        return adapter;
    }

    /**
     * Primary cache manager ("cacheManager1"): entries expire after one minute,
     * null values are not cached, and values are serialized as JSON.
     *
     * @return a {@code RedisCacheManager} built on the cluster connection factory
     */
    @Bean(name = "cacheManager1")
    @Primary
    public RedisCacheManager redisCacheManager1()
    {
        // Key prefixing is left at whatever defaultCacheConfig() provides:
        // usePrefix() is only a boolean query on an immutable configuration,
        // so calling it and discarding the result changes nothing.
        RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                .disableCachingNullValues()
                .entryTtl(Duration.ofMinutes(1))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.json()));
        return RedisCacheManager.RedisCacheManagerBuilder.fromConnectionFactory(jedisConnectionFactory(redisClusterConfiguration()))
                .cacheDefaults(redisCacheConfiguration).build();
    }
    /**
     * Secondary cache manager ("cacheManager2"): entries expire after one day,
     * null values are not cached, and values are serialized as JSON.
     *
     * @param jedisConnectionFactory connection factory injected by the container
     * @return a {@code RedisCacheManager} built on the injected connection factory
     */
    @Bean(name = "cacheManager2")
    public RedisCacheManager redisCacheManager2( JedisConnectionFactory jedisConnectionFactory)
    {
        // Key prefixing is left at whatever defaultCacheConfig() provides:
        // usePrefix() is only a boolean query on an immutable configuration,
        // so calling it and discarding the result changes nothing.
        RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                .disableCachingNullValues()
                .entryTtl(Duration.ofDays(1))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(RedisSerializer.json()));
        // Use the injected factory rather than rebuilding one through a direct method call;
        // the parameter was previously accepted but never used.
        return RedisCacheManager.RedisCacheManagerBuilder.fromConnectionFactory(jedisConnectionFactory)
                .cacheDefaults(redisCacheConfiguration).build();
    }
    /**
     * Builds the cluster configuration from the comma-separated {@code hostsAndPorts} value.
     * The split entries are printed to standard output before the configuration is created.
     *
     * @return a {@code RedisClusterConfiguration} created from the list of node entries
     */
    @Bean
    public RedisClusterConfiguration redisClusterConfiguration() {
        final String[] nodeEntries = hostsAndPorts.split(",");
        System.out.println(Arrays.toString(nodeEntries));
        return new RedisClusterConfiguration(Arrays.asList(nodeEntries));
    }
    /**
     * Creates the message listener container: it uses the cluster connection factory,
     * registers {@code messageListener()} on the pattern {@code __keyspace@*:*}, and
     * runs on a fixed pool of four threads.
     *
     * @return the configured {@code RedisMessageListenerContainer}
     */
    @Bean
    RedisMessageListenerContainer redisContainer() {
        final RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();

        final JedisConnectionFactory connectionFactory = jedisConnectionFactory(redisClusterConfiguration());
        listenerContainer.setConnectionFactory(connectionFactory);

        final MessageListenerAdapter listener = messageListener();
        final PatternTopic keyspaceTopic = new PatternTopic("__keyspace@*:*");
        listenerContainer.addMessageListener(listener, keyspaceTopic);

        listenerContainer.setTaskExecutor(Executors.newFixedThreadPool(4));
        return listenerContainer;
    }
    /**
     * Stores the listener that {@code messageListener()} wraps in its adapter.
     *
     * @param keySpaceNotificationMessageListener listener instance to keep on this object
     */
    public void setKeySpaceNotificationMessageListener(KeySpaceNotificationMessageListener keySpaceNotificationMessageListener) {
        this.keySpaceNotificationMessageListener = keySpaceNotificationMessageListener;
    }

最新更新