如何用幻影dsl增加Cassandra Counter Column



有在phantom dsl中实现计数器操作的例子吗?

已检查:

http://outworkers.com/blog/post/a-series-on-cassandra-part-3-advanced-features

https://github.com/outworkers/phantom/wiki/Counter-columns

https://github.com/outworkers/phantom/blob/develop/phantom-dsl/src/test/scala/com/websudos/phantom/tables/CounterTableTest.scala

有点像在寻找这个信息的虚幻dsl版本:

https://github.com/Netflix/astyanax/wiki/Working-with-counter-columns


以下是部分实现。它提出了两个问题:

  1. 我不知道如何从应用程序中获取值,并在计数器表中的计数器列中实现递增计数器操作。

  2. 如何更新与同一条目相关的表中的行,其中表具有不同数量的行和键。

在thiagos的例子中,两个表歌曲'&'songs_by_artist’都有相同的行,但有不同的分区(主键/集群列)

我不确定在phantom dsl中如何更新与相同条目相关的行,例如使用"records"&下面的"record_transaction_counts"表。

例如

RecordTransactionCounts.{hash, time} relates to Records.{hash, time}


case class Record(hash: String,
                 size: Int,
                 time: Long,
                 difficulty: Float)

sealed class RecordsModel extends CassandraTable[RecordsModel, Record] {
  override def fromRow(row: Row): Record = {
    Record(
      hash(row),
      size(row),
      time(row),
      difficulty(row)
    )
  }
  object hash extends StringColumn(this) with PartitionKey[String]
  object size extends IntColumn(this)
  object time extends LongColumn(this)
  object difficulty extends FloatColumn(this)
}
abstract class ConcreteRecordsModel extends RecordsModel with RootConnector {
  override val tableName = "records"
  def insertNew(block: Record): Future[ResultSet] = insertNewRecord(block).future()
  def insertNewRecord(r: Record) = {
    insert
      .value(_.hash, r.hash)
      .value(_.size, r.size)
      .value(_.time, r.time)
      .value(_.difficulty, r.difficulty)
  }
}
case class RecordTransactionCounts(hash: String, time: Long, num_transactions: Long )
class RecordTransactionCountsModel extends CassandraTable[RecordTransactionCountsModel, RecordTransactionCounts] {
  override def tableName: String = "record_transaction_counts"
  object hash extends StringColumn(this) with PartitionKey[String]
  object time extends LongColumn(this) with ClusteringOrder[Long]
  object num_transactions extends CounterColumn(this)
  override def fromRow(r: Row): RecordTransactionCounts = {
    RecordTransactionCounts(
      hash(r),
      time(r),
      num_transactions(r)
    )
  }
}
abstract class ConcreteRecordTransactionCountsModel extends TransactionCountsModel with RootConnector {
  def createTable(): Future[ResultSet] = {
    create.ifNotExists().future()
  }
  def store(count: RecordTransactionCounts): Future[ResultSet] = {
    insert
      .value(_.hash, count.hash)
      .value(_.time, count.time)
      .value(_.num_transactions, count.num_transactions)
      .future()
  }
  def getCount(hash: String): Future[Option[Long]] = {
    select(_.count).where(_.hash eqs hash).one()
  }
}
class Database(val keyspace: KeySpaceDef) extends DatabaseImpl(keyspace) {
  def insertRecordTransactionCounts(tc: RecordTransactionCounts) = {
    Batch.logged
      .add(ChainDatabase.tc.store(tc))
      .future()
  }
  object tc extends ConcreteRecordTransactionCountsModel with keyspace.Connector
}
object ChainDatabase extends Database(Config.keySpaceDefinition)

正如Thiago所建议的,您可以使用+=-=运算符来递减计数器的值。您也可以分别使用incrementdecrement方法来实现相同的目的。

def increment(count: RecordTransactionCounts): Future[ResultSet] = {
  update
    .where(_.hash eqs count.hash)
    .and(_.time eqs count.time)
    .modify(_.num_transactions += count.num_transactions)
    .future()
}
// or
def increment(count: RecordTransactionCounts): Future[ResultSet] = {
  update
    .where(_.hash eqs count.hash)
    .and(_.time eqs count.time)
    .modify(_.num_transactions increment count.num_transactions)
    .future()
}

要递减,只需将行替换为:

    ...
    .modify(_.num_transactions -= count.num_transactions)
    // or
    .modify(_.num_transactions decrement count.num_transactions)

在你过于依赖计数器之前,你还应该用谷歌搜索一下,看看其他人遇到了什么问题。

为了在phantom dsl中使用CounterColumn,您必须使用以下模式来递增它:

.modify(_.myCounterColumn += 1) //or whatever value you want to increment

在你的ConcreteRecordTransactionCountsModel中,你可以改变你的商店,以这样一种适当的方式增加计数器:

def increment(count: RecordTransactionCounts): Future[ResultSet] = {
  update
    .where(_.hash eqs count.hash)
    .and(_.time eqs count.time)
    .modify(_.num_transactions += count.num_transactions)
    .future()
}

我将尝试用我以前工作过的更多例子来更新我的github。如果你有任何建议,请打开一张票,我会这样做的。

最新更新