从存储在GCS中的文件中读取Google Cloud Dataflow中的Excel文件



目前Google CloudDataflow不支持从Google Cloud Storage读取Excel文件。

对于解决方法,我尝试了以下方法,

我尝试使用应用程序引擎工具来读取Excel文件,然后使用Apache POI并尝试将Excel文件转换为CSV文件。

下面是使用的代码示例,

GcsService gcsService = GcsServiceFactory.createGcsService(); GcsFilename fileName = new GcsFilename("TestBucket", "Test1.xlsx"); GcsInputChannel readChannel = gcsService.openPrefetchingReadChannel(fileName, 0, BUFFER_SIZE); InputStream inputStream = Channels.newInputStream(readChannel);

然后添加Apache POI来读取InputStream

XSSFWorkbook workbook = new XSSFWorkbook(inputStream); XSSFWorkbook workbook = new XSSFWorkbook(new FileInputStream(inputFile)); XSSFSheet sheet = workbook.getSheetAt(0);

但收到以下错误,

Aug 17, 2017 6:58:35 PM com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl waitForFetch
WARNING: PrefetchingGcsInputChannelImpl [filename=GcsFilename(TestBucket, Test1.xlsx), blockSizeBytes=2048, closed=false, eofHit=false, length=-1, fetchPosition=0, pendingFetch=com.google.common.util.concurrent.Futures$ImmediateFailedFuture@7770f470, retryParams=RetryParams [requestTimeoutMillis=30000, requestTimeoutRetryFactor=1.2, maxRequestTimeout=60000, retryMinAttempts=3, retryMaxAttempts=6, initialRetryDelayMillis=1000, maxRetryDelayMillis=32000, retryDelayBackoffFactor=2.0, totalRetryPeriodMillis=50000]]: IOException fetching block
java.util.concurrent.ExecutionException: java.io.IOException: java.lang.NullPointerException
at com.google.common.util.concurrent.Futures$ImmediateFailedFuture.get(Futures.java:234)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.waitForFetch(PrefetchingGcsInputChannelImpl.java:152)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.access$000(PrefetchingGcsInputChannelImpl.java:43)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl$1.call(PrefetchingGcsInputChannelImpl.java:136)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl$1.call(PrefetchingGcsInputChannelImpl.java:134)
at com.google.appengine.tools.cloudstorage.RetryHelper.doRetry(RetryHelper.java:108)
at com.google.appengine.tools.cloudstorage.RetryHelper.runWithRetries(RetryHelper.java:166)
at com.google.appengine.tools.cloudstorage.RetryHelper.runWithRetries(RetryHelper.java:156)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.waitForFetchWithRetry(PrefetchingGcsInputChannelImpl.java:134)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.read(PrefetchingGcsInputChannelImpl.java:212)
at sun.nio.ch.ChannelInputStream.read(Unknown Source)
at sun.nio.ch.ChannelInputStream.read(Unknown Source)
at sun.nio.ch.ChannelInputStream.read(Unknown Source)
at java.io.FilterInputStream.read(Unknown Source)
at java.io.PushbackInputStream.read(Unknown Source)
at java.util.zip.ZipInputStream.readFully(Unknown Source)
at java.util.zip.ZipInputStream.readLOC(Unknown Source)
at java.util.zip.ZipInputStream.getNextEntry(Unknown Source)
at org.apache.poi.openxml4j.util.ZipInputStreamZipEntrySource.<init>(ZipInputStreamZipEntrySource.java:51)
at org.apache.poi.openxml4j.opc.ZipPackage.<init>(ZipPackage.java:83)
at org.apache.poi.openxml4j.opc.OPCPackage.open(OPCPackage.java:267)
at org.apache.poi.util.PackageHelper.open(PackageHelper.java:39)
at org.apache.poi.xssf.usermodel.XSSFWorkbook.<init>(XSSFWorkbook.java:204)
at chalel.paratChalel.main(paratChalel.java:102)
Caused by: java.io.IOException: java.lang.NullPointerException
at com.google.appengine.tools.cloudstorage.dev.LocalRawGcsService$BlobStorageAdapter.getInstance(LocalRawGcsService.java:186)
at com.google.appengine.tools.cloudstorage.dev.LocalRawGcsService$BlobStorageAdapter.access$000(LocalRawGcsService.java:109)
at com.google.appengine.tools.cloudstorage.dev.LocalRawGcsService.ensureInitialized(LocalRawGcsService.java:194)
at com.google.appengine.tools.cloudstorage.dev.LocalRawGcsService.readObjectAsync(LocalRawGcsService.java:432)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.requestBlock(PrefetchingGcsInputChannelImpl.java:107)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.<init>(PrefetchingGcsInputChannelImpl.java:88)
at com.google.appengine.tools.cloudstorage.GcsServiceImpl.openPrefetchingReadChannel(GcsServiceImpl.java:126)
at chalel.paratChalel.main(paratChalel.java:91)
Caused by: java.lang.NullPointerException
at com.google.appengine.tools.cloudstorage.dev.LocalRawGcsService$BlobStorageAdapter.<init>(LocalRawGcsService.java:123)
at com.google.appengine.tools.cloudstorage.dev.LocalRawGcsService$BlobStorageAdapter.getInstance(LocalRawGcsService.java:184)
... 7 more
Aug 17, 2017 6:58:35 PM com.google.appengine.tools.cloudstorage.RetryHelper doRetry
INFO: RetryHelper(44.11 ms, 1 attempts, com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl$1@7bedc48a): Attempt #1 failed [java.io.IOException: PrefetchingGcsInputChannelImpl [filename=GcsFilename(TestBucket, Test1.xlsx), blockSizeBytes=2048, closed=false, eofHit=false, length=-1, fetchPosition=0, pendingFetch=com.google.common.util.concurrent.Futures$ImmediateFailedFuture@77f1baf5, retryParams=RetryParams [requestTimeoutMillis=30000, requestTimeoutRetryFactor=1.2, maxRequestTimeout=60000, retryMinAttempts=3, retryMaxAttempts=6, initialRetryDelayMillis=1000, maxRetryDelayMillis=32000, retryDelayBackoffFactor=2.0, totalRetryPeriodMillis=50000]]: Prefetch failed, prefetching again], sleeping for 1146 ms
Aug 17, 2017 6:58:36 PM com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl waitForFetch
WARNING: PrefetchingGcsInputChannelImpl [filename=GcsFilename(TestBucket, Test1.xlsx), blockSizeBytes=2048, closed=false, eofHit=false, length=-1, fetchPosition=0, pendingFetch=com.google.common.util.concurrent.Futures$ImmediateFailedFuture@77f1baf5, retryParams=RetryParams [requestTimeoutMillis=30000, requestTimeoutRetryFactor=1.2, maxRequestTimeout=60000, retryMinAttempts=3, retryMaxAttempts=6, initialRetryDelayMillis=1000, maxRetryDelayMillis=32000, retryDelayBackoffFactor=2.0, totalRetryPeriodMillis=50000]]: IOException fetching block
java.util.concurrent.ExecutionException: java.io.IOException: java.lang.NullPointerException
at com.google.common.util.concurrent.Futures$ImmediateFailedFuture.get(Futures.java:234)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.waitForFetch(PrefetchingGcsInputChannelImpl.java:152)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.access$000(PrefetchingGcsInputChannelImpl.java:43)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl$1.call(PrefetchingGcsInputChannelImpl.java:136)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl$1.call(PrefetchingGcsInputChannelImpl.java:134)
at com.google.appengine.tools.cloudstorage.RetryHelper.doRetry(RetryHelper.java:108)
at com.google.appengine.tools.cloudstorage.RetryHelper.runWithRetries(RetryHelper.java:166)
at com.google.appengine.tools.cloudstorage.RetryHelper.runWithRetries(RetryHelper.java:156)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.waitForFetchWithRetry(PrefetchingGcsInputChannelImpl.java:134)
at com.google.appengine.tools.cloudstorage.PrefetchingGcsInputChannelImpl.read(PrefetchingGcsInputChannelImpl.java:212)
at sun.nio.ch.ChannelInputStream.read(Unknown Source)
at sun.nio.ch.ChannelInputStream.read(Unknown Source)
at sun.nio.ch.ChannelInputStream.read(Unknown Source)
at java.io.FilterInputStream.read(Unknown Source)
at java.io.PushbackInputStream.read(Unknown Source)
at java.util.zip.ZipInputStream.readFully(Unknown Source)
at java.util.zip.ZipInputStream.readLOC(Unknown Source)
at java.util.zip.ZipInputStream.getNextEntry(Unknown Source)
at org.apache.poi.openxml4j.util.ZipInputStreamZipEntrySource.<init>(ZipInputStreamZipEntrySource.java:51)
at org.apache.poi.openxml4j.opc.ZipPackage.<init>(ZipPackage.java:83)
at org.apache.poi.openxml4j.opc.OPCPackage.open(OPCPackage.java:267)
at org.apache.poi.util.PackageHelper.open(PackageHelper.java:39)
at org.apache.poi.xssf.usermodel.XSSFWorkbook.<init>(XSSFWorkbook.java:204)
at chalel.paratChalel.main(paratChalel.java:102)

注意:我在pom中添加了以下依赖项.xml

<!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>3.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.appengine.tools/appengine-gcs-client -->
<dependency>
<groupId>com.google.appengine.tools</groupId>
<artifactId>appengine-gcs-client</artifactId>
<version>0.6</version>
</dependency>

可能是什么问题?

此代码将 Excel 中的所有工作表转换为具有相同工作表名称的 CSV。通过ReadChannel链接存储在Google Cloud中的Excel文件到FileInputStream Java。从那里使用 Excel 可读库(在本例中为 POI(连接到文件输入流。For 循环将 Excel 工作表转换为 CSV。声明一个 BlobId,并通过写入通道将其写入 Google 云存储桶。

private static final int BUFFER_SIZE = 64 * 1024;
private static void printBlob(com.google.cloud.storage.Storage storage, String bucketName, String blobPath)
throws IOException, InvalidFormatException {
try (ReadChannel reader = ((com.google.cloud.storage.Storage) storage).reader(bucketName, blobPath)) {
InputStream inputStream = Channels.newInputStream(reader);
Workbook wb = WorkbookFactory.create(inputStream);
StringBuffer data = new StringBuffer();
for (int i = 0; i < wb.getNumberOfSheets(); i++) {
String fName = wb.getSheetAt(i).getSheetName();
XSSFSheet sheet = (XSSFSheet) wb.getSheetAt(i);
Iterator<Row> rowIterator = sheet.iterator();
data.delete(0, data.length());
while (rowIterator.hasNext()) {
// Get Each Row
Row row = rowIterator.next();
data.append('n');
// Iterating through Each column of Each Row
Iterator<Cell> cellIterator = row.cellIterator();
while (cellIterator.hasNext()) {
Cell cell = cellIterator.next();
// Checking the cell format
switch (cell.getCellType()) {
case Cell.CELL_TYPE_NUMERIC:
data.append(cell.getNumericCellValue() + ",");
break;
case Cell.CELL_TYPE_STRING:
data.append(cell.getStringCellValue() + ",");
break;
case Cell.CELL_TYPE_BOOLEAN:
data.append(cell.getBooleanCellValue() + ",");
break;
case Cell.CELL_TYPE_BLANK:
data.append("" + ",");
break;
default:
data.append(cell + ",");
}
}
}
String filename = "test_excel/"+fName;
BlobId blobId = BlobId.of(bucketname, filename);
byte[] content = data.toString().getBytes(UTF_8);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
try (WriteChannel writer = storage.writer(blobInfo)) {
writer.write(ByteBuffer.wrap(content, 0, content.length));
}
}
}
}

您是否考虑过尝试使用 Cloud Dataprep 来读取 Excel 数据?

它本身支持Excel工作簿,尽管它们需要上传到Dataprep,而不是从GCS读取。如果您尝试分析和清理 Excel 工作簿数据以进行进一步分析,Dataprep 可能更方便,它恰好使用 Dataflow 在下面进行处理。

最新更新