如何使用 Astyanax 客户端将 Cassandra 插入到复合列中



我正在尝试使用客户端插入 Cassandra 中的Composite Columns Astyanax。下面是我创建的列系列,其中所有列都Composite Columns。我不确定我应该如何插入下面的列系列,每列都是一个Composite Column.

create column family USER_DATA
with key_validation_class = 'UTF8Type'
and comparator = 'CompositeType(UTF8Type,UTF8Type,DateType)'
and default_validation_class = 'UTF8Type'
and gc_grace = 86400
and column_metadata = [ {column_name : 'lmd', validation_class : DateType}];

下面是我创建的一个简单的示例,它将插入到Cassandra中,但它不适用于Composite ColumnsattributesMap是包含我尝试插入的数据的地图,key将被column name,并且actual valuecomposite value该列。

String s1 = "Hello";
String S2 = "World";
long s3 = System.currentTimeMillis();
// insert the above three values into a composite column
clientDao.upsertAttributes("123", attributesMap, columnFamily);

下面是我的DAOImpl code,它有upsertAttributes method,我用它来插入 Cassandra 中,仅用于单列值。有什么方法,我可以修改相同的内容以开始使用 Atyanax 的复合列行为?

/**
 * Performs an upsert of the specified attributes for the specified id.
 */
public void upsertAttributes(final String rowKey, final Map<String, String> attributes, final String columnFamily) {
    try {
        MutationBatch m = CassandraAstyanaxConnection.getInstance().getKeyspace().prepareMutationBatch();
        ColumnListMutation<String> mutation = m.withRow(CassandraAstyanaxConnection.getInstance().getEmp_cf(), rowKey);
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            mutation = mutation.putColumn(entry.getKey(), entry.getValue(), null);
        }
        mutation.putColumn("lmd", System.currentTimeMillis());
        m.setConsistencyLevel(ConsistencyLevel.CL_ONE).execute();
    } catch (ConnectionException e) {
            // log here
    } catch (Exception e) {
            // log here
    }
}

下面是我使用 Astyanax 客户端制作 Cassandra 连接的类-

public class CassandraAstyanaxConnection {
    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<String, String> emp_cf;

    private static class ConnectionHolder {
        static final CassandraAstyanaxConnection connection = new CassandraAstyanaxConnection();
    }
    public static CassandraAstyanaxConnection getInstance() {
        return ConnectionHolder.connection;
    }
    /**
     * Creating Cassandra connection using Astyanax client
     *
     */
    private CassandraAstyanaxConnection() {
        context = new AstyanaxContext.Builder()
        .forCluster(Constants.CLUSTER)
        .forKeyspace(Constants.KEYSPACE)
        .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
            .setPort(9160)
            .setMaxConnsPerHost(1000)
            .setSeeds("host1:portnumber")
        )
        .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
            .setCqlVersion("3.0.0")
            .setTargetCassandraVersion("1.2")
            .setConnectionPoolType(ConnectionPoolType.ROUND_ROBIN)
            .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE))
        .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
        .buildKeyspace(ThriftFamilyFactory.getInstance());
        context.start();
        keyspace = context.getEntity();
        emp_cf = ColumnFamily.newColumnFamily(Constants.COLUMN_FAMILY, StringSerializer.get(), StringSerializer.get());
    }
    /**
     * returns the keyspace
     * 
     * @return
     */
    public Keyspace getKeyspace() {
        return keyspace;
    }
    public ColumnFamily<String, String> getEmp_cf() {
        return emp_cf;
    }
}

使用 Astyanax 插入复合列的任何简单示例都将帮助我更好地理解。我找不到与此相关的任何示例。有人可以帮我解决这个问题吗?

我们只用复合键完成了此操作,但我相信原理保持不变。 ColumnListMutation 类有一个 putColumn 方法,该方法可以采用任意类型及其序列化程序。 这需要一个固定的类型(不是任意的Map),尽管你可以创建一个采用Map的序列化程序(有一个MapSerializer,但我没有使用过它)。 我们使用 AnnotatedCompositeSerializer 而不是从头开始编写 Serializer。 所以代码将是这样的(我还没有测试过这个,但它应该足以让你开始):

public static class ComplexType {
  @Component(ordinal = 0)
  String val1;
  @Component(ordinal = 1)
  String val2;
  @Component(ordinal = 2)
  int timestamp;
  // get/set methods left out...
}
private static final AnnotateCompositeSerializer<ComplexType> complexTypeSerializer = new AnnotatedCompositeSerializer<>(ComplexType.class);
public void upsertAttributes(final String rowKey, final Map<String, String> attributes,  final String columnFamily) {
  try {
      MutationBatch m = CassandraAstyanaxConnection.getInstance().getKeyspace().prepareMutationBatch();
      ComplexType ct = createComplexTypeFromAttributes(attributes);
      ColumnListMutation<String> mutation = m.withRow(CassandraAstyanaxConnection.getInstance().getEmp_cf(), rowKey);
    mutation
          .putColumn("lmd", ct, complexTypeSerializer, null);
    m.setConsistencyLevel(ConsistencyLevel.CL_ONE).execute();
} catch (ConnectionException e) {
        // log here
} catch (Exception e) {
        // log here
}
}

相关内容

  • 没有找到相关文章

最新更新