我正在用我的第一个使用Java的Spark/Cassandra程序做一些愚蠢的事情,希望有人能提供帮助我弄清楚为什么我会得到这个错误:
: com.datastax.driver.core.exceptions.SyntaxError: line 1:8 no viable alternative at input 'FROM' (SELECT [FROM]...)
设置是
Create keyspace test with replicaton={'class':strategy name,
'replication_factor': No of replications on different nodes}
CREATE KEYSPACE test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
use test;
CREATE TABLE KeyValue ( key varchar, value bigint, PRIMARY KEY (key));
INSERT INTO KeyValue (key, value) VALUES ('afoo', 100);
代码(下图(很简单...我正在使用"选择"子句,所以我不确定为什么驱动程序不是拿起我指定的列。
import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
public class JavaDemo {
public static void main(String[] args) throws Exception {
String sparkMaster = "local[2]";
String cassandraHost = "localhost";
SparkConf conf = new SparkConf(true)
.set("spark.cassandra.connection.host", cassandraHost);
JavaSparkContext sc = new JavaSparkContext(sparkMaster, "basicquerycassandra", conf);
CassandraConnector connector = CassandraConnector.apply(conf);
JavaRDD<KeyValue> rdd = javaFunctions(sc)
.cassandraTable("test", "keyvalue", mapRowTo(KeyValue.class))
.withConnector(connector).select("key", "value")
.where("key = 'afoo'");
rdd.foreach(row -> System.out.println("got item" + row));
}
public static class KeyValue implements Serializable {
private String key;
private Integer value;
public KeyValue() {
}
public static KeyValue newInstance(String k, Integer v) {
KeyValue kv = new KeyValue();
kv.setKey(k);
kv.setValue(v);
return kv;
}
public String getKey() {
return key;
}
public Integer getValue() {
return value;
}
void setKey(String k) {
this.key = k;
}
void setValue(Integer v) {
this.value = v;
}
@Override
public String toString() {
return "KeyValue{" +
"key='" + key + ''' +
", value=" + value +
'}';
}
}
}
更新:如果我按如下方式更新代码,我可以避免语法错误......这还不是我想要的。 明天我会摆弄它,如果没有人打败我,我会发布答案。 我很接近;^(
JavaSparkContext sc = new JavaSparkContext(sparkMaster, "basicquerycassandra", conf);
CassandraConnector connector = CassandraConnector.apply(conf);
CassandraTableScanJavaRDD<CassandraRow> rdd = javaFunctions(sc)
.cassandraTable("test", "keyvalue")
.select("key", "value")
.where("key = 'afoo'");
rdd.foreach(row -> System.out.println("got item" + row));
由于您没有提供任何列映射器,Spark连接器将使用默认JavaBeanColumnMapper
例如,基本上它是正常的Java反射,例如来自apache的BeanUtils。这将需要所有选定列的公共构造函数或公共 getter/setter。
它工作,您应该具有如下所示的公共构造函数,或者将 setter/getter 方法定义为 key
属性的公共方法。希望对您有所帮助。
公共构造函数:
public KeyValue(String key, Integer value) {
this.key = key;
this.value = value;
}
公共二传手/吸气器:
public void setKey(String key) {
this.key = key;
}
public void setValue(Integer value) {
this.value = value;
}