我正在尝试使用Apache Ignite设置分布式缓存。设置高速缓存后,我能够将项目放在知道密钥的情况下,但是任何类型的SQL查询始终返回带有Null Iterator的光标。
这是我设置高速缓存的方式(请注意,这是在点火之前完成的(:
def setupTelemetryCache(): CacheConfiguration[TelemetryKey, TelemetryValue] = {
val dataRegionName = "persistent-region"
val cacheName = "telemetry-cache"
// This object is required to perform SQL queries over custom key object
val queryEntity = new QueryEntity("TelemetryKey", "TelemetryValue")
val fields: util.LinkedHashMap[String, String] = new util.LinkedHashMap[String, String]
fields.put("deviceId", classOf[String].getName)
fields.put("metricName", classOf[String].getName)
fields.put("timestamp", classOf[String].getName)
queryEntity.setFields(fields)
val keyFields: util.HashSet[String] = new util.HashSet[String]()
keyFields.add("deviceId")
keyFields.add("metricName")
keyFields.add("timestamp")
queryEntity.setKeyFields(keyFields)
queryEntity.setIndexes(Collections.emptyList[QueryIndex]())
new CacheConfiguration()
.setName(cacheName)
.setDataRegionName(dataRegionName)
.setCacheMode(CacheMode.PARTITIONED) // Data is split among nodes
.setBackups(1) // each partition has 1 backup
.setIndexedTypes(classOf[String], classOf[TelemetryKey]) // Index by ID
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC) // Faster, clients do not wait for cache
// synchronization. Consistency issues?
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) // Allows transactional query
.setQueryEntities(Collections.singletonList(queryEntity))
}
,这些是我的telemetrykey的代码:
case class TelemetryKey private (
@(AffinityKeyMapped @field)
@(QuerySqlField@field)(index = true)
deviceId: String,
@(QuerySqlField@field)(index = false)
metricName: String,
@(QuerySqlField@field)(index = true)
timestamp: String) extends Serializable
和遥控值:
class TelemetryValue private(valueType: ValueTypes.Value, doubleValue: Option[Double],
stringValue: Option[String],
longValue: Option[Long]) extends Serializable
我必须实现的示例SQL查询可以是"从缓存中进行选择 * where deviceId ='dev1234'",我希望同一deviceId
这是我执行查询的方式:
private def sqlQuery(query: SqlQuery[TelemetryKey, TelemetryValue]):
QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
cache.query(query)
}
def getEntries(ofDeviceId: String):
QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
val q = new SqlQuery[TelemetryKey, TelemetryValue](classOf[TelemetryKey], "deviceId = ?")
sqlQuery(q.setArgs(ofDeviceId))
}
即使更改查询的主体,我会收到一个空的光标对象。我什至无法执行"选择 *"查询。
感谢您的帮助
有两种配置索引和可查询字段的方法。
- 基于注释的配置
您的密钥和价值类需要注释@QuerySqlField
如下。
case class TelemetryKey private (
@(AffinityKeyMapped @field)
@(QuerySqlField@field)(index = true)
deviceId: String,
@(QuerySqlField@field)(index = false)
metricName: String,
@(QuerySqlField@field)(index = true)
timestamp: String) extends Serializable
定义了索引和可查询字段后,必须在SQL引擎中注册它们以及它们所属的对象类型。
new CacheConfiguration()
.setName(cacheName)
.setDataRegionName(dataRegionName)
.setCacheMode(CacheMode.PARTITIONED)
.setBackups(1)
.setIndexedTypes(classOf[TelemetryKey], classOf[TelemetryValue])
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
upd:还应该修复的另一件事是您的SqlQuery
def getEntries(ofDeviceId: String):
QueryCursor[Cache.Entry[TelemetryKey, TelemetryValue]] = {
val q = new SqlQuery[TelemetryKey, TelemetryValue](classOf[TelemetryValue], "deviceId = ?")
sqlQuery(q.setArgs(ofDeviceId))
}
- 基于查询的方法
val queryEntity = new QueryEntity(classOf[TelemetryKey], classOf[TelemetryValue]);
new CacheConfiguration()
.setName(cacheName)
.setDataRegionName(dataRegionName)
.setCacheMode(CacheMode.PARTITIONED)
.setBackups(1)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_ASYNC)
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
.setQueryEntities(Collections.singletonList(queryEntity))
长话短说,您应该向查询提供完整的JVM班级名称。
如:
val queryEntity = new QueryEntity("com.pany.telemetry.TelemetryKey",
"com.pany.telemetry.TelemetryValue") // or e.g. TelemetryKey.class.getName()
IGNITE需要这些以区分可以存储在一个缓存中的各种类型,它不是装饰性的 - 必须确切匹配。
更好吗?使用setIndexedTypes()
代替setQueryEntities()
。它允许您通过类而不是字符串,它将扫描注释,您已经拥有。