BigQuery Storage Read API: ARROW format appends zeros to NUMERIC and BIGNUMERIC data



I tried the Storage Read API sample code from the Google documentation, but the NUMERIC and BIGNUMERIC columns come back with zeros appended.

For example, my table contains the numeric value 123, and the code below returns the following:

Schema<numeric_datatype: Decimal(38, 128)>
numeric_datatype
123000000000000000000

Code used: https://cloud.google.com/bigquery/docs/reference/storage/libraries

Please help me understand and fix this issue.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.protobuf.Timestamp;
import com.google.api.gax.rpc.ServerStream;

public class StorageArrowExample {

  private static class SimpleRowReader implements AutoCloseable {

    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
    private final VectorSchemaRoot root;
    private final VectorLoader loader;

    public SimpleRowReader(ArrowSchema arrowSchema) throws IOException {
      Schema schema = MessageSerializer.deserializeSchema(new ReadChannel(
          new ByteArrayReadableSeekableByteChannel(arrowSchema.getSerializedSchema().toByteArray())));
      System.out.println(schema);
      Preconditions.checkNotNull(schema);
      List<FieldVector> vectors = new ArrayList<>();
      for (Field field : schema.getFields()) {
        vectors.add(field.createVector(allocator));
      }
      root = new VectorSchemaRoot(vectors);
      root.syncSchema();
      loader = new VectorLoader(root);
    }

    public void processRows(ArrowRecordBatch batch) throws IOException {
      org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch = MessageSerializer
          .deserializeRecordBatch(new ReadChannel(
              new ByteArrayReadableSeekableByteChannel(batch.getSerializedRecordBatch().toByteArray())),
              allocator);
      System.out.println(deserializedBatch);
      loader.load(deserializedBatch);
      // Release buffers from batch (they are still held in the vectors in root).
      deserializedBatch.close();
      String test = root.contentToTSVString();
      System.out.println(root.contentToTSVString());
      root.clear();
    }

    @Override
    public void close() throws Exception {
      // TODO Auto-generated method stub
    }
  }

  public static void main(String... args) throws Exception {
    String projectId = "****";
    String table = "****";
    String dataset = "****";
    Integer snapshotMillis = null;
    if (args.length > 1) {
      snapshotMillis = Integer.parseInt(args[1]);
    }
    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      String parent = String.format("projects/%s", projectId);
      String srcTable = String.format("projects/%s/datasets/%s/tables/%s", projectId, table, dataset);
      TableReadOptions options = TableReadOptions.newBuilder().addSelectedFields("numeric_datatype")
          .addSelectedFields("bignumeric_datatype").clearArrowSerializationOptions().build();
      ReadSession.Builder sessionBuilder = ReadSession.newBuilder().setTable(srcTable)
          .setDataFormat(DataFormat.ARROW).setReadOptions(options);
      // Optionally specify the snapshot time. When unspecified, snapshot time is "now".
      if (snapshotMillis != null) {
        Timestamp t = Timestamp.newBuilder().setSeconds(snapshotMillis / 1000)
            .setNanos((int) ((snapshotMillis % 1000) * 1000000)).build();
        TableModifiers modifiers = TableModifiers.newBuilder().setSnapshotTime(t).build();
        sessionBuilder.setTableModifiers(modifiers);
      }
      // Begin building the session creation request.
      CreateReadSessionRequest.Builder builder = CreateReadSessionRequest.newBuilder().setParent(parent)
          .setReadSession(sessionBuilder).setMaxStreamCount(1);
      ReadSession session = client.createReadSession(builder.build());
      // Setup a simple reader and start a read session.
      try (SimpleRowReader reader = new SimpleRowReader(session.getArrowSchema())) {
        Preconditions.checkState(session.getStreamsCount() > 0);
        String streamName = session.getStreams(0).getName();
        ReadRowsRequest readRowsRequest = ReadRowsRequest.newBuilder().setReadStream(streamName).build();
        ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
        for (ReadRowsResponse response : stream) {
          Preconditions.checkState(response.hasArrowRecordBatch());
          reader.processRows(response.getArrowRecordBatch());
        }
      }
    }
  }
}

I was unable to reproduce the issue with your code.

Creating a test table:

CREATE or REPLACE table abc.num
AS SELECT CAST(123 as numeric) as numeric_datatype, 
CAST(123 as bignumeric) as bignumeric_datatype

and then running your code (note that the pasted code builds the table URL with the dataset and table arguments swapped), I get:

ArrowRecordBatch [length=1, nodes=[ArrowFieldNode [length=1, nullCount=0], ArrowFieldNode [length=1, nullCount=0]], #buffers=4, buffersLayout=[ArrowBuffer [offset=0, size=0], ArrowBuffer [offset=0, size=16], ArrowBuffer [offset=16, size=0], ArrowBuffer [offset=16, size=32]], closed=false]
numeric_datatype    bignumeric_datatype
123.000000000   123.00000000000000000000000000000000000000

The extra digits after the decimal point are expected: NUMERIC has a scale of 9 and BIGNUMERIC has a scale of 38, so integer values logically include those fractional digits. contentToTSVString simply calls toString on the BigDecimal returned by DecimalVector/Decimal256Vector. If you want to drop the trailing zeros, call getObject on the DecimalVector and then stripTrailingZeros on the resulting BigDecimal before printing the value.
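As an illustration (my addition, not part of the original answer), a minimal sketch of that approach, reusing the root and FieldVector objects already present in SimpleRowReader.processRows and assuming the selected columns are decimal vectors whose getObject returns a java.math.BigDecimal:

// Drop-in replacement for the contentToTSVString call in processRows:
// strip the padded scale from decimal columns before printing each row.
for (int row = 0; row < root.getRowCount(); row++) {
  StringBuilder line = new StringBuilder();
  for (FieldVector vector : root.getFieldVectors()) {
    Object value = vector.getObject(row); // BigDecimal for DecimalVector/Decimal256Vector
    if (value instanceof java.math.BigDecimal) {
      // toPlainString avoids scientific notation after stripping zeros
      value = ((java.math.BigDecimal) value).stripTrailingZeros().toPlainString();
    }
    line.append(value).append('\t');
  }
  System.out.println(line);
}

With the test table above, this prints "123" and "123" instead of the scale-padded values.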

Relevant dependencies:

<dependency>
  <groupId>com.google.api.grpc</groupId>
  <artifactId>grpc-google-cloud-bigquerystorage-v1</artifactId>
  <version>2.12.0</version>
</dependency>
<dependency>
  <groupId>com.google.api.grpc</groupId>
  <artifactId>proto-google-cloud-bigquerystorage-v1</artifactId>
  <version>2.12.0</version>
</dependency>
<dependency>
  <groupId>org.apache.arrow</groupId>
  <artifactId>arrow-memory-netty</artifactId>
  <version>6.0.0</version>
</dependency>
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-bigquerystorage</artifactId>
  <version>2.12.0</version>
</dependency>
<dependency>
  <groupId>org.apache.arrow</groupId>
  <artifactId>arrow-vector</artifactId>
  <version>6.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.api/gax -->
<dependency>
  <groupId>com.google.api</groupId>
  <artifactId>gax</artifactId>
  <version>2.12.2</version>
</dependency>
