Efficiently streaming a large JSON from an input stream in Java



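To save memory and avoid an OOM error, I want to stream a large JSON from an input stream and extract only the parts I need from it. More precisely, I want to extract and save some strings from that JSON:

  1. files.content.fileContent.subList.text = "some text in file"
  2. files.content.fileContent.subList.text = "some text in file2"

and save them into a String variable:

String result = "some text in file\r\nsome text in file2";

I tried to parse the JSON using Jackson: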

JsonFactory jsonFactory = new JsonFactory();
StringBuilder result = new StringBuilder();
try (JsonParser jsonParser = jsonFactory.createParser(jsonAsInputStream)) {
    String fieldName;
    while (jsonParser.nextToken() != JsonToken.END_OBJECT) {
        jsonParser.nextToken();
        fieldName = jsonParser.getCurrentName();
        if ("files".equals(fieldName)) {
            while (true) {
                jsonParser.nextToken();
                fieldName = jsonParser.getCurrentName();
                if ("content".equals(fieldName)) {
                    jsonParser.nextToken();
                    fieldName = jsonParser.getCurrentName();
                    while (true) {
                        if ("text".equals(fieldName)) {
                            result.append(jsonParser.getText());
                        }
                    }
                }
            }
        }
    }
    LOGGER.info("result: {}", result);
} catch (JsonParseException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

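The code above does not work at all, and the solution is getting complicated. Is there a simple way to parse the JSON inputStream and extract some text from it?

Below is the JSON in question: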

{
"id": "1",
"name": "TestFile.xlsx",
"files": [
{
"id": "1",
"fileName": "TestFile.xlsx",
"types": {
"fileId": "1",
"context": [
{
"id": 1,
"contextId": "xyz",
"metadata": {
"abc": "1"
}
},
{
"id": 2,
"contextId": "abc"
}
],
"fileSettings": [
{
"id": 1,
"settingId": 1
},
{
"id": 2,
"settingId": 2
}

],
"fileAttachments": [
{
"id": 1,
"canDelete": true,
"canAttach": []
}
],
"replacements": [
{
"id": 1,
"replacementText": "xcv"
}
]
},
"content": [
{
"id": "1",
"contextList": [
1,
2,
3
],
"fileContent": {
"contentType": "text",
"subList": [
{
"id": "1",
"subList": [
{
"id": "1",
"text": "some text in file",
"type": "text"
}
]
}
]
},
"externalContent": {
"id": "1",
"children": [
{
"id": "1",
"contentType": "text corrupted",
"children": []
}
]
}
},
{
"id": "2",
"contextList": [
1,
2
],
"fileContent": {
"contentType": "text",
"subList": [
{
"id": "2",
"subList": [
{
"id": "1",
"text": "some text in file2",
"type": "text"
}
]
}
]
},
"externalContent": {
"id": "2",
"children": [
{
"id": "2",
"contentType": "text corrupted2",
"children": []
}
]
}
}
]
}
]

}

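In short,

  • Your code does not work because it implements the wrong algorithm.
  • JsonPath, as has already been suggested, seems to be a good DSL implementation, but it uses a DOM approach that collects the entire JSON tree into memory, so you would run into OOM again.

You have two solutions:

  • Implement a proper algorithm within your current approach (I agree you were on the right track).
  • Try implementing something similar to what JsonPath implements, breaking the problem down into smaller ones that support a truly streaming approach.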
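
To make the idea behind both options concrete, here is a minimal, library-independent sketch of path tracking over streaming events. PathTracker, its callback names and the "[]" path notation are all hypothetical; a real streaming parser (Jackson, javax.json, Gson) would drive the callbacks. The point is only that the state kept in memory is proportional to the nesting depth, not to the document size.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: a real streaming parser would call these methods per token.
final class PathTracker {

    // One frame per open container. Objects remember their most recent key; arrays have none.
    private static final class Frame {
        final boolean array;
        String key;

        Frame(final boolean array) {
            this.array = array;
        }
    }

    private final Deque<Frame> stack = new ArrayDeque<>();
    private final List<String> matches = new ArrayList<>();
    private final String wantedPath;

    PathTracker(final String wantedPath) {
        this.wantedPath = wantedPath;
    }

    void startObject() { stack.push(new Frame(false)); }
    void startArray() { stack.push(new Frame(true)); }
    void end() { stack.pop(); }
    void key(final String name) { stack.peek().key = name; }

    // Called for every string value; keeps the value only if the current path is the wanted one.
    void stringValue(final String value) {
        if (currentPath().equals(wantedPath)) {
            matches.add(value);
        }
    }

    List<String> matches() { return matches; }

    // Renders the open containers, outermost first, e.g. "files[].content[].fileContent".
    String currentPath() {
        final StringBuilder sb = new StringBuilder();
        final Iterator<Frame> frames = stack.descendingIterator();
        while (frames.hasNext()) {
            final Frame frame = frames.next();
            if (frame.array) {
                sb.append("[]");
            } else if (frame.key != null) {
                if (sb.length() > 0) {
                    sb.append('.');
                }
                sb.append(frame.key);
            }
        }
        return sb.toString();
    }

    // Simulates the events for a cut-down version of the JSON from the question.
    static List<String> demo() {
        final PathTracker t = new PathTracker("files[].content[].fileContent.subList[].subList[].text");
        t.startObject();
        t.key("id"); t.stringValue("1");
        t.key("files"); t.startArray(); t.startObject();
        t.key("content"); t.startArray(); t.startObject();
        t.key("fileContent"); t.startObject();
        t.key("contentType"); t.stringValue("text");
        t.key("subList"); t.startArray(); t.startObject();
        t.key("subList"); t.startArray(); t.startObject();
        t.key("text"); t.stringValue("some text in file");
        t.key("type"); t.stringValue("text");
        for (int i = 0; i < 10; i++) {
            t.end(); // close the ten containers opened above
        }
        return t.matches();
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

Rebuilding the path string for every value is wasteful and is done here only to keep the sketch short; comparing the stack frames directly would avoid the allocation.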

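I won't document much of my code, since it is fairly easy to understand and to adapt to other libraries, but you can develop a more advanced version of the following code using Java 17 (with preview features enabled) and javax.json (plus some Lombok for the Java boilerplate):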

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class PathJsonParser
implements JsonParser, Iterator<JsonParser.Event> {
private static final int DEFAULT_PATH_LENGTH = 32;
private final JsonParser jsonParser;
private final AbstractPathElement[] path;
private int last;
public static PathJsonParser create(final JsonParser jsonParser) {
final int maxPathLength = DEFAULT_PATH_LENGTH;
final PathJsonParser pathJsonParser = new PathJsonParser(jsonParser, new AbstractPathElement[maxPathLength]);
pathJsonParser.path[0] = AbstractPathElement.Root.instance;
for ( int i = 1; i < maxPathLength; i++ ) {
pathJsonParser.path[i] = new AbstractPathElement.Container();
}
return pathJsonParser;
}
@Override
public Event next() {
final Event event = jsonParser.next();
switch ( event ) {
case START_ARRAY -> {
path[last].tryIncreaseIndex();
path[++last].reset(JsonValue.ValueType.ARRAY);
}
case START_OBJECT -> {
path[last].tryIncreaseIndex();
path[++last].reset(JsonValue.ValueType.OBJECT);
}
case KEY_NAME -> path[last].setKeyName(jsonParser.getString());
case VALUE_STRING -> path[last].tryIncreaseIndex();
case VALUE_NUMBER -> path[last].tryIncreaseIndex();
case VALUE_TRUE -> path[last].tryIncreaseIndex();
case VALUE_FALSE -> path[last].tryIncreaseIndex();
case VALUE_NULL -> path[last].tryIncreaseIndex();
case END_OBJECT -> --last;
case END_ARRAY -> --last;
default -> throw new AssertionError(event);
}
return event;
}
public boolean matchesRoot(final int at) {
@Nullable
final AbstractPathElement e = tryElementAt(at);
return e != null && e.matchesRoot();
}
public boolean matchesIndex(final int at, final IntPredicate predicate) {
@Nullable
final AbstractPathElement e = tryElementAt(at);
return e != null && e.matchesIndex(predicate);
}
public boolean matchesName(final int at, final Predicate<? super String> predicate) {
@Nullable
final AbstractPathElement e = tryElementAt(at);
return e != null && e.matchesName(predicate);
}
// @formatter:off
@Override public boolean hasNext() { return jsonParser.hasNext(); }
@Override public String getString() { return jsonParser.getString(); }
@Override public boolean isIntegralNumber() { return jsonParser.isIntegralNumber(); }
@Override public int getInt() { return jsonParser.getInt(); }
@Override public long getLong() { return jsonParser.getLong(); }
@Override public BigDecimal getBigDecimal() { return jsonParser.getBigDecimal(); }
@Override public JsonLocation getLocation() { return jsonParser.getLocation(); }
@Override @SuppressWarnings("MethodDoesntCallSuperMethod") public JsonObject getObject() { return jsonParser.getObject(); }
@Override @SuppressWarnings("MethodDoesntCallSuperMethod") public JsonValue getValue() { return jsonParser.getValue(); }
@Override @SuppressWarnings("MethodDoesntCallSuperMethod") public JsonArray getArray() { return jsonParser.getArray(); }
@Override @SuppressWarnings("MethodDoesntCallSuperMethod") public Stream<JsonValue> getArrayStream() { return jsonParser.getArrayStream(); }
@Override @SuppressWarnings("MethodDoesntCallSuperMethod") public Stream<Map.Entry<String, JsonValue>> getObjectStream() { return jsonParser.getObjectStream(); }
@Override @SuppressWarnings("MethodDoesntCallSuperMethod") public Stream<JsonValue> getValueStream() { return jsonParser.getValueStream(); }
@Override @SuppressWarnings("MethodDoesntCallSuperMethod") public void skipArray() { jsonParser.skipArray(); }
@Override @SuppressWarnings("MethodDoesntCallSuperMethod") public void skipObject() { jsonParser.skipObject(); }
@Override public void close() { jsonParser.close(); }
// @formatter:on
@Nullable
private AbstractPathElement tryElementAt(final int at) {
final int pathAt;
if ( at >= 0 ) {
pathAt = at;
} else {
pathAt = last + at + 1;
}
if ( pathAt < 0 || pathAt > last ) {
return null;
}
return path[pathAt];
}
private abstract static sealed class AbstractPathElement
permits AbstractPathElement.Root, AbstractPathElement.Container {
abstract void reset(JsonValue.ValueType valueType);
abstract void setKeyName(String keyName);
abstract void tryIncreaseIndex();
abstract boolean matchesRoot();
abstract boolean matchesIndex(IntPredicate predicate);
abstract boolean matchesName(Predicate<? super String> predicate);
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
private static final class Root
extends AbstractPathElement {
private static final AbstractPathElement instance = new Root();
@Override
void reset(final JsonValue.ValueType valueType) {
throw new UnsupportedOperationException();
}
@Override
void setKeyName(final String keyName) {
throw new UnsupportedOperationException();
}
@Override
void tryIncreaseIndex() {
// do nothing
}
@Override
boolean matchesRoot() {
return true;
}
@Override
boolean matchesIndex(final IntPredicate predicate) {
return false;
}
@Override
boolean matchesName(final Predicate<? super String> predicate) {
return false;
}
}
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
private static final class Container
extends AbstractPathElement {
private static final String NO_KEY_NAME = null;
private static final int NO_INDEX = -1;
private JsonValue.ValueType valueType;
private String keyName = NO_KEY_NAME;
private int index = NO_INDEX;
@Override
void reset(final JsonValue.ValueType valueType) {
this.valueType = valueType;
keyName = NO_KEY_NAME;
index = NO_INDEX;
}
@Override
void setKeyName(final String keyName) {
this.keyName = keyName;
}
@Override
void tryIncreaseIndex() {
if ( valueType == JsonValue.ValueType.ARRAY ) {
index++;
}
}
@Override
boolean matchesRoot() {
return false;
}
@Override
boolean matchesIndex(final IntPredicate predicate) {
return switch ( valueType ) {
case ARRAY -> index != NO_INDEX && predicate.test(index);
case OBJECT -> false;
case STRING, NUMBER, TRUE, FALSE, NULL -> throw new AssertionError(valueType);
};
}
@Override
boolean matchesName(final Predicate<? super String> predicate) {
return switch ( valueType ) {
case ARRAY -> false;
case OBJECT -> !Objects.equals(keyName, NO_KEY_NAME) && predicate.test(keyName);
case STRING, NUMBER, TRUE, FALSE, NULL -> throw new AssertionError(valueType);
};
}
}
}
}

Example of use:

public final class PathJsonParserTest {

    // $.files.0.content.0.fileContent.subList.0.subList.0.text
    private static boolean matches(final PathJsonParser parser) {
        return parser.matchesName(-1, name -> name.equals("text"))
                && parser.matchesIndex(-2, index -> true)
                && parser.matchesName(-3, name -> name.equals("subList"))
                && parser.matchesIndex(-4, index -> true)
                && parser.matchesName(-5, name -> name.equals("subList"))
                && parser.matchesName(-6, name -> name.equals("fileContent"))
                && parser.matchesIndex(-7, index -> true)
                && parser.matchesName(-8, name -> name.equals("content"))
                && parser.matchesIndex(-9, index -> true)
                && parser.matchesName(-10, name -> name.equals("files"))
                && parser.matchesRoot(-11);
    }

    @Test
    public void test()
            throws IOException {
        try ( final PathJsonParser parser = PathJsonParser.create(JsonParsers.openFromResource(PathJsonParserTest.class, "input.json")) ) {
            for ( ; parser.hasNext(); parser.next() ) {
                if ( matches(parser) ) {
                    parser.next();
                    System.out.println(parser.getValue());
                }
            }
        }
    }
}

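Of course, it doesn't look as cool as JsonPath, but you can do the following:

  • Implement a matcher builder API to make it look nicer.
  • Implement a JSON Path-compliant parser to build the matchers.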
  • Wrap the loop-and-match pattern used in the test above into a generic algorithm (for example, one that can be used with the Stream API).
  • Implement some kind of simple JSON-to-objects deserializer.
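
As an illustration of the first suggestion, a matcher builder could look roughly like the sketch below. Everything in it is an assumption made for the example: the PathMatcher name, the builder methods, and the choice to model a path as a List<Object> holding String keys and Integer indexes. It is not wired to PathJsonParser; it only shows how a fluent builder reads compared with a chain of negative offsets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical builder: a path is a List<Object> of String keys and Integer array indexes.
final class PathMatcher {

    private final List<Predicate<Object>> steps;

    private PathMatcher(final List<Predicate<Object>> steps) {
        this.steps = steps;
    }

    // Starts a matcher at the document root.
    static Builder root() {
        return new Builder();
    }

    // A path matches when it has exactly one element per step and every step accepts its element.
    boolean matches(final List<Object> path) {
        if (path.size() != steps.size()) {
            return false;
        }
        for (int i = 0; i < steps.size(); i++) {
            if (!steps.get(i).test(path.get(i))) {
                return false;
            }
        }
        return true;
    }

    static final class Builder {
        private final List<Predicate<Object>> steps = new ArrayList<>();

        // Matches an object member with exactly this key.
        Builder name(final String expected) {
            steps.add(element -> expected.equals(element));
            return this;
        }

        // Matches any array index.
        Builder anyIndex() {
            steps.add(element -> element instanceof Integer);
            return this;
        }

        // Matches one specific array index.
        Builder index(final int expected) {
            steps.add(element -> element instanceof Integer && (Integer) element == expected);
            return this;
        }

        PathMatcher build() {
            return new PathMatcher(new ArrayList<>(steps));
        }
    }

    // The path from the question, written with the builder.
    static PathMatcher textPath() {
        return PathMatcher.root()
                .name("files").anyIndex()
                .name("content").anyIndex()
                .name("fileContent")
                .name("subList").anyIndex()
                .name("subList").anyIndex()
                .name("text")
                .build();
    }

    public static void main(final String[] args) {
        final List<Object> path = Arrays.asList(
                "files", 0, "content", 1, "fileContent", "subList", 0, "subList", 0, "text");
        System.out.println(textPath().matches(path));
    }
}
```

A JSON Path parser, as in the second suggestion, could then target this builder instead of constructing predicates by hand.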

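Or, if possible, find a good code generator that can generate a streaming parser with as little runtime overhead as possible (its output would be very similar to your code, except that it would work). (Please ping me if you know of one.)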

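Have you checked JsonPath? You can use Gson or Jackson as the provider, but by default it uses Json-smart, which focuses on performance.

Here is an example based on the attached JSON.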

InputStream inputStream = Main.class.getClassLoader().getResourceAsStream("file.json");
String[] textArray = JsonPath.parse(inputStream).read("files[*].content[*].fileContent.subList[*].subList[*].text", String[].class);
Arrays.stream(textArray).forEach(System.out::println);

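JsonPath uses a lot of memory. If you do not have enough memory to handle large files, you can use a streaming/token approach instead. Provided you do not store the texts, the code below can process a 6 GB JSON file with a heap size of no more than 900 MB.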

import static com.google.gson.stream.JsonToken.*;

import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonToken;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;

public class Main {

    public static void main(String[] args) throws Exception {
        try (InputStream inputStream = getJsonAsInputStream()) {
            EnumMap<JsonToken, JsonTokenHandler> map = getJsonTokenHandler();
            Context context = new Context();
            JsonReader reader = new JsonReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8));
            // Peek at the next token and let the matching handler consume it.
            while (true) {
                JsonToken token = reader.peek();
                JsonTokenHandler jsonTokenHandler = map.get(token);
                jsonTokenHandler.handle(reader, context);
                if (token.equals(END_DOCUMENT)) {
                    break;
                }
            }
            context.getTexts().forEach(System.out::println);
        }
    }

    // One handler per token type; every handler must consume its token.
    private static EnumMap<JsonToken, JsonTokenHandler> getJsonTokenHandler() {
        EnumMap<JsonToken, JsonTokenHandler> map = new EnumMap<>(JsonToken.class);
        map.put(BEGIN_ARRAY, (reader, context) -> reader.beginArray());
        map.put(END_ARRAY, (reader, context) -> reader.endArray());
        map.put(BEGIN_OBJECT, (reader, context) -> reader.beginObject());
        map.put(END_OBJECT, (reader, context) -> reader.endObject());
        map.put(NAME, (reader, context) -> {
            reader.nextName();
            context.setCurrentPath(reader.getPath());
        });
        map.put(STRING, (reader, context) -> {
            String string = reader.nextString();
            if (context.isTextAttribute()) {
                context.addText(string);
            }
        });
        map.put(NUMBER, (reader, context) -> reader.nextString());
        map.put(BOOLEAN, (reader, context) -> reader.nextBoolean());
        map.put(NULL, (reader, context) -> reader.nextNull());
        map.put(END_DOCUMENT, (reader, context) -> {
        });
        return map;
    }

    private static InputStream getJsonAsInputStream() throws FileNotFoundException {
        File inFile = new File("/path/to/your/large/file.json");
        ReadableByteChannel rChannel = new RandomAccessFile(inFile, "r").getChannel();
        return Channels.newInputStream(rChannel);
    }

    static class Context {
        private String currentPath;
        private List<String> texts = new ArrayList<>();

        public void addText(String text) {
            texts.add(text);
        }

        public List<String> getTexts() {
            return texts;
        }

        public void setCurrentPath(String path) {
            this.currentPath = path;
        }

        // The regex backslashes are doubled because this is a Java string literal.
        public boolean isTextAttribute() {
            return currentPath.matches("\\$\\.files\\[\\d+\\]\\.content\\[\\d+\\]\\.fileContent\\.subList\\[\\d+\\]\\.subList\\[\\d+\\]\\.text");
        }
    }

    interface JsonTokenHandler {
        void handle(JsonReader reader, Context context) throws IOException;
    }

}
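
The memory figure above holds only if the matched texts are not accumulated. One way to arrange that, sketched below with only the JDK, is to give the context a callback instead of a list, and to compile the path expression once rather than on every isTextAttribute call (String.matches recompiles it each time). StreamingContext and its constructor are hypothetical names for this example, not part of Gson or of the code above.

```java
import java.util.function.Consumer;
import java.util.regex.Pattern;

// Hypothetical variant of Context: forwards each matching text to a sink instead of storing it.
final class StreamingContext {

    // Compiled once and reused for every path check.
    private static final Pattern TEXT_PATH = Pattern.compile(
            "\\$\\.files\\[\\d+\\]\\.content\\[\\d+\\]\\.fileContent"
                    + "\\.subList\\[\\d+\\]\\.subList\\[\\d+\\]\\.text");

    private final Consumer<String> sink;
    private String currentPath = "";

    StreamingContext(final Consumer<String> sink) {
        this.sink = sink;
    }

    void setCurrentPath(final String path) {
        this.currentPath = path;
    }

    boolean isTextAttribute() {
        return TEXT_PATH.matcher(currentPath).matches();
    }

    // Hands the text over immediately, so nothing is retained by the context itself.
    void addText(final String text) {
        sink.accept(text);
    }

    public static void main(final String[] args) {
        final StreamingContext context = new StreamingContext(System.out::println);
        context.setCurrentPath("$.files[0].content[1].fileContent.subList[0].subList[0].text");
        if (context.isTextAttribute()) {
            context.addText("some text in file2");
        }
    }
}
```

The sink could just as well write to a file or a Writer; what matters is that each text can be garbage-collected as soon as it has been handled.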

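There may be a very simple solution to your problem. You can use Jackson's "JsonParser" to stream through the objects and ObjectMapper to parse each object, without having to rewrite the parsing logic.

It looks like this: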

try (JsonParser jsonParser = objectMapper.getFactory().createParser(inputStreamReader)) {
    if (jsonParser.nextToken() != JsonToken.START_ARRAY) {
        throw new IllegalStateException("Expected content to be an array");
    }
    while (jsonParser.nextToken() != JsonToken.END_ARRAY) {
        MyObject myObject = objectMapper.readValue(jsonParser, MyObject.class);
        log.info("This is my object: {}", myObject);
    }
}

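If each of your objects is small enough (which is the case most of the time), you can easily process files with gigabytes of data this way. That is not exactly your specific case, though.

The only catch is that this will not work if one of your objects is huge.

I hope it helps someone.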
