有没有办法使用 datastax/spark-cassandra-connector 为每个分区键只选取最新的一行,也就是实现与 Cassandra 3.6 及更高版本中 PER PARTITION LIMIT 选项等效的功能?
在 Cassandra 3.6 及更高版本中,PER PARTITION LIMIT 选项用于设置查询从每个分区返回的最大行数。例如,创建一个会把数据分布到多个分区的表。
我尝试了以下方法,但没有成功:
Cassandra 版本
[cqlsh 5.0.1 | Cassandra 3.9.0 | CQL spec 3.4.2 | Native protocol v4]
Main
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.rdd.CassandraJavaPairRDD;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import com.datastax.spark.connector.japi.rdd.CassandraTableScanJavaRDD;
import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import scala.Tuple2;
import scala.Tuple3;
import static java.lang.Double.*;
import static java.lang.Integer.*;
@Slf4j
public class Main extends Configured implements Tool {
/**
 * Program entry point: runs this tool through Hadoop's {@link ToolRunner}
 * and exits the JVM with the code that the run returned.
 *
 * @param args command-line arguments forwarded to {@link ToolRunner}.
 * @throws Exception if the tool run throws.
 */
public static void main(String[] args) throws Exception {
    final Main tool = new Main();
    final int exitCode = ToolRunner.run(tool, args);
    System.exit(exitCode);
}
/**
 * Creates the Spark session, inserts the test rows and then runs each
 * PER PARTITION LIMIT experiment in sequence.
 *
 * @param args command-line arguments (not read by this method).
 * @return {@code 0} when every step finishes, {@code 1} when any step throws.
 * @throws Exception if building the Spark session fails; failures inside the
 *                   experiments are logged and reported through the return value.
 */
@Override
public int run(String[] args) throws Exception {
    SPARK_SESSION = SparkSession
            .builder()
            .master(SPARK_MASTER)
            .appName(APP_NAME)
            .config("spark.cassandra.connection.host", CASSANDRA_HOST_IPS)
            .config("spark.cassandra.auth.username", CASSANDRA_USER_NAME)
            .config("spark.cassandra.auth.password", CASSANDRA_PASSWORD)
            .config("pushdown", "true")
            .getOrCreate();

    int exitCode;
    // The Java context wraps the session's SparkContext and is closed when the block ends.
    try (JavaSparkContext sc = new JavaSparkContext(SPARK_SESSION.sparkContext())) {
        // Seed the table first so the read experiments have data to work on.
        insertPerPartitionLimitTestList();

        // RDD-based experiments.
        getJavaRddPerPartitionLimitTest(sc);
        getTypedJavaRddPerPartitionLimitTest(sc);
        getJavaPairRddPerPartitionLimitTest(sc);
        getCassandraJavaRddPerPartitionLimitTest(sc);
        getTypedCassandraJavaRddPerPartitionLimitTest(sc);
        getCassandraTableScanJavaRddPerPartitionLimitTest(sc);
        getTypedCassandraTableScanJavaRddPerPartitionLimitTest(sc);
        getCassandraJavaRddToJavaRddPerPartitionLimitTest(sc);

        // Dataset / Spark SQL experiments.
        getSparkDatasetPerPartitionLimitTest(sc);
        getSparkSqlDatasetPerPartitionLimitTest();

        log.info("Done");
        exitCode = 0; // success exit code
    } catch (Throwable failure) {
        // The first failing experiment aborts the remaining ones.
        log.error("Spark transform failed.", failure);
        exitCode = 1; // failure exit code
    }
    return exitCode;
}
/**
 * Assembles the option map used when reading from or writing to a Cassandra
 * table through the Spark SQL data source.
 *
 * @param keyspace value stored under the {@code keyspace} option.
 * @param table    value stored under the {@code table} option.
 * @return immutable map holding the connection host, credentials, the
 *         {@code pushdown} flag and the given keyspace and table.
 */
public final Map<String, String> cassandraConfig(String keyspace, String table) {
    final ImmutableMap.Builder<String, String> options = ImmutableMap.builder();

    // Connection settings.
    options.put("spark.cassandra.connection.host", CASSANDRA_HOST_IPS);
    options.put("spark.cassandra.auth.username", CASSANDRA_USER_NAME);
    options.put("spark.cassandra.auth.password", CASSANDRA_PASSWORD);
    options.put("pushdown", "true");

    // Target table.
    options.put("keyspace", keyspace);
    options.put("table", table);

    return options.build();
}
/**
 * Generate test data to INSERT INTO the Cassandra bug.per_partition_limit_test table.
 *
 * <p>Every generated row gets its own random UUID, while all rows share one
 * timestamp captured when this method is called.
 *
 * @param listSize The number of rows of test data to generate.
 * @return {@link List} of {@link PerPartitionLimitTest} containing test data.
 */
public List<PerPartitionLimitTest> buildPerPartitionLimitTestList(Integer listSize){
    // Take the current instant straight from the epoch clock. The previous
    // LocalDateTime.now().atZone(UTC) form relabelled the host's local wall-clock
    // time as UTC, which shifts the stored instant by the host's zone offset.
    final Timestamp timeSeriesDate = new Timestamp(System.currentTimeMillis());
    final List<PerPartitionLimitTest> perPartitionLimitTests = new ArrayList<>(listSize);
    // Populate List of objects with test data.
    for (int i = 0; i < listSize; i++) {
        final String itemUuid = UUID.randomUUID().toString();
        perPartitionLimitTests.add(
                PerPartitionLimitTest.of(
                        itemUuid,
                        timeSeriesDate,
                        String.format("/items/%s", itemUuid)
                )
        );
    }
    return perPartitionLimitTests;
}
/**
 * Builds 20 rows of test data and appends them to the Cassandra table
 * {@code bug.per_partition_limit_test} through the Spark SQL Cassandra
 * data source.
 */
public void insertPerPartitionLimitTestList(){
    final Map<String, String> writeOptions = cassandraConfig("bug", "per_partition_limit_test");
    final List<PerPartitionLimitTest> testRows = buildPerPartitionLimitTestList(20);

    createDatasetFromList(PerPartitionLimitTest.class, testRows)
            .select("itemUuid", "timeSeriesDate", "itemUri")
            // Rename the bean-property columns to the table's column names.
            .toDF("item_uuid", "time_series_date", "item_uri")
            .write()
            .format("org.apache.spark.sql.cassandra")
            .mode(SaveMode.Append)
            .options(writeOptions)
            .save();
}
/** Row reader factory passed to the typed {@code cassandraTable(...)} calls in this class. */
private PerPartitionLimitTestRowReaderFactory perPartitionLimitTestRowReaderFactory = new PerPartitionLimitTestRowReaderFactory();
/**
 * Loads the test table as a typed {@link Dataset} and returns the item UUID
 * of the row that {@code first()} yields.
 *
 * <p>Despite the "Min" in the name, no ordering is applied here, so the value
 * is simply the UUID of whichever row comes back first.
 *
 * @param sc Spark context; not read by this method.
 * @return the item UUID of the first row, as a string.
 */
public String getPerPartitionLimitTestItemUuidMin(JavaSparkContext sc){
    final Dataset<PerPartitionLimitTest> allRows = getPerPartitionLimitTestDataset(
            PerPartitionLimitTest.class,
            "org.apache.spark.sql.cassandra",
            cassandraConfig("bug", "per_partition_limit_test"));
    final PerPartitionLimitTest firstRow = allRows.first();
    return String.valueOf(firstRow.getItemUuid());
}
/**
 * Reads the test table as an untyped {@link JavaRDD} of {@link CassandraRow},
 * passing a token comparison followed by {@code PER PARTITION LIMIT 1} as the
 * {@code where} predicate, and logs the resulting row count.
 *
 * @param sc Spark context used to open the Cassandra table.
 */
public void getJavaRddPerPartitionLimitTest(JavaSparkContext sc){
    // Reuse the shared lookup instead of repeating the dataset read inline.
    final String itemUuidMin = getPerPartitionLimitTestItemUuidMin(sc);
    // NOTE(review): where(...) takes a predicate; whether a trailing
    // PER PARTITION LIMIT clause survives the connector's query building
    // is exactly what this experiment probes.
    JavaRDD<CassandraRow> javaRDD = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test")
            .where(String.format("TOKEN(item_uuid) > TOKEN(%s) PER PARTITION LIMIT 1", itemUuidMin));
    log.info(String.format("javaRDD.count() = %s", javaRDD.count()));
}
/**
 * Reads the test table as a {@link JavaRDD} of {@link PerPartitionLimitTest}
 * using the custom row reader factory, with {@code PER PARTITION LIMIT 1} as
 * the {@code where} predicate, and logs the row count.
 *
 * @param sc Spark context used to open the Cassandra table.
 */
public void getTypedJavaRddPerPartitionLimitTest(JavaSparkContext sc){
    // NOTE(review): the error output reported with this code shows the connector
    // emitting "... AND PER PARTITION LIMIT 1 ALLOW FILTERING", which Cassandra
    // rejects as a syntax error.
    final String limitClause = "PER PARTITION LIMIT 1";
    JavaRDD<PerPartitionLimitTest> limitedRows = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
            .where(limitClause);
    final long rowCount = limitedRows.count();
    log.info(String.format("javaRDD.count() = %s", rowCount));
}
/**
 * Reads the test table with the custom row reader factory and
 * {@code PER PARTITION LIMIT 1} as the {@code where} predicate, keys each
 * entity by its item UUID, and logs the row count of the pair RDD.
 *
 * @param sc Spark context used to open the Cassandra table.
 */
public void getJavaPairRddPerPartitionLimitTest(JavaSparkContext sc){
    // Key extractor: the entity's item UUID.
    final Function<PerPartitionLimitTest, String> byItemUuid = PerPartitionLimitTest::getItemUuid;
    final String limitClause = "PER PARTITION LIMIT 1";
    JavaPairRDD<String, PerPartitionLimitTest> keyedRows = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
            .where(limitClause)
            .keyBy(byItemUuid);
    final long rowCount = keyedRows.count();
    log.info(String.format("javaPairRDD.count() = %s", rowCount));
}
/**
 * Same read as the typed {@code JavaRDD} experiment, but the result is held
 * as a {@link CassandraJavaRDD} of {@link PerPartitionLimitTest}; logs the
 * row count.
 *
 * @param sc Spark context used to open the Cassandra table.
 */
public void getTypedCassandraJavaRddPerPartitionLimitTest(JavaSparkContext sc){
    final String limitClause = "PER PARTITION LIMIT 1";
    CassandraJavaRDD<PerPartitionLimitTest> limitedRows = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
            .where(limitClause);
    final long rowCount = limitedRows.count();
    log.info(String.format("cassandraJavaRDD.count() = %s", rowCount));
}
/**
 * Reads the test table as a {@link CassandraTableScanJavaRDD} of untyped
 * {@link CassandraRow}s with {@code PER PARTITION LIMIT 1} as the
 * {@code where} predicate, and logs the row count.
 *
 * @param sc Spark context used to open the Cassandra table.
 */
public void getCassandraTableScanJavaRddPerPartitionLimitTest(JavaSparkContext sc){
    final String limitClause = "PER PARTITION LIMIT 1";
    CassandraTableScanJavaRDD<CassandraRow> limitedRows = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test")
            .where(limitClause);
    final long rowCount = limitedRows.count();
    log.info(String.format("cassandraTableScanJavaRDD.count() = %s", rowCount));
}
/**
 * Reads the test table as a {@link CassandraTableScanJavaRDD} of
 * {@link PerPartitionLimitTest} using the custom row reader factory, with
 * {@code PER PARTITION LIMIT 1} as the {@code where} predicate, and logs the
 * row count.
 *
 * @param sc Spark context used to open the Cassandra table.
 */
public void getTypedCassandraTableScanJavaRddPerPartitionLimitTest(JavaSparkContext sc){
    final String limitClause = "PER PARTITION LIMIT 1";
    CassandraTableScanJavaRDD<PerPartitionLimitTest> limitedRows = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test", perPartitionLimitTestRowReaderFactory)
            .where(limitClause);
    final long rowCount = limitedRows.count();
    log.info(String.format("cassandraTableScanJavaRDD.count() = %s", rowCount));
}
/**
 * Opens the test table as untyped {@link CassandraRow}s, applies
 * {@code PER PARTITION LIMIT 1} as the {@code where} predicate, converts each
 * row to a {@link PerPartitionLimitTest} by hand, and logs the row count.
 *
 * @param sc Spark context used to open the Cassandra table.
 */
public void getCassandraJavaRddToJavaRddPerPartitionLimitTest(JavaSparkContext sc){
    // Manual row-to-entity conversion, reading the three columns by name.
    final Function<CassandraRow, PerPartitionLimitTest> toEntity = cassandraRow -> PerPartitionLimitTest.of(
            cassandraRow.getUUID("item_uuid").toString(),
            new Timestamp(cassandraRow.getDateTime("time_series_date").getMillis()),
            cassandraRow.getString("item_uri")
    );

    CassandraJavaRDD<CassandraRow> tableRows = javaFunctions(sc)
            .cassandraTable("bug", "per_partition_limit_test");
    JavaRDD<PerPartitionLimitTest> entities = tableRows
            .where("PER PARTITION LIMIT 1")
            .map(toEntity);

    final long rowCount = entities.count();
    log.info(String.format("javaRDD.count() = %s", rowCount));
}
/**
 * Loads a table from an input data source into a typed {@link Dataset}.
 *
 * <p>The columns {@code item_uuid}, {@code time_series_date} and
 * {@code item_uri} are selected and renamed to {@code itemUuid},
 * {@code timeSeriesDate} and {@code itemUri} before the rows are converted
 * with a bean encoder for {@code clazz}.
 *
 * @param clazz  {@link Class} of type T that Spark should use when encoding each row of the Dataset.
 * @param format Specifies the input data source format.
 * @param config {@link Map} of {@link String} options defining the input data source connection.
 * @param <T>    type of class.
 * @return Typed {@link Dataset} containing the table data selected from the input data source.
 */
public <T> Dataset<T> getPerPartitionLimitTestDataset(Class<T> clazz, String format, Map<String, String> config) {
    final Encoder<T> beanEncoder = Encoders.bean(clazz);

    final Dataset<Row> sourceRows = SPARK_SESSION
            .read()
            .format(format)
            .options(config)
            .load();

    // Rename the table's columns to the bean property names used by the encoder.
    final Dataset<Row> renamedRows = sourceRows
            .select("item_uuid", "time_series_date", "item_uri")
            .toDF("itemUuid", "timeSeriesDate", "itemUri");

    return renamedRows.as(beanEncoder);
}
/**
 * Loads the test table as a typed {@link Dataset}, applies
 * {@code PER PARTITION LIMIT 1} through {@code Dataset.where(String)}, and
 * logs the row count.
 *
 * @param sc Spark context; not read by this method.
 */
public void getSparkDatasetPerPartitionLimitTest(JavaSparkContext sc){
    final Dataset<PerPartitionLimitTest> allRows = getPerPartitionLimitTestDataset(
            PerPartitionLimitTest.class,
            "org.apache.spark.sql.cassandra",
            cassandraConfig("bug", "per_partition_limit_test"));

    // NOTE(review): this string is handed to Spark's expression parser, not to
    // Cassandra; confirm whether a CQL-only clause can be accepted here at all.
    final Dataset<PerPartitionLimitTest> limitedRows = allRows.where("PER PARTITION LIMIT 1");

    final long rowCount = limitedRows.count();
    log.info(String.format("perPartitionLimitTestDataset.count() = %s", rowCount));
}
/**
 * Loads the test table as a typed {@link Dataset}, filters it with a token
 * comparison followed by {@code PER PARTITION LIMIT 1} through
 * {@code Dataset.where(String)}, and logs the row count.
 *
 * @param sc Spark context, forwarded to the item UUID lookup.
 */
public void getSparkDatasetPerPartitionLimitTestWithTokenGreaterThan(JavaSparkContext sc){
    final String itemUuidMin = getPerPartitionLimitTestItemUuidMin(sc);

    final Dataset<PerPartitionLimitTest> allRows = getPerPartitionLimitTestDataset(
            PerPartitionLimitTest.class,
            "org.apache.spark.sql.cassandra",
            cassandraConfig("bug", "per_partition_limit_test"));

    // NOTE(review): the reported ParseException shows Spark's parser stopping at
    // 'PARTITION' in a predicate of this shape.
    final String predicate = String.format("TOKEN(item_uuid) > TOKEN(%s) PER PARTITION LIMIT 1", itemUuidMin);
    final Dataset<PerPartitionLimitTest> limitedRows = allRows.where(predicate);

    final long rowCount = limitedRows.count();
    log.info(String.format("perPartitionLimitTestDataset.count() = %s", rowCount));
}
/**
 * Registers the test table as the temporary view
 * {@code perPartitionLimitTests}, runs a Spark SQL query ending in
 * {@code PER PARTITION LIMIT 1} against it, and logs the row count.
 */
public void getSparkSqlDatasetPerPartitionLimitTest(){
    final Dataset<PerPartitionLimitTest> viewSource = getPerPartitionLimitTestDataset(
            PerPartitionLimitTest.class,
            "org.apache.spark.sql.cassandra",
            cassandraConfig("bug", "per_partition_limit_test"));

    // Expose the Dataset to Spark SQL under a view name.
    viewSource.createOrReplaceTempView("perPartitionLimitTests");

    final Encoder<PerPartitionLimitTest> beanEncoder = Encoders.bean(PerPartitionLimitTest.class);

    // NOTE(review): the statement is kept exactly as reported. It contains a
    // stray single quote before item_uri (the reported ParseException points at
    // it), and it names item_uuid/time_series_date/item_uri although the view's
    // columns were renamed to itemUuid/timeSeriesDate/itemUri when it was loaded.
    final String query =
            "SELECT item_uuid, "
            + "time_series_date, "
            + "'item_uri "
            + "FROM perPartitionLimitTests "
            + "PER PARTITION LIMIT 1";

    final Dataset<PerPartitionLimitTest> queryResult = SPARK_SESSION.sql(query).as(beanEncoder);

    final long rowCount = queryResult.count();
    log.info(String.format("perPartitionLimitTestSqlDS.count() = %s", rowCount));
}
}
PerPartitionLimitTestRowReader
import java.io.Serializable;
import java.sql.Timestamp;
import com.datastax.driver.core.Row;
import com.datastax.spark.connector.CassandraRowMetadata;
import com.datastax.spark.connector.ColumnRef;
import com.datastax.spark.connector.cql.TableDef;
import com.datastax.spark.connector.rdd.reader.RowReader;
import com.datastax.spark.connector.rdd.reader.RowReaderFactory;
import scala.collection.IndexedSeq;
public class PerPartitionLimitTestRowReader extends GenericRowReader<PerPartitionLimitTest> {
private static final long serialVersionUID = 1L;
private static RowReader<PerPartitionLimitTest> reader = new PerPartitionLimitTestRowReader();
public static class PerPartitionLimitTestRowReaderFactory implements RowReaderFactory<PerPartitionLimitTest>, Serializable{
private static final long serialVersionUID = 1L;
@Override
public RowReader<PerPartitionLimitTest> rowReader(TableDef arg0, IndexedSeq<ColumnRef> arg1) {
return reader;
}
@Override
public Class<PerPartitionLimitTest> targetClass() {
return PerPartitionLimitTest.class;
}
}
@Override
public PerPartitionLimitTest read(Row row, CassandraRowMetadata rowMetaData) {
PerPartitionLimitTest perPartitionLimitTest = new PerPartitionLimitTest();
perPartitionLimitTest.setItemUuid(row.getUUID("item_uuid").toString());
perPartitionLimitTest.setTimeSeriesDate(new Timestamp(row.getTimestamp("time_series_date").getTime()));
perPartitionLimitTest.setItemUri(row.getString("item_uri"));
return perPartitionLimitTest;
}
}
}
GenericRowReader
import java.io.Serializable;
import com.datastax.spark.connector.ColumnRef;
import com.datastax.spark.connector.rdd.reader.RowReader;
import scala.Option;
import scala.collection.Seq;
/**
 * Base class for serializable row readers that leaves the actual row
 * conversion to subclasses.
 *
 * @param <T> type each row is converted to.
 */
public abstract class GenericRowReader<T> implements RowReader<T>, Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Declares no specific column requirements.
     *
     * @return an empty {@link Option}.
     */
    @Override
    public Option<Seq<ColumnRef>> neededColumns() {
        final Option<Seq<ColumnRef>> noColumnsDeclared = Option.empty();
        return noColumnsDeclared;
    }
}
PerPartitionLimitTest Domain Entity
import java.io.Serializable;
import java.sql.Timestamp;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlType;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Table;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
/**
 * Entity mapped to the {@code bug.per_partition_limit_test} table.
 *
 * <p>Accessors, the no-argument constructor and the static factory
 * {@code of(...)} are generated by Lombok from the annotations below; the
 * factory takes the {@code @NonNull} fields in declaration order.
 */
@Data
@NoArgsConstructor
@Table(keyspace = "bug", name = "per_partition_limit_test")
@RequiredArgsConstructor(staticName = "of")
@XmlType(name = "PerPartitionLimitTest")
@XmlRootElement(name = "perPartitionLimitTest")
public class PerPartitionLimitTest implements Serializable {
    // Declared explicitly, as the other Serializable classes in this code do.
    private static final long serialVersionUID = 1L;

    /**
     * Type 4 uuid that uniquely identifies the item.
     */
    @Valid
    @NotNull @NonNull
    @Column(name = "item_uuid")
    private String itemUuid;

    /**
     * The timestamp when the data was inserted into Cassandra.
     */
    @NotNull @NonNull
    @Column(name = "time_series_date")
    private Timestamp timeSeriesDate;

    /**
     * URI that points to an item.
     */
    @Column(name = "item_uri")
    @NotNull @NonNull
    private String itemUri;
}
Cassandra 表:
-- Recreates the test table used by the PER PARTITION LIMIT experiments.
USE bug;
DROP TABLE IF EXISTS bug.per_partition_limit_test;
-- item_uuid is the partition key and time_series_date the clustering column,
-- stored in descending order so the newest row of a partition comes first.
-- item_uri is static, i.e. a single value shared by all rows of a partition.
-- NOTE(review): gc_grace_seconds (519400) is 1000 seconds larger than
-- default_time_to_live (518400); confirm that the difference is intentional.
CREATE TABLE bug.per_partition_limit_test (
item_uuid uuid,
time_series_date timestamp,
item_uri text static,
PRIMARY KEY ((item_uuid), time_series_date)
) WITH CLUSTERING ORDER BY (time_series_date DESC)
AND comment = 'Table Properties:
default_time_to_live - set to 518400 seconds which is 6 days, data will be automatically dropped after 6 days
Compaction
class - set to TimeWindowCompactionStrategy which is used for time series data stored in tables that use the default TTL for all data
compaction_window_unit - set to DAYS which is time unit used to define the bucket size
compaction_window_size - set to 6 which is how many units per bucket'
AND compaction = {'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy', 'compaction_window_size': '6', 'compaction_window_unit': 'DAYS'}
AND default_time_to_live = 518400
AND gc_grace_seconds = 519400;
Maven 依赖:
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>2.0.0-M3</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.10</artifactId>
<version>2.0.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.0.2</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.2</version>
<scope>compile</scope>
</dependency>
错误
[Stage 0:> (0 + 8) / 18]ERROR [2017-01-27 04:24:38,061] (Executor task launch worker-1) org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:132)
at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:224)
at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:200)
at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
... 3 common frames omitted
Wrapped by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:113)
at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
at com.sun.proxy.$Proxy8.prepare(Unknown Source)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:279)
... 16 common frames omitted
Wrapped by: java.io.IOException: Exception during preparation of SELECT "item_uuid", "time_series_date", "item_uri" FROM "bug"."per_partition_limit_test" WHERE token("item_uuid") > ? AND token("item_uuid") <= ? AND PER PARTITION LIMIT 1 ALLOW FILTERING: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)
[Stage 0:> (0 + 8) / 18]ERROR [2017-01-27 04:26:02,044] (Executor task launch worker-3) org.apache.spark.executor.Executor: Exception in task 3.0 in stage 0.0 (TID 3)
com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
at com.datastax.driver.core.Responses$Error.asException(Responses.java:132)
at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:224)
at com.datastax.driver.core.SessionManager$4.apply(SessionManager.java:200)
at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:906)
at shade.com.datastax.spark.connector.google.common.util.concurrent.Futures$1$1.run(Futures.java:635)
... 3 common frames omitted
Wrapped by: com.datastax.driver.core.exceptions.SyntaxError: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:58)
at com.datastax.driver.core.exceptions.SyntaxError.copy(SyntaxError.java:24)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:113)
at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
at com.sun.proxy.$Proxy8.prepare(Unknown Source)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:279)
... 16 common frames omitted
Wrapped by: java.io.IOException: Exception during preparation of SELECT "item_uuid", "time_series_date", "item_uri" FROM "bug"."per_partition_limit_test" WHERE token("item_uuid") > ? AND token("item_uuid") <= ? AND PER PARTITION LIMIT 1 ALLOW FILTERING: line 1:154 no viable alternative at input 'PARTITION' (...("item_uuid") <= ? AND [PER] PARTITION...)
at com.datastax.spark.connector.rdd.CassandraTableScanRDD.createStatement(CassandraTableScanRDD.scala:293)
ERROR [2017-01-27 01:41:50,369] (main) Main: Spark transform failed.
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'PARTITION' expecting <EOF>(line 1, pos 67)
== SQL ==
TOKEN(item_uuid) > TOKEN(13432d97-3849-4158-8405-804447d1b0c3) PER PARTITION LIMIT 1
-------------------------------------------------------------------^^^
ERROR [2017-01-27 04:27:31,265] (main) Main: Spark transform failed.
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input ''' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IF', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'USING', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 
'OPTION', 'ANTI', 'LOCAL', 'INPATH', 'CURRENT_DATE', 'CURRENT_TIMESTAMP', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, SCIENTIFIC_DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 36)
== SQL ==
SELECT item_uuid, time_series_date, 'item_uri FROM perPartitionLimitTests PER PARTITION LIMIT 1
------------------------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
at Main.getSparkSqlDatasetPerPartitionLimitTest(Main.java:397)
at Main.run(Main.java:177)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
目前还不行。已有一个 JIRA 工单请求添加此功能:https://datastax-oss.atlassian.net/browse/SPARKC-446
我已为此提交了一个 PR,如果您愿意,可以帮忙进行 beta 测试。