bin/kafka-consumer-groups.sh --bootstrap-server {你的地址以及端口} --delete --group {你的消费组主题}
基于时间的保留策略会根据消息的时间戳来删除旧的消息,而基于大小的保留策略会根据分区的大小来删除旧的消息。消息排序和传输保证:Kafka保证了消息的有序性,每个分区内的消息是有序的,并且可以根据时间戳进行全局有序。一旦消息的时间戳超过了设定的过期时间,Kafka会将其标记为过期,并在后续的清理过程中删除这些过期的消息。默认的分区选择策略是基于消息的key进行哈希计算,以保证相同key的消息进入同一个分区,从而保证相同key的消息的有序性。Kafka保证每个分区内的消息顺序,但不保证跨分区的消息有序。
Kafka生产者通过topic发送数据,topic只是一个逻辑概念,真正存储数据的位置是分区,分区在broker机器上对应的是文件夹(topic名称-分区号)
分区内部存储了数据文件,也是分段存储的。在一个分区下可能存在多个日志分区段(segment)
每个段都对应了3个文件:.index索引文件、.log真正的数据文件、.timeindex时间索引文件
2. 为什么要分段?
删除无用文件(已经被消费过很长时间的文件)更方便,提高磁盘利用率
查找数据更便捷
执行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manage集群管理工具删除。参考链接:https://cloud.tencent.com/developer/article/1590094。,只需要修改server.properties配置文件的delete.topic.enable为true就可以了。kafka启动之前,在server.properties配置。
一、更改日志输出级别
config/log4j.properties中日志的级别设置的是TRACE,在长时间运行过程中产生的日志大小吓人,所以如果没有特殊需求,强烈建议将其更改成INFO级别。具体修改方法如下所示,将config/log4j.properties文件中最后的几行中的TRACE改成INFO,修改前如下所示:
log4j.logger.kafka.network.RequestC...
写在前面:各位看到此博客的小伙伴,如有不对的地方请及时通过私信我或者评论此博客的方式指出,以免误人子弟。多谢!
默认情况下,当项目启动时,监听器就开始工作(监听消费发送到指定 topic 的消息)。如果我们想让监听器在程序运行的过程中能够动态地开启、关闭监听器,可以借助 KafkaListenerEndpointRegistry 实现,只需要定义两个 controller 接口分别通过 KafkaListenerEndpointRegistry 来控制监听器的开启、关闭即可,以下记录下测试代码:
方法一:快速配置删除法(简单粗暴,如果这个主题有程序还在消费都,此时KAFKA就game over)
1.kafka启动之前,在server.properties配置delete.topic.enable=true
2.执行命令bin/kafka-topics.sh --delete --topic test --zookeeper zk:2181或者使用kafka-manager集群管理工具删除
注意:如果kafka启动之前没有配置delete.topic.enable=true,topic只会标记
# 指定kafka server的地址,集群配多个,中间,逗号隔开
bootstrap-servers: 175.24.42.150:9092
#如果'enable.auto.commit'为
springboot集成kafka还是挺方便了,spring提供了KafkaTemplate,用来生产消息,消费者消费消息,就需要使用KafkaListener了,创建和删除topic使用AdminClient。
Kafka消费组可以通过Kafka客户端的命令行方式连接Kafka实例完成删除。
查询消费组
kafka-consumer-groups.sh --bootstrap-server {kafka连接地址} --list
[root@zk-server-1 bin]# ./kafka-consumer-groups.sh --bootstrap-server 172.31.1.245:9091,172.31.1.86:9091,172.31.1.128:9091 --list