I have a dataset that is mostly a 2D table, but one column (Field), called Attributes, contains a List of Structs in each cell. Each Struct has three Fields: Attribute Tag, Attribute Type, and Attribute Value.
The attribute Fields are defined as:
/**
* Attribute Tag - Two character tag.
*/
public static final Field ATTRIBUTE_TAG_FIELD =
new Field("AttributeTag", FieldType.notNullable(new ArrowType.FixedSizeBinary(2)), null);
/**
* Attribute Type - One character type.
*/
public static final Field ATTRIBUTE_TYPE_FIELD =
new Field(
"AttributeType",
new FieldType(false,
new ArrowType.FixedSizeBinary(1), null),
null
);
/**
* String representation of the Attribute value.
*/
public static final Field ATTRIBUTE_VALUE_FIELD = new Field("AttributeValue", FieldType.notNullable(new ArrowType.Utf8()), null);
/**
* The field is a nullable List of Structs, each with an attribute tag,
* type and value.
*/
public static final Field ATTRIBUTES_FIELD =
new Field("Attributes", FieldType.nullable(new ArrowType.List()), List.of(
new Field("Attribute", FieldType.nullable(new ArrowType.Struct()), List.of(
ATTRIBUTE_TAG_FIELD, ATTRIBUTE_TYPE_FIELD, ATTRIBUTE_VALUE_FIELD))));
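I have this code, which tries to populate the attributes from some source data. It runs without errors, but the attributes vector does not return any values afterwards.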
final ListVector attributes = (ListVector)
ATTRIBUTES_FIELD.createVector(allocator);
// this is the source of the attributes that I will populate into the attributes vector
final List<SAMRecord.SAMTagAndValue> recordAttributes =
samRecord.getAttributes();
if (recordAttributes != null && recordAttributes.size() > 0 ) {
final UnionListWriter listWriter = attributes.getWriter();
listWriter.allocate();
IntStream.range(0, recordAttributes.size()).forEachOrdered(attributeIndex -> {
listWriter.setPosition(attributeIndex);
listWriter.startList();
// put the values of the attribute in the arrow struct
final SAMRecord.SAMTagAndValue samTagAndValue = recordAttributes.get(attributeIndex);
// I think the problem is here. In a debugger this seems to create a new writer not related to my Vector??
final BaseWriter.StructWriter structWriter = listWriter.struct("Attribute");
structWriter.start();
final byte[] tagBytes =
samTagAndValue.tag.getBytes(StandardCharsets.UTF_8);
// todo find out the type from the value
final byte[] typeBytes = "S".getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes =
samTagAndValue.value.toString().getBytes(StandardCharsets.UTF_8);
ArrowBuf tempBuf = allocator.buffer(tagBytes.length);
tempBuf.setBytes(0, tagBytes);
structWriter.varChar("AttributeTag").writeVarChar(0, tagBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(typeBytes.length);
structWriter.varChar("AttributeType").writeVarChar(0, typeBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(valueBytes.length);
structWriter.varChar("AttributeValue").writeVarChar(0, valueBytes.length, tempBuf);
tempBuf.close();
structWriter.end();
});
listWriter.setValueCount(recordAttributes.size());
listWriter.end();
}
Why are there no values in the attributes ListVector? What is the correct way to do this?
The solution I eventually found came down to putting the UnionListWriter and the startList(), endList() and setRowCount() calls in the right places.
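To illustrate why the placement of these calls matters, here is a toy model in plain Java. In the Arrow columnar format, a List column stores an offsets buffer plus a flattened child array, and row i covers the child elements from offsets[i] up to offsets[i + 1]. The class ToyListColumn and its methods are hypothetical names chosen to echo the writer calls; this is a simplified sketch of the layout, not Arrow's ListVector or UnionListWriter, which do more bookkeeping than shown here.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: it mimics the "offsets + flattened child values" layout of a
// list column. It is NOT Arrow's ListVector or UnionListWriter.
final class ToyListColumn {
    private final int[] offsets;                           // row i spans child[offsets[i], offsets[i + 1])
    private final List<String> child = new ArrayList<>();  // elements of every row, back to back
    private int position = 0;                              // row currently being written

    ToyListColumn(int rowCapacity) {
        offsets = new int[rowCapacity + 1];                // one more offset than rows
    }

    void setPosition(int row) { position = row; }

    // In this toy, starting a list records nothing; elements are appended afterwards.
    void startList() { }

    void write(String element) { child.add(element); }

    // Closing the list is what records where the current row ends.
    void endList() {
        offsets[position + 1] = child.size();
        position++;
    }

    List<String> get(int row) {
        return new ArrayList<>(child.subList(offsets[row], offsets[row + 1]));
    }

    public static void main(String[] args) {
        // One list per row, closed with endList(): each row reads back its own elements.
        ToyListColumn good = new ToyListColumn(2);
        good.setPosition(0);
        good.startList();
        good.write("NM");
        good.write("MD");
        good.endList();
        good.startList();   // endList() already advanced the position to row 1
        good.write("AS");
        good.endList();
        System.out.println(good.get(0)); // prints [NM, MD]
        System.out.println(good.get(1)); // prints [AS]

        // One startList() per element and no endList(): no end offset is ever recorded.
        ToyListColumn bad = new ToyListColumn(3);
        for (int i = 0; i < 3; i++) {
            bad.setPosition(i);
            bad.startList();
            bad.write("X" + i);
        }
        System.out.println(bad.get(0)); // prints []
    }
}
```

In this sketch the elements written by the second pattern do exist in the child array, but no row points at them, so every row reads back empty. That is the toy's analogue of a vector that fills without errors and still returns nothing.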
The general format of the working Arrow code is:
try (
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
// create all your vectors and other closable ...
final ListVector attributes = (ListVector) SamAlignmentSchema.ATTRIBUTES_FIELD.createVector(allocator);
// create the writer for the list vector
final UnionListWriter listWriter = attributes.getWriter();
// create the vector schema root
VectorSchemaRoot samRecordBatch = new VectorSchemaRoot(
aligmentSchema.getFields(),
List.of(
// add vectors including the list vector
attributes
)
)
){
//allocate the VectorSchemaRoot
samRecordBatch.allocateNew();
// for each SAM record (read)
samReader.iterator().stream().forEach(samRecord -> {
int index = samRecordBatch.getRowCount();
System.out.println("index = " + index);
// set the values of the primitive vectors using set or setSafe ...
// write a list of attributes into the attributes field
final List<SAMRecord.SAMTagAndValue> recordAttributes = samRecord.getAttributes();
// position the writer at the current index, then start a new list there
listWriter.setPosition(index);
listWriter.startList();
if (recordAttributes != null) {
// for each attribute
IntStream.range(0, recordAttributes.size()).forEachOrdered(attributeIndex -> {
// put the values of the attribute in the arrow struct
final SAMRecord.SAMTagAndValue samTagAndValue = recordAttributes.get(attributeIndex);
final BaseWriter.StructWriter structWriter = listWriter.struct();
// start a struct to go into the list
structWriter.start();
final byte[] tagBytes = samTagAndValue.tag.getBytes(StandardCharsets.UTF_8);
final byte[] typeBytes = "S".getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = samTagAndValue.value.toString().getBytes(StandardCharsets.UTF_8);
ArrowBuf tempBuf = allocator.buffer(tagBytes.length);
tempBuf.setBytes(0, tagBytes);
structWriter.varChar("AttributeTag").writeVarChar(0, tagBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(typeBytes.length);
tempBuf.setBytes(0, typeBytes);
structWriter.varChar("AttributeType").writeVarChar(0, typeBytes.length, tempBuf);
tempBuf.close();
tempBuf = allocator.buffer(valueBytes.length);
tempBuf.setBytes(0, valueBytes);
structWriter.varChar("AttributeValue").writeVarChar(0, valueBytes.length, tempBuf);
tempBuf.close();
//finished with this struct
structWriter.end();
});
}
index++;
// finished writing the list of structs for this position
listWriter.endList();
listWriter.setValueCount(index);
samRecordBatch.setRowCount(index);
});