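I am trying to build my own version of the decision tree classification example from Spark 2.0.2, org.apache.spark.examples.ml.JavaDecisionTreeClassificationExample. I can't use it directly because it reads LIBSVM-encoded data. I need to avoid LIBSVM (undocumented, AFAIK) so that I can classify ordinary datasets more easily, so I am trying to adapt the example to use a Kryo-encoded dataset instead.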
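The problem originates in the map call below, in particular in the consequences of using Encoders.kryo as the encoder, as suggested in the posts on SparkML feature vectors and on Spark 2.0.2 Encoders in Java: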
public SMLDecisionTree(Dataset<Row> incomingDS, final String label, final String[] features)
{
    this.incomingDS = incomingDS;
    this.label = label;
    this.features = features;
    this.mapSet = new StringToDoubleMapperSet(features);
    this.sdlDS = incomingDS
        .select(label, features)
        .filter(new FilterFunction<Row>()
        {
            public boolean call(Row row) throws Exception
            {
                return !row.getString(0).equals(features[0]); // header
            }
        })
        .map(new MapFunction<Row, LabeledFeatureVector>()
        {
            public LabeledFeatureVector call(Row row) throws Exception
            {
                double labelVal = mapSet.addValue(0, row.getString(0));
                double[] featureVals = new double[features.length];
                for (int i = 1; i < row.length(); i++)
                {
                    Double val = mapSet.addValue(i, row.getString(i));
                    featureVals[i - 1] = val;
                }
                return new LabeledFeatureVector(labelVal, Vectors.dense(featureVals));
            }
            // https://stackoverflow.com/questions/36648128/how-to-store-custom-objects-in-a-dataset
        }, Encoders.kryo(LabeledFeatureVector.class));
    Dataset<LabeledFeatureVector>[] splits = sdlDS.randomSplit(new double[] { 0.7, 0.3 });
    this.trainingDS = splits[0];
    this.testDS = splits[1];
}
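The StringToDoubleMapperSet class used in the map call is not shown in the question. As a rough sketch only, assuming it keeps one table per column (column 0 for the label, columns 1..n for the features) and gives each distinct string the next unused index, it could look like the following. The class name StringToDoubleMapperSetSketch and everything inside it are hypothetical; only the constructor argument and the addValue(int, String) call shape are taken from the code above.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the question's StringToDoubleMapperSet; the real class is not shown.
public class StringToDoubleMapperSetSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // One string-to-index table per column: column 0 is the label, 1..n are the features.
    private final List<Map<String, Double>> tables = new ArrayList<>();

    public StringToDoubleMapperSetSketch(String[] features) {
        // features.length feature columns plus one label column.
        for (int i = 0; i <= features.length; i++) {
            tables.add(new HashMap<String, Double>());
        }
    }

    // Returns the index already assigned to this value in the given column,
    // or assigns the next unused index (0.0, 1.0, 2.0, ...) on first sight.
    public synchronized double addValue(int column, String value) {
        Map<String, Double> table = tables.get(column);
        Double index = table.get(value);
        if (index == null) {
            index = (double) table.size();
            table.put(value, index);
        }
        return index;
    }
}
```

It implements Serializable because the object is captured by the MapFunction closure. Note that on a cluster each task works on its own deserialized copy, so a helper like this only yields consistent indices when all rows pass through a single copy.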
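The Kryo encoding breaks the StringIndexer and VectorIndexer stages of the original Spark example, which cannot handle the resulting Kryo-encoded dataset. Here is the pipeline-building code, taken from the Spark decision tree example: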
public void run() throws IOException
{
    sdlDS.show();
    StringIndexerModel labelIndexer = new StringIndexer()
        .setInputCol("label")
        .setOutputCol("indexedLabel")
        .fit(df);
    VectorIndexerModel featureIndexer = new VectorIndexer()
        .setInputCol("features")
        .setOutputCol("indexedFeatures")
        .setMaxCategories(4) // treat features with > 4 distinct values as continuous.
        .fit(df);
    DecisionTreeClassifier classifier = new DecisionTreeClassifier()
        .setLabelCol("indexedLabel")
        .setFeaturesCol("indexedFeatures");
    IndexToString labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predictedLabel")
        .setLabels(labelIndexer.labels());
    Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]
        { labelIndexer, featureIndexer, classifier, labelConverter });
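This code evidently expects a dataset with "label" and "features" columns, holding the label and a Vector of double-encoded features. The problem is that Kryo produces a single column named "value" that appears to hold a byte array. I need documentation on how to convert this into what the original StringIndexer and VectorIndexer expect. Can someone help? Java, please.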
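First of all, don't use the Kryo encoder. It is very limited in general and not applicable here at all. The simplest solution is to drop the custom class and use a Row encoder. First you'll need a bunch of imports: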
import java.util.*;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.ml.linalg.*;
and a schema:
List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("label", DataTypes.DoubleType, false));
fields.add(DataTypes.createStructField("features", new VectorUDT(), false));
StructType schema = DataTypes.createStructType(fields);
The encoder can then be defined like this:
Encoder<Row> encoder = RowEncoder.apply(schema);
and used as shown below:
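// Dummy input; sc is assumed to be a JavaSparkContext.
Dataset<Row> inputDs = spark.read().json(sc.parallelize(Arrays.asList(
    "{\"label\": 1.0, \"features\": \"foo\"}"
)));
// The Row encoder yields real "label" and "features" columns instead of a single binary column.
inputDs.map(new MapFunction<Row, Row>() {
    public Row call(Row row) {
        return RowFactory.create(1.0, Vectors.dense(1.0, 2.0));
    }
}, encoder);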