以前我以以下格式读取JSON数据:
JSON
{
"CreationTime":"2018-01-12T12:32:31",
"Id":"08f81fd7-21f1-48ba-a991-08d559b88cc5",
"Operation":"AddedToGroup",
"RecordType":14,
"UserType":0,
"Version":1,
"Workload":"OneDrive",
"ClientIP":"115.186.129.229",
"UserId":"omaji7@emumbaa10.onmicrosoft.com",
"EventSource":"SharePoint",
"ItemType":"Web"
}
我正在阅读KAFKA主题中的JSON数据,并在其上进行一些流处理,然后将其传递到另一个主题。在处理中,我创建了两个JSON对象,send
和received
。
使用此代码:
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
source_o365_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
System.out.println("========> o365_user_activity_by_date Log: " + value);
ArrayList<String> keywords = new ArrayList<String>();
try {
JSONObject send = new JSONObject();
JSONObject received = new JSONObject(value);
send.put("current_date", getCurrentDate().toString()); // UTC TIME
send.put("activity_time", received.get("CreationTime")); // CONSTANTS FINAL STATIC(Topic Names, Cassandra keys)
send.put("user_id", received.get("UserId"));
send.put("operation_type", received.get("Operation"));
send.put("app_name", received.get("Workload"));
keywords.add(send.toString());
// apply regex to value and for each match add it to keywords
} catch (Exception e) {
// TODO: handle exception
System.err.println("Unable to convert to json");
e.printStackTrace();
}
return keywords;
}
}).to("o365_user_activity_by_date");
这很简单。现在我有一个带有列表的JSON数据。
JSON
{
"CreationTime":"2017-12-27T07:47:46",
"Id":"10ee505b-90a4-4ac1-b96f-a6dbca939694",
"Operation":"Add member to role.",
"OrganizationId":"2f88f444-62da-4aae-b8af-8331a6915801",
"RecordType":8,
"ResultStatus":"success",
"UserKey":"10030000A656FE5B@emumbaa10.onmicrosoft.com",
"UserType":0,
"Version":1,
"Workload":"AzureActiveDirectory",
"ObjectId":"mustafa@emumbaa10.onmicrosoft.com",
"UserId":"omaji7@emumbaa10.onmicrosoft.com",
"AzureActiveDirectoryEventType":1,
"ExtendedProperties":[
{
"Name":"Role.ObjectID",
"Value":"b0f54661-2d74-4c50-afa3-1ec803f12efe"
},
{
"Name":"Role.DisplayName",
"Value":"Billing Administrator"
},
{
"Name":"Role.TemplateId",
"Value":"b0f54661-2d74-4c50-afa3-1ec803f12efe"
},
{
"Name":"Role.WellKnownObjectName",
"Value":"BillingAdmins"
}
],
"Actor":[
{
"ID":"omaji7@emumbaa10.onmicrosoft.com",
"Type":5
},
{
"ID":"10030000A656FE5B",
"Type":3
},
{
"ID":"User_d03ca514-adfa-4585-a8bd-7182a9a086c7",
"Type":2
}
],
"ActorContextId":"2f88f444-62da-4aae-b8af-8331a6915801",
"InterSystemsId":"6d402a5b-c5de-4d9f-a805-9371c109e55f",
"IntraSystemId":"a5568d01-f100-497a-b88b-c9731ff31248",
"Target":[
{
"ID":"User_8f77c311-3ea0-4146-9f7d-db21bd052d3d",
"Type":2
},
{
"ID":"mustafa@emumbaa10.onmicrosoft.com",
"Type":5
},
{
"ID":"1003BFFDA67CCA03",
"Type":3
}
],
"TargetContextId":"2f88f444-62da-4aae-b8af-8331a6915801"
}
我该如何在流处理中做同样的事情?我希望能够根据某些键读取JSON数据(包括列表数据键(。
为什么不将JSON转换为对象,然后过滤对象中的字段?
你不能这样做吗?
send.put("target_0_id", received.get("Target").getJSONObject(0).get("ID"));
您可以使用GSON库,并可以将JSON转换为对象并使用Getter和Setter,您可以构建所需的输出JSON。您也可以解析输入JSON以获取JSONARRAY详细信息。以下是代码如何使用pojo进行。
输入类:
public class Input {
private String UserType;
private String TargetContextId;
private String RecordType;
private String Operation;
private String Workload;
private String UserId;
private String OrganizationId;
private String InterSystemsId;
private ExtendedProperties[] ExtendedProperties;
private String ActorContextId;
private String CreationTime;
private String IntraSystemId;
private Target[] Target;
private Actor[] Actor;
private String Id;
private String Version;
private String ResultStatus;
private String ObjectId;
private String AzureActiveDirectoryEventType;
private String UserKey;
public String getUserType ()
{
return UserType;
}
public void setUserType (String UserType)
{
this.UserType = UserType;
}
public String getTargetContextId ()
{
return TargetContextId;
}
public void setTargetContextId (String TargetContextId)
{
this.TargetContextId = TargetContextId;
}
public String getRecordType ()
{
return RecordType;
}
public void setRecordType (String RecordType)
{
this.RecordType = RecordType;
}
public String getOperation ()
{
return Operation;
}
public void setOperation (String Operation)
{
this.Operation = Operation;
}
public String getWorkload ()
{
return Workload;
}
public void setWorkload (String Workload)
{
this.Workload = Workload;
}
public String getUserId ()
{
return UserId;
}
public void setUserId (String UserId)
{
this.UserId = UserId;
}
public String getOrganizationId ()
{
return OrganizationId;
}
public void setOrganizationId (String OrganizationId)
{
this.OrganizationId = OrganizationId;
}
public String getInterSystemsId ()
{
return InterSystemsId;
}
public void setInterSystemsId (String InterSystemsId)
{
this.InterSystemsId = InterSystemsId;
}
public ExtendedProperties[] getExtendedProperties ()
{
return ExtendedProperties;
}
public void setExtendedProperties (ExtendedProperties[] ExtendedProperties)
{
this.ExtendedProperties = ExtendedProperties;
}
public String getActorContextId ()
{
return ActorContextId;
}
public void setActorContextId (String ActorContextId)
{
this.ActorContextId = ActorContextId;
}
public String getCreationTime ()
{
return CreationTime;
}
public void setCreationTime (String CreationTime)
{
this.CreationTime = CreationTime;
}
public String getIntraSystemId ()
{
return IntraSystemId;
}
public void setIntraSystemId (String IntraSystemId)
{
this.IntraSystemId = IntraSystemId;
}
public Target[] getTarget ()
{
return Target;
}
public void setTarget (Target[] Target)
{
this.Target = Target;
}
public Actor[] getActor ()
{
return Actor;
}
public void setActor (Actor[] Actor)
{
this.Actor = Actor;
}
public String getId ()
{
return Id;
}
public void setId (String Id)
{
this.Id = Id;
}
public String getVersion ()
{
return Version;
}
public void setVersion (String Version)
{
this.Version = Version;
}
public String getResultStatus ()
{
return ResultStatus;
}
public void setResultStatus (String ResultStatus)
{
this.ResultStatus = ResultStatus;
}
public String getObjectId ()
{
return ObjectId;
}
public void setObjectId (String ObjectId)
{
this.ObjectId = ObjectId;
}
public String getAzureActiveDirectoryEventType ()
{
return AzureActiveDirectoryEventType;
}
public void setAzureActiveDirectoryEventType (String AzureActiveDirectoryEventType)
{
this.AzureActiveDirectoryEventType = AzureActiveDirectoryEventType;
}
public String getUserKey ()
{
return UserKey;
}
public void setUserKey (String UserKey)
{
this.UserKey = UserKey;
}
@Override
public String toString()
{
return "ClassPojo [UserType = "+UserType+", TargetContextId = "+TargetContextId+", RecordType = "+RecordType+", Operation = "+Operation+", Workload = "+Workload+", UserId = "+UserId+", OrganizationId = "+OrganizationId+", InterSystemsId = "+InterSystemsId+", ExtendedProperties = "+ExtendedProperties+", ActorContextId = "+ActorContextId+", CreationTime = "+CreationTime+", IntraSystemId = "+IntraSystemId+", Target = "+Target+", Actor = "+Actor+", Id = "+Id+", Version = "+Version+", ResultStatus = "+ResultStatus+", ObjectId = "+ObjectId+", AzureActiveDirectoryEventType = "+AzureActiveDirectoryEventType+", UserKey = "+UserKey+"]";
}}
目标类:
public class Target {
private String Type;
private String ID;
public String getType() {
return Type;
}
public void setType(String Type) {
this.Type = Type;
}
public String getID() {
return ID;
}
public void setID(String ID) {
this.ID = ID;
}
@Override
public String toString() {
return "ClassPojo [Type = " + Type + ", ID = " + ID + "]";
}}
演员课:
public class Actor {
private String Type;
private String ID;
public String getType() {
return Type;
}
public void setType(String Type) {
this.Type = Type;
}
public String getID() {
return ID;
}
public void setID(String ID) {
this.ID = ID;
}
@Override
public String toString() {
return "ClassPojo [Type = " + Type + ", ID = " + ID + "]";
}}
ExtendedProperties类:
public class ExtendedProperties {
private String Name;
private String Value;
public String getName() {
return Name;
}
public void setName(String Name) {
this.Name = Name;
}
public String getValue() {
return Value;
}
public void setValue(String Value) {
this.Value = Value;
}
@Override
public String toString() {
return "ClassPojo [Name = " + Name + ", Value = " + Value + "]";
}}
主类:
public class Stack {
public static void main(String[] args) {
doIt();
}
private static void doIt() {
String received = "{"CreationTime":"2017-12-27T07:47:46","Id":"10ee505b-90a4-4ac1-b96f-a6dbca939694","Operation":"Add member to role.","OrganizationId":"2f88f444-62da-4aae-b8af-8331a6915801","RecordType":8,"ResultStatus":"success","UserKey":"10030000A656FE5B@emumbaa10.onmicrosoft.com","UserType":0,"Version":1,"Workload":"AzureActiveDirectory","ObjectId":"mustafa@emumbaa10.onmicrosoft.com","UserId":"omaji7@emumbaa10.onmicrosoft.com","AzureActiveDirectoryEventType":1,"ExtendedProperties":[{"Name":"Role.ObjectID","Value":"b0f54661-2d74-4c50-afa3-1ec803f12efe"},{"Name":"Role.DisplayName","Value":"Billing Administrator"},{"Name":"Role.TemplateId","Value":"b0f54661-2d74-4c50-afa3-1ec803f12efe"},{"Name":"Role.WellKnownObjectName","Value":"BillingAdmins"}],"Actor":[{"ID":"omaji7@emumbaa10.onmicrosoft.com","Type":5},{"ID":"10030000A656FE5B","Type":3},{"ID":"User_d03ca514-adfa-4585-a8bd-7182a9a086c7","Type":2}],"ActorContextId":"2f88f444-62da-4aae-b8af-8331a6915801","InterSystemsId":"6d402a5b-c5de-4d9f-a805-9371c109e55f","IntraSystemId":"a5568d01-f100-497a-b88b-c9731ff31248","Target":[{"ID":"User_8f77c311-3ea0-4146-9f7d-db21bd052d3d","Type":2},{"ID":"mustafa@emumbaa10.onmicrosoft.com","Type":5},{"ID":"1003BFFDA67CCA03","Type":3}],"TargetContextId":"2f88f444-62da-4aae-b8af-8331a6915801"}";
JSONObject send = new JSONObject();
Gson gson = new Gson();
Input inputObject = gson.fromJson(received, Input.class);
// you can add values here and customize the output JSON
send.put("userId", inputObject.getUserId());
send.put("Workload", inputObject.getWorkload());
// read Actor list
Actor[] arr = inputObject.getActor();
for (int i = 0; i < arr.length; i++) {
// write your logic here how you want to handle the Actor list
// values
System.out.println(arr[i].getID() + " : " + arr[i].getType());
}
// read ExtendedProperties list
ExtendedProperties[] extendedProperties = inputObject.getExtendedProperties();
for (int j = 0; j < extendedProperties.length; j++) {
// write your logic here how you want to handle the
// ExtendedProperties list values
System.out.println(extendedProperties[j].getName() + " : " + extendedProperties[j].getValue());
}
System.out.println("*************");
}}
不使用POJO的替代主类。这里org.json
库已用于解析输入JSON。
public class Test {
public static void main(String[] args) {
doIt();
}
private static void doIt() {
String received = "{"CreationTime":"2017-12-27T07:47:46","Id":"10ee505b-90a4-4ac1-b96f-a6dbca939694","Operation":"Add member to role.","OrganizationId":"2f88f444-62da-4aae-b8af-8331a6915801","RecordType":8,"ResultStatus":"success","UserKey":"10030000A656FE5B@emumbaa10.onmicrosoft.com","UserType":0,"Version":1,"Workload":"AzureActiveDirectory","ObjectId":"mustafa@emumbaa10.onmicrosoft.com","UserId":"omaji7@emumbaa10.onmicrosoft.com","AzureActiveDirectoryEventType":1,"ExtendedProperties":[{"Name":"Role.ObjectID","Value":"b0f54661-2d74-4c50-afa3-1ec803f12efe"},{"Name":"Role.DisplayName","Value":"Billing Administrator"},{"Name":"Role.TemplateId","Value":"b0f54661-2d74-4c50-afa3-1ec803f12efe"},{"Name":"Role.WellKnownObjectName","Value":"BillingAdmins"}],"Actor":[{"ID":"omaji7@emumbaa10.onmicrosoft.com","Type":5},{"ID":"10030000A656FE5B","Type":3},{"ID":"User_d03ca514-adfa-4585-a8bd-7182a9a086c7","Type":2}],"ActorContextId":"2f88f444-62da-4aae-b8af-8331a6915801","InterSystemsId":"6d402a5b-c5de-4d9f-a805-9371c109e55f","IntraSystemId":"a5568d01-f100-497a-b88b-c9731ff31248","Target":[{"ID":"User_8f77c311-3ea0-4146-9f7d-db21bd052d3d","Type":2},{"ID":"mustafa@emumbaa10.onmicrosoft.com","Type":5},{"ID":"1003BFFDA67CCA03","Type":3}],"TargetContextId":"2f88f444-62da-4aae-b8af-8331a6915801"}";
JSONObject send = new JSONObject();
JSONObject input = new JSONObject(received);
// you can add values here and customize the output JSON
send.put("userId", input.getString("UserId"));
send.put("Workload", input.getString("Workload"));
// read Actor list
JSONArray actorArray = input.getJSONArray("Actor");
for (int i = 0; i < actorArray.length(); i++) {
// write your logic here how you want to handle the Actor list
// values
System.out.println(
actorArray.getJSONObject(i).getString("ID") + ":" + actorArray.getJSONObject(i).getInt("Type"));
}
// read ExtendedProperties list
JSONArray extendedProperties = input.getJSONArray("ExtendedProperties");
for (int j = 0; j < extendedProperties.length(); j++) {
// write your logic here how you want to handle the
// ExtendedProperties list values
System.out.println(extendedProperties.getJSONObject(j).getString("Name") + " : "
+ extendedProperties.getJSONObject(j).getString("Value"));
}
System.out.println("*************");
}}