如何以自定义方式从主题恢复全局存储



假设我在从主题获取数据后将数据存储在 Globalstore 中时正在进行一些自定义处理,即我正在从消息值创建自定义键.它会在本地删除状态后以相同的方式再次恢复 Globalstore。

override def process(key: String, value: String): Unit = {
logger.info("telephonyUsersProcessorCounter = "+telephonyUsersProcessorCounter)
telephonyUsersProcessorCounter  = telephonyUsersProcessorCounter +1
val telKey = processKey(key)
if (telKey.isDefined) {
  val telValue = processValue(value)
  if(telValue.isDefined ){
    StreamConstants.teleStore.get.put(telKey.get,telValue.get)
    val compositeKeyForNumber = telValue.get.enterpriseId + telValue.get.phoneNumber
    val compositeKeyForDeviceName =  telValue.get.enterpriseId +telValue.get.deviceName
    val compositeKeyForNumberAndDeviceName =  telValue.get.enterpriseId +telValue.get.phoneNumber+telValue.get.deviceName
    val telCompositeKeyForNumber =  StreamConstants.teleStore.get.get(compositeKeyForNumber)
    val telCompositeKeyForDeviceName =  StreamConstants.teleStore.get.get(compositeKeyForDeviceName)
    val telCompositeKeyForNumberAndDeviceName =  StreamConstants.teleStore.get.get(compositeKeyForNumberAndDeviceName)
    if(null !=telCompositeKeyForNumber ){
      if(telCompositeKeyForNumber.dateCreated.toLong < telValue.get.dateCreated.toLong){
        StreamConstants.teleStore.get.put(compositeKeyForNumber,telValue.get)
      }
    }else {
      StreamConstants.teleStore.get.put(compositeKeyForNumber,telValue.get)
    }
    if(null != telCompositeKeyForDeviceName){
      if(telCompositeKeyForDeviceName.dateCreated.toLong < telValue.get.dateCreated.toLong){
        StreamConstants.teleStore.get.put(compositeKeyForDeviceName,telValue.get)
      }
    }else {
      StreamConstants.teleStore.get.put(compositeKeyForDeviceName,telValue.get)
    }
    if(null != telCompositeKeyForNumberAndDeviceName){
      if(telCompositeKeyForNumberAndDeviceName.dateCreated.toLong < telValue.get.dateCreated.toLong){
        StreamConstants.teleStore.get.put(compositeKeyForNumberAndDeviceName,telValue.get)
      }
    }else {
      StreamConstants.teleStore.get.put(compositeKeyForNumberAndDeviceName,telValue.get)
    }
    context.forward(telKey.get, telValue.get.toJson.toString())
    context.forward(compositeKeyForNumber, telValue.get.toJson.toString())
    context.forward(compositeKeyForDeviceName, telValue.get.toJson.toString())
    context.forward(compositeKeyForNumberAndDeviceName, telValue.get.toJson.toString())
  }else {
    StreamConstants.teleStore.get.put(telKey.get,null)
    context.forward(telKey.get,null)
  }
}

}

使用

消息值中的数据创建自定义键,而不是使用主题中的直接键。假设我删除了我的本地全局存储。从紧凑主题恢复此商店时会发生什么?

在还原时,来自更改日志主题的数据将按原样放入全局存储中,跳过任何自定义处理器逻辑。这是一个已知问题:https://issues.apache.org/jira/browse/KAFKA-4963

最新更新