将 Hive 结果集转换为多字符分隔的 CSV - 选择HiveQl 处理器 NIFI



我正在尝试使用 selectHiveQL 处理器从 hive 获取包含 10M+ 记录的大全表,并且确实发现源代码中的 converttoCSVStream(( 方法比获取结果集花费更长的时间。 观察代码:结果集正在逐行迭代,然后添加到输出流中。

当表大小较小时,它会在几秒钟内完成该过程,但由于数据很大,因此需要更长的时间。有什么方法可以优化转换吗? 我尝试过 100000/1000/10000/1000 的提取大小。

这是代码:

while (rs.next()) {
//logger.info("+++++++++++++Inside the While loop+++++++++++++++++");
if (callback != null) {
callback.processRow(rs);
}
List<String> rowValues = new ArrayList<>(nrOfColumns);
for (int i = 1; i <= nrOfColumns; i++) {
final int javaSqlType = meta.getColumnType(i);
final Object value = rs.getObject(i);
//logger.info("+++++++++++++Entering the Switch at +++++++++++++++++");
switch (javaSqlType) {
case CHAR:
case LONGNVARCHAR:
case LONGVARCHAR:
case NCHAR:
case NVARCHAR:
case VARCHAR:
String valueString = rs.getString(i);
if (valueString != null) {
// Removed extra quotes as those are a part of the escapeCsv when required.
StringBuilder sb = new StringBuilder();
if (outputOptions.isQuote()) {
sb.append(""");
if (outputOptions.isEscape()) {
sb.append(StringEscapeUtils.escapeCsv(valueString));
} else {
sb.append(valueString);
}
sb.append(""");
rowValues.add(sb.toString());
} else {
if (outputOptions.isEscape()) {
rowValues.add(StringEscapeUtils.escapeCsv(valueString));
} else {
rowValues.add(valueString);
}
}
} else {
rowValues.add("");
}
break;
case ARRAY:
case STRUCT:
case JAVA_OBJECT:
String complexValueString = rs.getString(i);
if (complexValueString != null) {
rowValues.add(StringEscapeUtils.escapeCsv(complexValueString));
} else {
rowValues.add("");
}
break;
default:
if (value != null) {
rowValues.add(value.toString());
} else {
rowValues.add("");
}
}
//logger.info("+++++++++++++Exiting the Switch at +++++++++++++++++" + System.currentTimeMillis());
}

// Write row values
//logger.info("+++++++++++++Writing Row value at+++++++++++++++++" + System.currentTimeMillis());
outStream.write(StringUtils.join(rowValues, outputOptions.getDelimiter()).getBytes(StandardCharsets.UTF_8));
outStream.write("n".getBytes(StandardCharsets.UTF_8));
nrOfRows++;

if (maxRows > 0 && nrOfRows == maxRows)
break;
}

对 SelectHiveQL CSV 输出的改进在 NIFI-5307 中有所介绍,但尚未实现。也有讨论(但据我所知没有 Jira(允许增量提交(即在处理完后立即发送部分结果,而不是在结果集完全处理后发送所有结果(,例如在 QueryDatabaseTable (NIFI-4836( 中所做的那样。

最新更新