如何填充Java Apache箭头字段列表的结构?



我有一个数据集,它主要是一个2D表,但是一个列(Field)(称为Attributes)在每个单元格中包含StructsList。每一个Struct有三个Fields: Attribute Tag, Attribute Type和Attribute价值。

属性Field的定义是:

/**
* Attribute Tag - Two character tag.
*/
public static final Field ATTRIBUTE_TAG_FIELD =
new Field("AttributeTag", FieldType.notNullable(new ArrowType.FixedSizeBinary(2)), null);

/**
* Attribute Type - One character type.
*/
public static final Field ATTRIBUTE_TYPE_FIELD =
new Field(
"AttributeType",
new FieldType(false,
new ArrowType.FixedSizeBinary(1), null),
null
);
/**
* String representation of the Attribute value.
*/
public static final Field ATTRIBUTE_VALUE_FIELD = new Field("AttributeValue", FieldType.notNullable(new ArrowType.Utf8()), null);
/**
* The field is a nullable List of Structs each with an attribute tag,
type and value.
*/
public static final Field ATTRIBUTES_FIELD =
new Field("Attributes", FieldType.nullable(new ArrowType.List()), List.of(
new Field("Attribute", FieldType.nullable(new ArrowType.Struct()), List.of(
ATTRIBUTE_TAG_FIELD, ATTRIBUTE_TYPE_FIELD, ATTRIBUTE_VALUE_FIELD))));

我有这个代码,试图从一些填充属性源数据。虽然在运行时不会产生错误,但它不会返回属性向量中的任何值。

final ListVector attributes = (ListVector)
ATTRIBUTES_FIELD.createVector(allocator);
// this is the source of the attributes that I will populate into the
attributes vector
final List<SAMRecord.SAMTagAndValue> recordAttributes =
samRecord.getAttributes();
if (recordAttributes != null && recordAttributes.size() > 0 ) {
final UnionListWriter listWriter = attributes.getWriter();
listWriter.allocate();
IntStream.range(0, recordAttributes.size()).forEachOrdered(attributeIndex -> {
listWriter.setPosition(attributeIndex);
listWriter.startList();
// put the values of the attribute in the arrow struct
final SAMRecord.SAMTagAndValue samTagAndValue recordAttributes.get(attributeIndex);
// I think the problem is here. In a debugger this seems to create a new writer not related to my Vector??
final BaseWriter.StructWriter structWriter = listWriter.struct("Attribute");
structWriter.start();
final byte[] tagBytes =
samTagAndValue.tag.getBytes(StandardCharsets.UTF_8);
// todo find out the type from the value
final byte[] typeBytes = "S".getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes =
samTagAndValue.value.toString().getBytes(StandardCharsets.UTF_8);
ArrowBuf tempBuf = allocator.buffer(tagBytes.length);
tempBuf.setBytes(0, tagBytes);
structWriter.varChar("AttributeTag").writeVarChar(0, tagBytes.length, tempBuf);
tempBuf.close();

tempBuf = allocator.buffer(typeBytes.length);
structWriter.varChar("AttributeType").writeVarChar(0, typeBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(valueBytes.length);
structWriter.varChar("AttributeValue").writeVarChar(0, valueBytes.length, tempBuf);
tempBuf.close();
structWriter.end();
});
listWriter.setValueCount(recordAttributes.size());
listWriter.end();
}

为什么在attributesListVector中没有任何值?正确的做法是什么?

我最终找到的解决方案包括将UnionListWriterstartList(),endList()setRowCount()调用放在适当的位置。

一般格式为:

try (
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

// create all your vectors and other closable ...
final ListVector attributes = (ListVector) SamAlignmentSchema.ATTRIBUTES_FIELD.createVector(allocator);
// create the writer for the list vector
final UnionListWriter listWriter = attributes.getWriter();
// create the vector schema root
VectorSchemaRoot samRecordBatch = new VectorSchemaRoot(
aligmentSchema.getFields(),
List.of(
// add vectors including the list vecto
attributes
)
)
){
//allocate the VectorSchemaRoot
samRecordBatch.allocateNew();

// for reach sam record (read)
samReader.iterator().stream().forEach(samRecord -> {
int index = samRecordBatch.getRowCount();
System.out.println("index = " + index);
// set the values of the primitive vectors using set or setSafe ...
// write a list of attributes into the attrbutes field
final List<SAMRecord.SAMTagAndValue> recordAttributes = samRecord.getAttributes();
// start a new list at the current index
listWriter.startList();
listWriter.setPosition(index);
if (recordAttributes != null) {
// for each attribute
IntStream.range(0, recordAttributes.size()).forEachOrdered(attributeIndex -> {
// put the values of the attribute in the arrow struct
final SAMRecord.SAMTagAndValue samTagAndValue = recordAttributes.get(attributeIndex);
final BaseWriter.StructWriter structWriter = listWriter.struct();

// start a struct to go into the list
structWriter.start();
final byte[] tagBytes = samTagAndValue.tag.getBytes(StandardCharsets.UTF_8);
final byte[] typeBytes = "S".getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = samTagAndValue.value.toString().getBytes(StandardCharsets.UTF_8);
ArrowBuf tempBuf = allocator.buffer(tagBytes.length);
tempBuf.setBytes(0, tagBytes);
structWriter.varChar("AttributeTag").writeVarChar(0, tagBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(typeBytes.length);
tempBuf.setBytes(0, typeBytes);
structWriter.varChar("AttributeType").writeVarChar(0, typeBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(valueBytes.length);
tempBuf.setBytes(0, valueBytes);
structWriter.varChar("AttributeValue").writeVarChar(0, valueBytes.length, tempBuf);
tempBuf.close();
//finished with this struct
structWriter.end();
});
}
index++;
// finished writing the list of structs for this position
listWriter.endList();
listWriter.setValueCount(index);
samRecordBatch.setRowCount(index);
});