上传自定义 Data Fusion 插件的 JAR 和 JSON 文件时出错



我试图将一个自定义插件上传到 Data Fusion。我有两个文件：一个 JAR 文件和一个 JSON 文件。下面分别给出两者的代码：

  1. HTTP-batchsource.jar
/**
 * Batch source plugin that reads pages from an HTTP endpoint and emits their entries as
 * {@code StructuredRecord}s.
 */
@Plugin(type = BatchSource.PLUGIN_TYPE)
@Name(HttpBatchSource.NAME)
@Description("Read data from HTTP endpoint.")
public class HttpBatchSource extends BatchSource<NullWritable, BasePage, StructuredRecord> {
  static final String NAME = "HTTP";
  private static final Logger LOG = LoggerFactory.getLogger(HttpBatchSource.class);

  private final HttpBatchSourceConfig config;
  private Schema schema;

  /**
   * Creates the source for the given plugin configuration.
   *
   * @param config the plugin configuration
   */
  public HttpBatchSource(HttpBatchSourceConfig config) {
    this.config = config;
  }

  /**
   * Validates the configuration and its schema, then sets the configured schema as this
   * stage's output schema.
   *
   * @param pipelineConfigurer configurer used to publish the output schema
   */
  @Override
  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
    config.validate(); // validate when macros not yet substituted
    config.validateSchema();
    pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema());
  }

  /**
   * Re-validates the configuration, records read lineage for every schema field, and registers
   * an {@code HttpInputFormatProvider} as this stage's input.
   *
   * @param context the batch source context for this run
   */
  @Override
  public void prepareRun(BatchSourceContext context) {
    config.validate(); // validate when macros are already substituted
    config.validateSchema();
    schema = config.getSchema();

    LineageRecorder lineageRecorder = new LineageRecorder(context, config.referenceName);
    lineageRecorder.createExternalDataset(schema);
    lineageRecorder.recordRead("Read", String.format("Read from HTTP '%s'", config.getUrl()),
        Preconditions.checkNotNull(schema.getFields()).stream()
            .map(Schema.Field::getName)
            .collect(Collectors.toList()));

    context.setInput(Input.of(config.referenceName, new HttpInputFormatProvider(config)));
  }

  /**
   * Caches the configured schema before delegating to the superclass initializer.
   *
   * @param context the runtime context
   * @throws Exception if the superclass initialization fails
   */
  @Override
  public void initialize(BatchRuntimeContext context) throws Exception {
    this.schema = config.getSchema();
    super.initialize(context);
  }

  /**
   * Emits every non-error entry of the page. An error entry is handled by its own
   * error-handling strategy: SKIP logs a warning, SEND emits it to the error output, and
   * STOP fails the pipeline.
   *
   * @param input key/value whose value is the page to drain
   * @param emitter receives the records and error entries
   * @throws RuntimeException if an error entry's strategy is STOP
   * @throws UnexpectedFormatException if an error entry carries an unrecognized strategy
   */
  @Override
  public void transform(KeyValue<NullWritable, BasePage> input, Emitter<StructuredRecord> emitter) {
    BasePage page = input.getValue();
    while (page.hasNext()) {
      PageEntry pageEntry = page.next();
      if (!pageEntry.isError()) {
        emitter.emit(pageEntry.getRecord());
        continue;
      }

      InvalidEntry<StructuredRecord> invalidEntry = pageEntry.getError();
      switch (pageEntry.getErrorHandling()) {
        case SKIP:
          LOG.warn(invalidEntry.getErrorMsg());
          break;
        case SEND:
          emitter.emitError(invalidEntry);
          break;
        case STOP:
          throw new RuntimeException(invalidEntry.getErrorMsg());
        default:
          // Report the strategy that was actually switched on (the entry's), not the
          // config-level non-HTTP setting, which may differ from it.
          throw new UnexpectedFormatException(
              String.format("Unknown error handling strategy '%s'", pageEntry.getErrorHandling()));
      }
    }
  }
}

  2. HTTP-batchsource.json

{
"metadata": {
"spec-version": "1.5"
},
"configuration-groups": [{
"label": "General",
"properties": [{
"widget-type": "textbox",
"label": "Reference Name",
"name": "referenceName"
},
{
"widget-type": "textbox",
"label": "URL",
"name": "url"
},
{
"widget-type": "select",
"label": "HTTP Method",
"name": "httpMethod",
"widget-attributes": {
"values": [
"GET",
"POST",
"PUT",
"DELETE",
"HEAD"
],
"default": "GET"
}
},
{
"widget-type": "keyvalue",
"label": "Headers",
"name": "headers",
"widget-attributes": {
"showDelimiter": "false"
}
},
{
"widget-type": "textarea",
"name": "requestBody",
"label": "Request Body",
"widget-attributes": {
"rows": "5"
}
}
]
},
{
"label": "Format",
"properties": [{
"widget-type": "select",
"label": "Format",
"name": "format",
"widget-attributes": {
"values": [
"json",
"xml",
"tsv",
"csv",
"text",
"blob"
],
"default": "json"
}
},
{
"widget-type": "textbox",
"label": "JSON/XML Result Path",
"name": "resultPath"
},
{
"widget-type": "keyvalue",
"label": "JSON/XML Fields Mapping",
"name": "fieldsMapping",
"widget-attributes": {
"showDelimiter": "false"
}
},
{
"widget-type": "radio-group",
"label": "CSV Skip First Row",
"name": "csvSkipFirstRow",
"widget-attributes": {
"layout": "inline",
"default": "false",
"options": [{
"id": "true",
"label": "true"
},
{
"id": "false",
"label": "false"
}
]
}
}
]
},
{
"label": "OAuth2",
"properties": [{
"widget-type": "toggle",
"label": "OAuth2 Enabled",
"name": "oauth2Enabled",
"widget-attributes": {
"default": "false",
"on": {
"label": "True",
"value": "true"
},
"off": {
"label": "False",
"value": "false"
}
}
},
{
"widget-type": "textbox",
"label": "Auth URL",
"name": "authUrl"
},
{
"widget-type": "textbox",
"label": "Token URL",
"name": "tokenUrl"
},
{
"widget-type": "textbox",
"label": "Client ID",
"name": "clientId"
},
{
"widget-type": "password",
"label": "Client Secret",
"name": "clientSecret"
},
{
"widget-type": "textbox",
"label": "Scopes",
"name": "scopes"
},
{
"widget-type": "textbox",
"label": "Refresh Token",
"name": "refreshToken"
}
]
},
{
"label": "Basic Authentication",
"properties": [{
"widget-type": "textbox",
"label": "Username",
"name": "username"
},
{
"widget-type": "password",
"label": "Password",
"name": "password"
}
]
},
{
"label": "HTTP Proxy",
"properties": [{
"widget-type": "textbox",
"label": "Proxy URL",
"name": "proxyUrl"
},
{
"widget-type": "textbox",
"label": "Username",
"name": "proxyUsername"
},
{
"widget-type": "password",
"label": "Password",
"name": "proxyPassword"
}
]
},
{
"label": "Error Handling",
"properties": [{
"widget-type": "keyvalue-dropdown",
"label": "HTTP Errors Handling",
"name": "httpErrorsHandling",
"widget-attributes": {
"default": "2..:Success,.*:Fail",
"showDelimiter": "false",
"dropdownOptions": [
"Success",
"Fail",
"Skip",
"Send to error",
"Retry and fail",
"Retry and skip",
"Retry and send to error"
],
"key-placeholder": "HTTP Status Code Regex"
}
},
{
"widget-type": "radio-group",
"label": "Non-HTTP Error Handling",
"name": "errorHandling",
"widget-attributes": {
"layout": "inline",
"default": "stopOnError",
"options": [{
"id": "stopOnError",
"label": "Stop on error"
},
{
"id": "sendToError",
"label": "Send to error"
},
{
"id": "skipOnError",
"label": "Skip on error"
}
]
}
},
{
"widget-type": "radio-group",
"label": "Retry Policy",
"name": "retryPolicy",
"widget-attributes": {
"layout": "inline",
"default": "exponential",
"options": [{
"id": "exponential",
"label": "Exponential"
},
{
"id": "linear",
"label": "Linear"
}
]
}
},
{
"widget-type": "number",
"label": "Linear Retry Interval",
"name": "linearRetryInterval",
"widget-attributes": {
"min": "0",
"default": "30"
}
},
{
"widget-type": "number",
"label": "Max Retry Duration",
"name": "maxRetryDuration",
"widget-attributes": {
"min": "0",
"default": "600"
}
},
{
"widget-type": "number",
"label": "Connect Timeout",
"name": "connectTimeout",
"widget-attributes": {
"min": "0",
"default": "120"
}
},
{
"widget-type": "number",
"label": "Read Timeout",
"name": "readTimeout",
"widget-attributes": {
"min": "0",
"default": "120"
}
}
]
},
{
"label": "Pagination",
"properties": [{
"widget-type": "select",
"label": "Pagination Type",
"name": "paginationType",
"widget-attributes": {
"values": [
"None",
"Link in response header",
"Link in response body",
"Token in response body",
"Increment an index",
"Custom"
],
"default": "None"
}
},
{
"widget-type": "textbox",
"label": "Start Index",
"name": "startIndex",
"widget-attributes": {
"placeholder": "For pagination type \"Increment an index\""
}
},
{
"widget-type": "textbox",
"label": "Max Index",
"name": "maxIndex",
"widget-attributes": {
"placeholder": "For pagination type \"Increment an index\""
}
},
{
"widget-type": "textbox",
"label": "Index Increment",
"name": "indexIncrement",
"widget-attributes": {
"placeholder": "For pagination type \"Increment an index\""
}
},
{
"widget-type": "textbox",
"label": "Next Page JSON/XML Field Path",
"name": "nextPageFieldPath",
"widget-attributes": {
"placeholder": "For pagination type \"Link in response body\""
}
},
{
"widget-type": "textbox",
"label": "Next Page Token Path",
"name": "nextPageTokenPath",
"widget-attributes": {
"placeholder": "For pagination type \"Token in response body\""
}
},
{
"widget-type": "textbox",
"label": "Next Page Url Parameter",
"name": "nextPageUrlParameter",
"widget-attributes": {
"placeholder": "For pagination type \"Token in response body\""
}
},
{
"widget-type": "python-editor",
"label": "Custom Pagination Python Code",
"name": "customPaginationCode",
"widget-attributes": {
"placeholder": "def get_next_page_url(url, page, headers):\n    \"\"\"\n    Based on previous page data generates next page url, when \"Custom pagination\" is enabled.\n    Returns 'None' if no more pages to load \n\n    Args:\n        url (string): previous page url\n        page (string): a body of previous page\n        headers (dict): a dictionary of headers from previous page\n\n    \"\"\"\n    return \"https://next-page-url.com\""
}
},
{
"widget-type": "number",
"label": "Wait Time Between Pages (milliseconds)",
"name": "waitTimeBetweenPages",
"widget-attributes": {
"min": "0",
"default": "0"
}
}
]
},
{
"label": "SSL/TLS",
"properties": [{
"widget-type": "toggle",
"label": "Verify HTTPS Trust Certificates",
"name": "verifyHttps",
"widget-attributes": {
"default": "true",
"on": {
"label": "True",
"value": "true"
},
"off": {
"label": "False",
"value": "false"
}
}
},
{
"widget-type": "textbox",
"label": "Keystore File",
"name": "keystoreFile"
},
{
"widget-type": "select",
"label": "Keystore Type",
"name": "keystoreType",
"widget-attributes": {
"default": "Java KeyStore (JKS)",
"values": [
"Java KeyStore (JKS)",
"Java Cryptography Extension KeyStore (JCEKS)",
"PKCS #12"
]
}
},
{
"widget-type": "password",
"label": "Keystore Password",
"name": "keystorePassword"
},
{
"widget-type": "textbox",
"label": "Keystore Key Algorithm",
"name": "keystoreKeyAlgorithm",
"widget-attributes": {
"default": "SunX509"
}
},
{
"widget-type": "textbox",
"label": "TrustStore File",
"name": "trustStoreFile"
},
{
"widget-type": "select",
"label": "TrustStore Type",
"name": "trustStoreType",
"widget-attributes": {
"default": "Java KeyStore (JKS)",
"values": [
"Java KeyStore (JKS)",
"Java Cryptography Extension KeyStore (JCEKS)",
"PKCS #12"
]
}
},
{
"widget-type": "password",
"label": "TrustStore Password",
"name": "trustStorePassword"
},
{
"widget-type": "textbox",
"label": "TrustStore Key Algorithm",
"name": "trustStoreKeyAlgorithm",
"widget-attributes": {
"default": "SunX509"
}
},
{
"widget-type": "csv",
"label": "Transport Protocols",
"name": "transportProtocols",
"widget-attributes": {
"default": "TLSv1.2"
}
},
{
"widget-type": "textbox",
"label": "Cipher Suites",
"name": "cipherSuites"
}
]
}
],
"emit-errors": true,
"outputs": [{
"name": "schema",
"label": "schema",
"widget-type": "schema",
"widget-attributes": {
"schema-types": [
"boolean",
"int",
"long",
"float",
"double",
"bytes",
"string",
"array",
"record",
"map",
"union"
],
"schema-default-type": "string",
"property-watch": "format"
}
}],
"filters": [{
"name": "Proxy authentication",
"condition": {
},
"show": [{
"name": "proxyUsername",
"type": "property"
},
{
"name": "proxyPassword",
"type": "property"
}
]
},
{
"name": "Increment an index",
"condition": {
"property": "paginationType",
"operator": "equal to",
"value": "Increment an index"
},
"show": [{
"name": "startIndex",
"type": "property"
},
{
"name": "maxIndex",
"type": "property"
},
{
"name": "indexIncrement",
"type": "property"
}
]
},
{
"name": "Token in Response Body",
"condition": {
"property": "paginationType",
"operator": "equal to",
"value": "Token in response body"
},
"show": [{
"name": "nextPageTokenPath",
"type": "property"
},
{
"name": "nextPageUrlParameter",
"type": "property"
}
]
},
{
"name": "Link in response body",
"condition": {
"property": "paginationType",
"operator": "equal to",
"value": "Link in response body"
},
"show": [{
"name": "nextPageFieldPath",
"type": "property"
}]
},
{
"name": "Custom pagination",
"condition": {
"property": "paginationType",
"operator": "equal to",
"value": "Custom"
},
"show": [{
"name": "customPaginationCode",
"type": "property"
}]
},
{
"name": "Pagination none",
"condition": {
"property": "paginationType",
"operator": "equal to",
"value": "None"
},
"show": [{
"name": "waitTimeBetweenPages",
"type": "property"
}]
},
{
"name": "OAuth 2 disabled",
"condition": {
"property": "oauth2Enabled",
"operator": "equal to",
"value": "false"
},
"show": [{
"name": "username",
"type": "property"
},
{
"name": "password",
"type": "property"
}
]
},
{
"name": "OAuth 2 enabled",
"condition": {
"property": "oauth2Enabled",
"operator": "equal to",
"value": "true"
},
"show": [{
"name": "authUrl",
"type": "property"
},
{
"name": "tokenUrl",
"type": "property"
},
{
"name": "clientId",
"type": "property"
},
{
"name": "clientSecret",
"type": "property"
},
{
"name": "scopes",
"type": "property"
},
{
"name": "refreshToken",
"type": "property"
}
]
},
{
"name": "JSON/XML Formatting",
"condition": {
"expression": "format == 'json' || format == 'xml'"
},
"show": [{
"name": "resultPath",
"type": "property"
},
{
"name": "fieldsMapping",
"type": "property"
}
]
},
{
"name": "CSV Formatting",
"condition": {
"property": "format",
"operator": "equal to",
"value": "csv"
},
"show": [{
"name": "csvSkipFirstRow",
"type": "property"
}]
}
]
}

我遇到的问题是：将 .jar 上传到 Data Fusion 时一切正常，但上传 .json 时却报错：

"无效的插件 JSON。请指定父工件。" 或 "插件配置应为 JSON 格式"

运行 mvn clean package -DskipTests 后，插件仓库的 target/ 目录中会生成 jar 和 json 两个文件，部署插件时应使用这两个文件。上面手写的 JSON 只是插件的界面（widget）定义，其中没有声明父工件的 parents 字段，因此上传时会提示"请指定父工件"。

最新更新