If I have a topic in Kafka whose messages use integers as their keys, how do I create a topic that is based on this topic but has no duplicates and whose messages are ordered by key?

For example, let's say the topic name is "my_topic", and there are 5 messages in this topic:

key: "10", value: "{ value: 15 }"key: "13", value: "{ value: 40 }"key: "11", value: "{ value: 30 }"key: "10", value: "{ value: 15 }"key: "12", value: "{ value: 20 }"

Then, how do I create an "ordered_deduplicated_my_topic" such that it has only 4 messages (because the messages are ordered ascending by key, and the duplicate "10" was removed)?

key: "10", value: "{ value: 15 }"key: "11", value: "{ value: 30 }"key: "12", value: "{ value: 20 }"key: "13", value: "{ value: 40 }"

Best Answer


I'm new here, so can't reply directly to comments.

This comment is in reference to setting a topic as a compacted topic in order to ensure a unique entry per key in the Kafka log: on its own, this would be an incorrect solution. Messages in compacted topics still exist for a time until Kafka actually marks them for deletion (tombstones) and then removes them later. By default, compaction is triggered by the ratio of "dirty" (not yet compacted) messages in the log, which the cleaner processes from time to time.

You can see and configure the cleanable dirty ratio here: https://docs.confluent.io/current/installation/configuration/topic-configs.html#min.cleanable.dirty.ratio

You can also, in effect, configure how long messages are retained in a compacted log, similar to how regular topics work, while ensuring the latest occurrence of each key always remains: https://docs.confluent.io/current/installation/configuration/topic-configs.html#min.compaction.lag.ms
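For illustration only, here is a minimal sketch of adjusting those two settings programmatically with the Kafka AdminClient; the broker address, topic name, and the concrete values are assumptions for the example, not recommendations:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class TuneCompaction {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");

            // Clean once 10% of the log is dirty, and leave each record
            // uncompacted for at least one minute (placeholder values).
            Collection<AlterConfigOp> ops = Arrays.asList(
                new AlterConfigOp(
                    new ConfigEntry(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.1"),
                    AlterConfigOp.OpType.SET),
                new AlterConfigOp(
                    new ConfigEntry(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, "60000"),
                    AlterConfigOp.OpType.SET));

            admin.incrementalAlterConfigs(Collections.singletonMap(topic, ops)).all().get();
        }
    }
}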

The main caveat here, though, is that compacted topics do not automatically remove old keys right away; they keep them around for a while longer. Configuring compaction to be very aggressive about getting rid of older messages is not advisable either, because it can have side effects such as slow consumers suddenly losing their position (because the records they pointed at were deleted), or general performance problems. This is a log, after all, and removing arbitrary entries is costly and time-consuming.

Recent versions of Kafka come with exactly-once delivery semantics, which aim to write each message to Kafka exactly once. If your Kafka-based solution is still in a beta phase, I would recommend updating your producers and consumers to use exactly-once semantics. If you go with exactly-once semantics, you won't have to worry about duplicates at all.
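As a rough illustration of what enabling exactly-once semantics looks like on the producer side, here is a minimal sketch; the broker address, serializers, topic, and transactional id are assumptions made up for the example:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ExactlyOnceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Idempotence prevents duplicate writes caused by producer retries.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        // A transactional id enables atomic, exactly-once writes across sends.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-topic-producer-1");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my_topic", "10", "{ value: 15 }"));
            producer.commitTransaction();
        }
    }
}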

If you do not have the option to use exactly-once semantics, then Effective strategy to avoid duplicate messages in apache kafka consumer might help a little; a sketch of that idea follows.
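One consumer-side approach along those lines is to skip records whose key has already been processed. This is purely illustrative: the group id and broker address are assumptions, and the in-memory set of seen keys would need to be persisted somewhere durable in a real deployment:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

public class DeduplicatingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dedup-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // In-memory only: lost on restart, so production code would persist this.
        Set<String> seenKeys = new HashSet<>();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my_topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    // add() returns true only the first time a key is seen.
                    if (seenKeys.add(record.key())) {
                        System.out.printf("key=%s value=%s%n", record.key(), record.value());
                    }
                }
            }
        }
    }
}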

To achieve this, you should set cleanup.policy for this topic to compact, as shown below:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor 1 --config cleanup.policy=compact
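For completeness, here is a roughly equivalent sketch using the Kafka AdminClient instead of the shell tool. The topic name, partition count, and replication factor mirror the command above; the broker address is an assumption, and this is an illustration rather than the only way to do it:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            Map<String, String> configs =
                Collections.singletonMap(TopicConfig.CLEANUP_POLICY_CONFIG,
                                         TopicConfig.CLEANUP_POLICY_COMPACT);
            // 1 partition, replication factor 1, compaction enabled.
            NewTopic topic = new NewTopic("my-topic", 1, (short) 1).configs(configs);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}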