添加链接
link之家
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

kafka消息在过了保留周期之后会被自动清除。但总有一些情况,需要立刻删除消息。

假设这样场景:已经开始给kafka主题生产消息的应用发现了缺陷,接着bug修复程序需要更新,这是kafka主题中已经了一些错误的消息。这样场景通常在开发环境,我们需要的就是快速批量删除这些消息。

为了模拟环境,首先在kafka目录中创建 purge-scenario主题:

$ bin/kafka-topics.sh \
  --create --topic purge-scenario --if-not-exists \
  --partitions 2 --replication-factor 1 \
  --zookeeper localhost:2181

接着使用shuf命令生成随机数,然后通过kafka-console-producer.sh发送kafka主题:

$ /usr/bin/shuf -i 1-100000 -n 50000000 \
  | tee -a /tmp/kafka-random-data \
  | bin/kafka-console-producer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

shuf -i 1-100000 -n 50000000 :表示生成n个1-100000范围内随机数。

tee -a 前面命令结果写入文件 -a 表示追加;这是使用tee保存模拟数据是为了以后使用;

最后验证消费主题消息:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 3
76696
49425
Processed a total of 3 messages

在purge-scenario主题中的消息有缺省7天保留时间。为了删除消息,我们可以临时设置主题的 retention.ms 属性为10秒,然后等待其自动过期:

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=10000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario \
  && sleep 10

现在验证消费是否过期:

$ bin/kafka-console-consumer.sh  \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:20:15,951] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

最后,我们要恢复主题的保留周期:

$ bin/kafka-configs.sh --alter \
  --add-config retention.ms=604800000 \
  --bootstrap-server=0.0.0.0:9092 \
  --topic purge-scenario

通过这个方法,kafka会删除主题所有分区的数据。

选择性删除消息

有时可能需要有选择性删除一个或多个主题的分区数据,可以使用kafka-delete-records.sh脚本实现。首先需要在delete-config.json 配置文件中指定分区级偏移量,我们打算分区的删除所有数据,partition指定分区,offset=-1:

"partitions": [ "topic": "purge-scenario", "partition": 1, "offset": -1 "version": 1

接着处理删除记录:

$ bin/kafka-delete-records.sh \
  --bootstrap-server localhost:9092 \
  --offset-json-file delete-config.json

现在验证从分区0获取数据:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario --partition=0 \
  --max-messages 1 --timeout-ms 1000
  44017
  Processed a total of 1 messages

接着从分区1获取数据:

$ bin/kafka-console-consumer.sh \
  --bootstrap-server=0.0.0.0:9092 \
  --from-beginning --topic purge-scenario \
  --partition=1 \
  --max-messages 1 --timeout-ms 1000
[2021-02-28 11:48:03,548] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TimeoutException
Processed a total of 0 messages

删除重新创建主题

另外方法是通过删除主题删除其所有数据,然后重新创建主题。当然只有服务端设置delete.topic.enable属性为true才可能删除主题:

$ bin/kafka-server-start.sh config/server.properties \
  --override delete.topic.enable=true

可以通过kafka-topics.sh命令删除主题:

$ bin/kafka-topics.sh \
  --delete --topic purge-scenario \
  --zookeeper localhost:2181
Topic purge-scenario is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

现在验证主题:

$ bin/kafka-topics.sh --zookeeper localhost:2181 --list

如果确认主题已不存在,然后再重新创建主题。

本文介绍了几种方式删除kafka主题数据。包括设置主题过期时间、删除主题所有数据及部分分区数据,到通过删除主题变相删除数据。

kafka消息在过了保留周期之后会被自动清除。但总有一些情况,需要立刻删除消息。假设这样场景:已经开始给kafka主题生产消息的应用发现了缺陷,接着bug修复程序需要更新,这是kafka主题中已经了一些错误的消息。这样场景通常在开发环境,我们需要的就是快速批量删除这些消息。本文介绍了几种方式删除kafka主题数据。包括设置主题过期时间、删除主题所有数据及部分分区数据,到通过删除主题变相删除数据。 停止每台机器上的kafka删除kafka存储目录(server.properties文件log.dirs配置,默认为“/tmp/kafka-logs”)全部topic的数据目录; 删除zookeeper上与kafka相关的znode节点; 重启kafka、如果删除topic还在则需要重启zookeeper; 1、配置文件sparkStreaming.properties 2、动态加载配置文件LoadParameter类 3、主程序Kafka2SparkStreaming2Kafka 四、启动程序测试 1、创建相关kafka主题 2、kafka生产数据到程序 3、从写入到的kafka主题中读取数据 1、环境介绍 spark:
kafka删除topic消息的三种方式 方法一:快速配置删除法(确保topic数据不要了) 1.kafka启动之前,在server.properties配置delete.topic.enable=true 2.执行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manager集群管理工具删除 注意:如果kafka启动之前没有配置delete.topic.enable=true,topic只会标记为marked
Kafka主题浏览器 Ktop是Kafka主题浏览器。 它使列出kafka集群中的所有主题变得更加容易,使用typeahead快速查找主题,并检查主题数据。 git clone https://github.com/yichen/ktop.git cd ktop/ktop go install ktop {zookeeperserver:port}/{kafkacluster} 这将启动一个列出所有主题的控制台应用程序。 开始输入以利用预输入过滤功能。 要退出问题,请使用Ctrl-Q 要向下翻页,请使用向下翻页键或Ctrl-F要向上翻页,请使用向上翻页键或Ctrl-B 使用箭头键导航到特定主题,然后输入键检查主题
删除kafka topic及其数据,严格来说并不是很难的操作。但是,往往给kafka 使用者带来诸多问题。项目组之前接触过多个开发者,发现都会偶然出现无法彻底删除kafka的情况。本文总结多个删除kafka topic的应用场景,总结一套删除kafka topic的标准操作方法。 step1: 如果需要被删除topic 此时正在被程序 produce和consume,则这些生产和消费... 在kafka的使用、测试过程中,可能老是会遇到想要删除某个主题的需求,但是由于对其内部机制不够了解,总是无法有效的删除主题。今天这篇文章详细讨论如何彻底的删除一个主题删除主题的前提 在kafka的broker端的配置参数中有这样一个参数:delete.topic.enable,必须将它设置为true才能够删除kafka主题。这个参数的默认值是true。如果配置为false那么删除主...
Kafka手动删除主题 主题的元数据存储在Zookeeper中的/brokes/topics/ 和/config/topics路径下,主题中的消息数据存储在log.dir 或log.dirs配置的路径下,我们只需要手动删除这些地方的内容即可。总共分为三个步骤: 1.删除Zookeeper中的节点 /config/topics/topicName 2.**** 我们对Markdown编辑器进行了一些功能拓展与语法支持,除了标准的Markdown编辑器功能,我们增加了如下几点新功能,帮助你用它写博客:
Kafka主题删除机制 命令删除Kafka中当一个主题不再使用的时候,可以选择将其删除,以此来释放磁盘,文件句柄等资源,删除过程其实很简单,使用kafka-topics.sh脚本中的delete指令就可以。 bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic topic-delete 在删除完毕后,会有相应的提示信息,提示信息与broker端配置参数delete.topic.enable有关,必须将delete.
1、删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录 rm -rf TOPIC_1-* 2、删除zookeeper里面的topic信息 (1)登录zookeeper客户端:命令:./bin/zookeeper-client(或者通过可视化工具连接删除) (2)找到topic所在的目录:ls /brokers/topics (3)找到要删除的topic,执行命令:rmr /brokers/topics/【
本意利用kafka实现一个类似redis发布订阅的模式,比redis多了一个数据分区的功能。kafka里面的数据我们是不需要存储的,因此我们需要配置对应的删除策略 Kafka版本号 2.8.1 数据清理策略 kafka有两种数据清理策略,delete删除和compact压缩,默认是删除。 delete:一般是使用按照时间保留的策略,当不活跃的segment的时间戳是大于设置的时间的时候,当前segment就会被删除 compact: 日志不会被删除,会被去重清理,这种模式要求每个record都必须 1. 安装Kafka和相应的客户端库。 2. 创建一个主题(topic)来存储数据。 3. 创建一个生产者(producer)客户端,用于将数据发送到Kafka。 4. 将数据发送到生产者客户端。 5. 生产者客户端将数据分区并将其写入Kafka主题。 6. 可以使用消费者(consumer)客户端从主题中读取数据。 以下是一个基本的Java示例代码,演示如何将数据推送到Kafka: import java.util.Properties; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.KafkaProducer; public class KafkaProducerExample { public static void main(String[] args) throws Exception{ // 设置Kafka服务器的地址 String kafkaServer = "localhost:9092"; // 设置要连接的Kafka主题 String kafkaTopic = "my_topic"; // 设置Kafka生产者属性 Properties properties = new Properties(); properties.put("bootstrap.servers", kafkaServer); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // 创建Kafka生产者 Producer<String, String> producer = new KafkaProducer<String, String>(properties); // 创建Kafka记录 ProducerRecord<String, String> record = new ProducerRecord<String, String>(kafkaTopic, "key", "value"); // 发送Kafka记录 producer.send(record); // 关闭Kafka生产者 producer.close(); 以上代码将一个键值对(key-value)发送到名为“my_topic”的Kafka主题。可以根据需要修改生产者属性,例如设置序列化程序、压缩算法等。