我可以获得使用avro kafka消息的示例代码吗



我刚刚建立了Datatorrent RTS(Apache Apex)平台并运行了pi演示。我想使用来自kafka的"avro"消息,然后将数据聚合并存储到hdfs中。我可以得到一个例子代码这个或卡夫卡?

这是一个完整的工作应用程序的代码,它使用了Apex Malhar中的新Kafka输入运算符和文件输出运算符。它将字节数组转换为字符串,并使用有界大小(本例中为1K)的滚动文件将它们写入HDFS;在文件大小达到绑定之前,它将具有一个扩展名为.tmp的临时名称。您可以按照DevT在中的建议在这两个运算符之间插入额外的运算符https://stackoverflow.com/a/36666388):

package com.example.myapexapp;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
@ApplicationAnnotation(name="MyFirstApplication")
public class KafkaApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
    in.setInitialPartitionCount(1);
    in.setTopics("test");
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    //in.setClusters("localhost:2181");
    in.setClusters("localhost:9092");   // NOTE: need broker address, not zookeeper
    LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
    out.setFilePath("/tmp/FromKafka");
    out.setFileName("test");
    out.setMaxLength(1024);        // max size of rolling output file
    // create stream connecting input adapter to output adapter
    dag.addStream("data", in.outputPort, out.input);
  }
}
/**
 * Converts each tuple to a string and writes it as a new line to the output file
 */
class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;
  private String fileName;
  @Override
  public byte[] getBytesForTuple(byte[] t) { return (new String(t, CS) + NL).getBytes(CS); }
  @Override
  protected String getFileName(byte[] tuple) { return fileName; }
  public String getFileName() { return fileName; }
  public void setFileName(final String v) { fileName = v; }
}

在高级应用程序代码类似于

KafkaSinglePortStringInputOperator->AvroToPojo->维度聚合器->AbstractFileOutputOperator 的实现

KafkaSinglePortStringInputOperator-如果使用其他数据类型,可以使用KafkaSinglePortByteArrayInputOperator或编写自定义实现。

AvroToPojo-https://github.com/apache/incubator-apex-malhar/blob/5075ce0ef75afccdff2edf4c044465340176a148/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java.

此运算符将GenericRecord转换为用户给定的POJO。用户需要给定一个POJO类,否则将使用反射。目前,它用于从容器文件中读取GenericRecords,并且只支持基元类型。为了阅读Kafka,您可以按照类似的方式对操作员进行建模,并添加一个Schema对象来解析传入的记录。在处理过程中,Tuple方法应该可以工作,Schema Schema=新Schema.Parser().parse());GenericDatumReader reader=新的GenericDaumReader(架构);

维度聚合器-您可以选择此处给定的聚合器之一-https://github.com/apache/incubator-apex-malhar/tree/5075ce0ef75afccdff2edf4c044465340176a148/library/src/main/java/org/apache/apex/malhar/lib/dimensions或者按照相同的思路编写自定义聚合器。

FileWriter——来自上面文章中的例子。

相关内容

  • 没有找到相关文章

最新更新