将数据帧数据集转换为<Row>特定列的字符串数据类型的 JSON 格式,并将 JSON 字符串转换回数据帧



我有一个数据帧。我需要为每个记录调用一个 Rest API。

假设数据帧如下所示:

|----|-------------|-----|---------|
|UUID|PID          |DEVID|FIRSTNAME|
|----|-------------|-----|---------|
|1111|1234567891011|ABC11|JOHN     |
|2222|9876543256827|ABC22|HARRY    |
|----|-------------|-----|---------|

第一行的 JSON 请求字符串应如下所示(注意:json 是在 2 列上创建的,而不是全部(,因为要调用的 Rest API 需要以下格式的输入:

{"applicationInfo": {"appId": "ec78fef4-92b9-3b1b-a68d-c45376b6977a"}, "requestData": [{"secureData": "JOHN", "secureDataType": "FIRSTNAME", "index": 1 }, {"secureData": "1234567891011", "secureDataType": "PID", "index": 2 } ] }

索引键的值必须动态生成,对每行使用增量计数器。

然后,我需要调用 Rest API 将上述 JSON 作为字符串参数发送。

加密后来自 API 的响应如下所示:

{"responseData":[{"resultCode":"00","secureData":"63ygdydshbhgvdyw3et7edgu","secureDataType":"FIRSTNAME","index":1},{"resultCode":"00","secureData":"HKJJBJHVHG66456456FXXFFCGF","secureDataType":"PID","index":2}],"responseCode":"00","responseMessage":"SUCCESS","resultCounts":{"totalCount":2,"successCount":2,"failedCount":0}}

然后我需要阅读上面的响应并创建一个数据帧,它应该看起来像:

|----|--------------------------|-----|------------------------|
|UUID|PID                       |DEVID|FIRSTNAME               |
|----|--------------------------|-----|------------------------|
|1111|HKJJBJHVHG66456456FXXFFCGF|ABC11|63ygdydshbhgvdyw3et7edgu|
|----|--------------------------|-----|------------------------|

如果我将初始输入数据帧转换为 JSON((.collectAsList((,那么它看起来像:

[{"UUID":"1111","PID":"1234567891011","DEVID":"ABC11","FIRSTNAME":"JOHN"}, {"UUID":"2222","PID":"9876543256827","DEVID":"ABC22","FIRSTNAME":"HARRY"}]

但这不起作用,因为 Rest API 需要以上述特定格式输入。 请帮忙。

对于上述内容,我假设数据集已跨 Spark 工作线程的数量进行分区,并且它是 Row(数据帧(的通用数据集,那么可以采用以下机制。

  1. 将具有所需属性的类定义为数据容器
  2. 将数据集内容作为列表(如果数据集,则采用 AsList 方法,参考(
  3. 创建并填充数据容器的对象(并以稍后识别它们的方式存储,您必须使用解密的数据重新填充它们(
  4. 使用杰克逊将列表序列化为 JSON 数组(参考( 步骤 4 和 5 可以与 Jackson 自定义序列化程序结合使用 参考示例
  5. 进行 REST 调用并重新填充数据容器对象(在使用 Jackson 反序列化响应后(
  6. 创建数据框(示例(
  7. 处理数据框(行的数据集(

注意:您提供的 JSON 结构似乎不正确,JSON 数组为 [

{},{},{}]

在您的情况下,给定请求 JSON 的格式,行的直接转换将不起作用,如第 1 点所述,制作一组模型类,您可以考虑以下模型类。

package org.test.json;
import java.util.List;
public class RequestModel {
protected ApplicationInfo applicationInfo;
protected List<RequestData> requestData;
public ApplicationInfo getApplicationInfo() {return applicationInfo;}
public void setApplicationInfo(ApplicationInfo applicationInfo) {this.applicationInfo = applicationInfo;}
public List<RequestData> getRequestData() {return requestData;}
public void setRequestData(List<RequestData> requestData) {this.requestData = requestData;}
}//class closing


package org.test.json;
public class ApplicationInfo {
protected String appId;
public String getAppId() {return appId;}
public void setAppId(String appId) {this.appId = appId;}
}//class closing


package org.test.json;
public class RequestData {
protected String secureData;
protected String secureDataType;
protected int index;
public String getSecureData() {return secureData;}
public void setSecureData(String secureData) {this.secureData = secureData;}
public String getSecureDataType() {return secureDataType;}
public void setSecureDataType(String secureDataType) {this.secureDataType = secureDataType;}
public int getIndex() {return index;}
public void setIndex(int index) {this.index = index;}
}//class closing

处理从数据框获取的列表并填充模型类,然后使用 Jackson 进行转换以获取请求 JSON。


下面应该做你要找的,不要直接运行这个,数据集为空

//Do not run this, will generate NullPointer, for example only
Dataset<Row> ds=null;
List<Row> rows=ds.collectAsList();
RequestModel request=new RequestModel();
//Set application id
ApplicationInfo appInfo=new ApplicationInfo();
appInfo.setAppId("some id");
request.setApplicationInfo(appInfo);
List<RequestData> reqData=new ArrayList<>();
for(int i=0;i<rows.size();i++) {
//Incrementally generated for each row
int index=i;
Row r=rows.get(i);
int rowLength=r.size();
for(int j=0;j<rowLength;j++) {
RequestData dataElement=new RequestData();
dataElement.setIndex(index);
switch(j) {
case 1:{dataElement.setSecureData(r.getString(j));dataElement.setSecureDataType("PID");break;}
case 3:{dataElement.setSecureDataType(r.getString(j));dataElement.setSecureDataType("FIRSTNAME");break;}
default:{break;}
}//switch closing
reqData.add(dataElement);
}//for closing
}//for closing

最新更新