将粘合作业连接到Amazon密钥空间



我正在尝试将AWS粘合作业连接到Amazon密钥空间。是否可以使用pyspark连接并处理这些表。PS:由于组织限制,我无法使用AWS cli。

您可以利用开源的spark-cassandra连接器将AWS Glue与Amazon Keyspaces连接起来。

首先,您需要为您的帐户启用murmur3分区器或随机分区器。

UPDATE system.local set partitioner='org.apache.cassandra.dht.Murmur3Partitioner' where key='local';

其次,确保您了解所需的容量。默认情况下,密钥空间表是使用OnDemand模式创建的,该模式通过根据您以前的流量窥视将资源翻倍来学习所需容量。新创建的表能够执行每秒4000 WCU/和每秒12000 RCU/。如果您需要更高的容量,请在具有所需吞吐量的供应模式下创建表,然后切换到按需模式。

第三,在我们的示例存储库中找到我们预先构建的示例。我们有出口、进口、计数和前N的模式。示例显示了如何将spark-cassandra连接器加载到s3,设置数据加载的最佳实践。以下片段显示导出到s3。

val spark: SparkContext = new SparkContext(conf)
val glueContext: GlueContext = new GlueContext(spark)
val sparkSession: SparkSession = glueContext.getSparkSession
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import sparkSession.implicits._
Job.init(args("JOB_NAME"), glueContext, args.asJava)
val tableName = args("TABLE_NAME")
val keyspaceName = args("KEYSPACE_NAME")
val backupS3 = args("S3_URI")
val backupFormat = args("FORMAT")
val tableDf = sparkSession.read
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> tableName, "keyspace" -> keyspaceName))
.load()
tableDf.write.format(backupFormat).mode(SaveMode.ErrorIfExists).save(backupS3)
Job.commit()
}
}

最佳做法是对每个DPU/Worker使用Glue进行速率限制。了解您希望每个DPU实现的吞吐量,并在cassandra驱动程序设置中设置节流器。

advanced.throttler = {
class = RateLimitingRequestThrottler
max-requests-per-second = 1000
max-queue-size = 50000
drain-interval = 1 millisecond
}

您需要确保您拥有访问Amazon密钥空间的适当IAM权限。如果您使用VPC端点,您还需要在此处包含特权。

最新更新