Testing a NiFi class with nested dependencies



How do I write a JUnit test for the org.apache.nifi.processors.standard.ValidateRecord processor in Apache NiFi? Here is my code:

@Test
public void valiateRecordTest() throws IOException {
    ValidateRecord validateRecord = new ValidateRecord();

    TestRunner runner = TestRunners.newTestRunner(validateRecord);
    String inputJson = IOUtils.toString(this.getClass().getResourceAsStream("validateRecordInput.json"));
    String inputSchema = IOUtils.toString(this.getClass().getResourceAsStream("validateRecordInputSchema.json"));
    String expectedResult = IOUtils.toString(this.getClass().getResourceAsStream("validateRecordOutput.json"));

    //runner.setProperty(ValidateRecord.SCHEMA_REGISTRY, inputSchema);
    MockRecordParser reader = new MockRecordParser();
    runner.addControllerService("reader", reader);
    runner.setProperty("Record Reader","JsonTreeReader");
    runner.setProperty("Record Writer","JsonRecordSetWriter");
    runner.setProperty("service", "reader");

    runner.setProperty(reader,"Schema Registry","AvroSchemaRegistry"); //for JsonTreeReader
    runner.setProperty(reader,"Schema Text",inputSchema); //for JsonRecordSetWriter
    runner.enableControllerService(reader);
    runner.enqueue(inputJson);
    runner.run(1);
    runner.assertQueueEmpty();
    List<MockFlowFile> resultList = runner.getFlowFilesForRelationship("valid");
    ........
}

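JsonTreeReader has additional parameters, such as the Avro schema. How do I set them, given that I am not passing an instance of JsonTreeReader here? I get the following errors: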

'Record Reader' is invalid because Record Reader is required
'Record Writer' is invalid because Record Writer is required
'Record Reader' validated against 'JsonTreeReader' is invalid because 'Record Reader' is not a supported property or has no Validator associated with it
'Record Writer' validated against 'JsonRecordSetWriter' is invalid because 'Record Writer' is not a supported property or has no Validator associated with it

TestRunner has methods for adding and enabling controller services. Something like this:

def reader = new MockRecordParser()
runner.addControllerService("reader", reader)
runner.setProperty(SomeDescriptor, "reader")
runner.enableControllerService(reader)
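
As a fuller sketch of the same idea in Java, the test below registers a MockRecordParser and a MockRecordWriter and points the processor at them by the identifier passed to addControllerService, not by class name. It assumes the nifi-mock and nifi-mock-record-utils test dependencies are on the classpath, and that ValidateRecord's reader and writer properties are named "record-reader" and "record-writer" (the names behind the "Record Reader" and "Record Writer" display names in the error messages); check both against your NiFi version. The class name, schema fields and sample record are made up for illustration.

```java
import org.apache.nifi.processors.standard.ValidateRecord;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class ValidateRecordMockSketchTest {

    @Test
    public void runsWithMockReaderAndWriter() throws InitializationException {
        TestRunner runner = TestRunners.newTestRunner(new ValidateRecord());

        // MockRecordParser ignores the FlowFile content and returns the
        // records added here, so the schema and data are defined in code.
        MockRecordParser reader = new MockRecordParser();
        reader.addSchemaField("name", RecordFieldType.STRING); // hypothetical field
        reader.addSchemaField("age", RecordFieldType.INT);     // hypothetical field
        reader.addRecord("John Doe", 48);
        runner.addControllerService("reader", reader);
        runner.enableControllerService(reader);

        MockRecordWriter writer = new MockRecordWriter("name,age");
        runner.addControllerService("writer", writer);
        runner.enableControllerService(writer);

        // The property value is the service identifier registered above.
        // Property names are assumed; verify them for your NiFi version.
        runner.setProperty("record-reader", "reader");
        runner.setProperty("record-writer", "writer");

        runner.enqueue("content is ignored by MockRecordParser");
        runner.run();
        runner.assertQueueEmpty();
    }
}
```

From here the FlowFiles routed to "valid" can be fetched with runner.getFlowFilesForRelationship("valid"), as in the question.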

You can set properties on the service like this:

runner.setProperty(reader, DESCRIPTOR_HERE, "myvalue")
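
For the JsonTreeReader case from the question, one possible way to supply the Avro schema is through the descriptors in SchemaAccessUtils, set on the reader instance before it is enabled. This is a sketch under assumptions: it needs the NiFi record serialization services and Avro record utilities as test dependencies, the inline schema and input JSON are invented for illustration, and the "record-reader" / "record-writer" property names should be checked against your NiFi version.

```java
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processors.standard.ValidateRecord;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;

public class ValidateRecordJsonSketchTest {

    // Hypothetical Avro schema; in the question this text comes from
    // validateRecordInputSchema.json.
    private static final String SCHEMA =
        "{\"type\":\"record\",\"name\":\"person\",\"fields\":["
        + "{\"name\":\"name\",\"type\":\"string\"},"
        + "{\"name\":\"age\",\"type\":\"int\"}]}";

    @Test
    public void runsWithJsonReaderAndSchemaText() throws InitializationException {
        TestRunner runner = TestRunners.newTestRunner(new ValidateRecord());

        // Configure the reader to take its schema from the Schema Text property.
        JsonTreeReader reader = new JsonTreeReader();
        runner.addControllerService("reader", reader);
        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
                SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
        runner.setProperty(reader, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA);
        runner.enableControllerService(reader);

        // Let the writer reuse the schema of the records it receives.
        JsonRecordSetWriter writer = new JsonRecordSetWriter();
        runner.addControllerService("writer", writer);
        runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
                SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
        runner.enableControllerService(writer);

        // Assumed property names; values are the identifiers registered above.
        runner.setProperty("record-reader", "reader");
        runner.setProperty("record-writer", "writer");

        runner.enqueue("[{\"name\":\"John Doe\",\"age\":48}]"); // made-up input
        runner.run();
        runner.assertQueueEmpty();
    }
}
```

Setting the schema on the service itself, not on the processor, is why no AvroSchemaRegistry is needed in this variant.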
