无法通过Scala中的[customKey,customValue]缓存执行IGNITE SQL查询



我正在尝试使用Apache Ignite设置分布式缓存。设置高速缓存后,我能够将项目放在知道密钥的情况下,但是任何类型的SQL查询始终返回带有Null Iterator的光标。

这是我设置高速缓存的方式(请注意,这是在点火之前完成的(:

def setupTelemetryCache(): CacheConfiguration[TelemetryKey, TelemetryValue] = {
val dataRegionName = "persistent-region"
val cacheName = "telemetry-cache"
// This object is required to perform SQL queries over custom key object
val queryEntity = new QueryEntity("TelemetryKey", "TelemetryValue")
val fields: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]
fields.put("deviceId", classOf[String].getName)
fields.put("metricName", classOf[String].getName)
fields.put("timestamp", classOf[String].getName)
queryEntity.setFields(fields)
val keyFields: util.HashSet[String] = new util.HashSet[String]()
keyFields.add("deviceId")
keyFields.add("metricName")
keyFields.add("timestamp")
queryEntity.setKeyFields(keyFields)
queryEntity.setIndexes(Collections.emptyList[QueryIndex]())
new CacheConfiguration()
  .setName(cacheName)
  .setDataRegionName(dataRegionName)
  .setCacheMode(CacheMode.PARTITIONED) // Data is split among nodes
  .setBackups(1) // each partition has 1 backup
  .setIndexedTypes(classOf[String], classOf[TelemetryKey])  // Index by ID
  .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC) // Faster, clients do not wait for cache
  // synchronization. Consistency issues?
  .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) // Allows transactional query
  .setQueryEntities(Collections.singletonList(queryEntity)) 
}

,这些是我的telemetrykey的代码:

case class TelemetryKey private (
                              @(AffinityKeyMapped @field)
                              @(QuerySqlField@field)(index = true)
                              deviceId: String,
                              @(QuerySqlField@field)(index = false)
                              metricName: String,
                              @(QuerySqlField@field)(index = true)
                              timestamp: String) extends Serializable 

和遥控值:

class TelemetryValue private(valueType: ValueTypes.Value, doubleValue: Option[Double],
                             stringValue: Option[String],
                             longValue: Option[Long]) extends Serializable

我必须实现的示例SQL查询可以是"从缓存中进行选择 * where deviceId ='dev1234'",我希望同一deviceId

这是我执行查询的方式:

 private def sqlQuery(query: SqlQuery[TelemetryKey, TelemetryValue]):
  QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
    cache.query(query)
  }
  def getEntries(ofDeviceId: String):
  QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
    val q = new SqlQuery[TelemetryKey, TelemetryValue](classOf[TelemetryKey], "deviceId = ?")
    sqlQuery(q.setArgs(ofDeviceId))
  }

即使更改查询的主体,我会收到一个空的光标对象。我什至无法执行"选择 *"查询。

感谢您的帮助

有两种配置索引和可查询字段的方法。

  1. 基于注释的配置
    您的密钥和价值类需要注释@QuerySqlField如下。

    case class TelemetryKey private (
    @(AffinityKeyMapped @field)
    @(QuerySqlField@field)(index = true)
    deviceId: String,
    @(QuerySqlField@field)(index = false)
    metricName: String,
    @(QuerySqlField@field)(index = true)
    timestamp: String) extends Serializable

定义了索引和可查询字段后,必须在SQL引擎中注册它们以及它们所属的对象类型。


    new CacheConfiguration()
    .setName(cacheName)
    .setDataRegionName(dataRegionName)
    .setCacheMode(CacheMode.PARTITIONED)
    .setBackups(1)
    .setIndexedTypes(classOf[TelemetryKey], classOf[TelemetryValue])
    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)

upd:还应该修复的另一件事是您的SqlQuery

  def getEntries(ofDeviceId: String):
  QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
    val q = new SqlQuery[TelemetryKey, TelemetryValue](classOf[TelemetryValue], "deviceId = ?")
    sqlQuery(q.setArgs(ofDeviceId))
  }
  1. 基于查询的方法
val queryEntity = new QueryEntity(classOf[TelemetryKey], classOf[TelemetryValue]);

    new CacheConfiguration()
    .setName(cacheName)
    .setDataRegionName(dataRegionName)
    .setCacheMode(CacheMode.PARTITIONED)
    .setBackups(1)
    .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
    .setQueryEntities(Collections.singletonList(queryEntity))

长话短说,您应该向查询提供完整的JVM班级名称。

如:

val queryEntity = new QueryEntity("com.pany.telemetry.TelemetryKey",
    "com.pany.telemetry.TelemetryValue") // or e.g. TelemetryKey.class.getName()

IGNITE需要这些以区分可以存储在一个缓存中的各种类型,它不是装饰性的 - 必须确切匹配。

更好吗?使用setIndexedTypes()代替setQueryEntities()。它允许您通过类而不是字符串,它将扫描注释,您已经拥有。

最新更新