更好的方法是将几种事件类型放在同一个 Kafka 主题中



假设有两种类型T1T2以及主题T。T1 和T2都必须进入主题T(出于某种原因)。有什么方法可以实现这一点?哪一个更好?

一种方法(许多)是利用继承,我们可以定义一个基类,然后子类可以扩展它。在我们的例子中,我们可以定义一个基类TB,然后T1T2可以扩展TB。

基类 (TB)

package poc.kafka.domain;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.extern.java.Log;
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Log
public class Animal implements Externalizable {
public String name;
public void whoAmI() {
log.info("I am an Animal");
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
name = (String) in.readObject();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(name);
}
}

派生类 (T1)

package poc.kafka.domain;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;
@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Cat extends Animal implements Externalizable {
private int legs;
public void whoAmI() {
log.info("I am a Cat");
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
legs = in.readInt();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(legs);
}
}

派生类 (T2)

package poc.kafka.domain;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.java.Log;
@Log
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Dog extends Animal implements Externalizable {
private int legs;
public void whoAmI() {
log.info("I am a Dog");
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
super.readExternal(in);
legs = in.readInt();
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
super.writeExternal(out);
out.writeInt(legs);
}
}

反序列化器

package poc.kafka.domain.serialization;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;
import poc.kafka.domain.Animal;
public class AnimalDeserializer implements Deserializer<Animal> {
@Override
public Animal deserialize(String topic, byte[] data) {
return SerializationUtils.deserialize(data);
}
}

序列化程序

package poc.kafka.domain.serialization;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;
import poc.kafka.domain.Animal;
public class AnimalSerializer implements Serializer<Animal> {
@Override
public byte[] serialize(String topic, Animal data) {
return SerializationUtils.serialize(data);
}
}

然后我们可以像下面这样发送T1T2

IntStream.iterate(0, i -> i + 1).limit(10).forEach(i -> {
if (i % 2 == 0)
producer.send(new ProducerRecord<Integer, Animal>("T", i, new Dog(i)));
else
producer.send(new ProducerRecord<Integer, Animal>("gs3", i, new Cat(i)));
});

这可能不是对问题的直接回答,而是在这里重新考虑某些方面的主张,这可能会解决原始问题。

首先,尽管 Kafka 能够支持任何数据格式,但对于可序列化的二进制格式,我建议使用Apache Avro而不是序列化的 Java 对象。

使用 Avro,您将获得紧凑的二进制、与语言无关的数据类型和广泛的工具集的所有好处。例如,有一些 CLI 工具可以在 Avro 中读取包含内容的 Kafka 主题,但我不知道有任何工具能够在那里反序列化 Java 对象。

你可以在这里阅读关于阿夫罗本身的信息

关于为什么使用Avro的一些很好的见解也可以在这个SO问题中找到 这里

第二。你的问题标题是关于事件类型的,但判断描述可能意味着"如何通过单个 Kafka 主题处理不同的数据类型"。如果事件之间的差异只是事件类型(例如,单击、提交、登录、注销等),则可以在内部保留具有此类型的enum字段,否则使用通用容器对象。

如果这些事件应该携带的数据有效载荷的结构存在差异,那么,同样,使用 Avro,您可以使用Union类型来解决它。

最后,如果数据差异如此之大,以至于这些事件基本上是不同的数据结构,没有任何重要的共同点 - 请使用不同的 Kafka 主题

尽管能够在同一主题中使用不同的分区来发送不同的数据类型,但它实际上只会在未来引起维护头痛和扩展限制,正如此处其他响应中正确指出的那样。因此,对于这种情况,如果可以选择不同的主题 - 最好这样做。

最简单的方法是使用您的自定义org.apache.kafka.common.serialization.Serializer,它将能够处理两种类型的事件。这两种类型的事件都应继承自同一类型/基于类。

示例代码可能如下所示:

public class CustomSerializer implements Serializer<T> {
public void configure(Map<String, ?> configs, boolean isKey) {
// nothing to do
}
public byte[] serialize(String topic, T data) {
// serialization
return null;
}
public void close() {
// nothing to do
}
} 

如果没有继承的概念,例如数据不像

Animal -> Cat
Animal -> Dog

然后另一种方法是使用包装器。

public class DataWrapper
{
private Object data;
private EventType type;
// getter and setters omitted for the sake of brevity
}

将所有事件放在包装器对象中,并用它们的EventType区分每个事件,例如,这可以是一个enum

然后,您可以按正常方式对其进行序列化(如您在问题中发布的那样),在反序列化时,您可以检查EventType,然后根据EventType将其委托给相应的事件处理器

此外,为了确保您的 DataWrapper 不会包装所有类型的数据,即应该仅用于特定类型的数据,那么您可以使用Marker接口并制作所有类,您将其对象推送到主题以实现此接口。

例如

interface MyCategory {
}

然后您的自定义类可以具有例如,

class MyEvent implements MyCategory {
}

在你可以拥有的DataWrapper..

public class DataWrapper<T extends MyCategory> {
private T data;
private EventType type;
// getters and setters omitted for the sake of brevity
}

最好的方法是创建自定义分区。

通过分区将每条消息生成到不同的分区

这是默认的实现,你需要实现你的分区逻辑。

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
} 
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

有关更多示例,请查看本教程。

这是卡夫卡关于何时选择服装分区的权威指南中的一段话。

实现自定义分区策略 到目前为止,我们已经讨论了 默认分区程序的特征,这是最常见的特征 使用。但是,Kafka 并不限制您只使用哈希分区,并且 有时有充分的理由以不同的方式对数据进行分区。为 例如,假设您是 B2B 供应商和最大的客户 是一家生产名为香蕉的手持设备的公司。 假设你用 cus-tomer "香蕉"做了很多生意,以至于 您超过10%的日常交易是与该客户进行的。如果您使用 默认哈希分区,Banana财务会计软件记录将分配给 与其他帐户相同的分区,导致一个分区 大约是其余的两倍。这可能会导致服务器用完 空间,处理速度等。我们真正想要的是给予Banana财务会计软件自己的分区,然后使用哈希分区来映射 其余帐户到分区。

相关内容

最新更新