📌
notes
  • Introduction
  • Java
    • JavaSE学习
    • 基础
    • 集合
    • 并发
    • JVM
      • GraalVM
    • I/O
    • Java8
    • Java9
    • Java学习常见问题汇总
      • 阿里巴巴Java开发手册
    • 读书和笔记
  • 常用框架
    • Disruptor
    • Guava
    • apache-commons框架
    • Servlet
    • Guice
    • Crypto
    • 字节码框架ASM
    • jOOQ
    • logging框架
    • JSON框架
    • Reflect反射
    • YAML框架
    • XML框架
    • JVM序列化框架
    • String字符串压缩
    • 序列化和反序列化
      • Avro
    • FastDFS
    • Quartz
    • JFinal
    • UUID
    • Objenesis框架
    • Proxy代理
    • Java和Kotlin、Groovy、Scala的代码和相互调用
    • 其他框架
  • MQ消息组件
    • Pulsar
    • RocketMQ
    • Kafka
      • Kafka部署
      • Kafka命令
      • Kafka运维问题
    • Jafka
    • InLong
    • RabbitMQ
    • ActiveMQ
    • OpenMessaging
    • MQTT
    • OpenMQ
    • ZeroMQ
    • HiveMQ
    • HornetQ
    • Artemis
    • Nanomsg
    • 自己做过的消息组件
  • 远程通讯和RPC框架
    • Netty
    • MINA
    • Hession
    • t-io
    • xSocket
    • Grizzly
    • Dubbo
    • gRPC
    • Thrift
    • Finagle
    • Jupiter
    • Motan
    • Tars
    • HSF
    • 自己实现simple RPC
  • CloudNative云原生
    • 容器
    • Docker
    • Kubernetes
    • Istio
    • Pouch
    • 其他
      • Docker镜像站国内网站
  • Reactive响应式编程
    • Reactor
    • ReactiveX
    • WebFlux
    • RSocket
    • Akka
    • Ratpack
  • 架构和设计
    • 设计模式
    • TDD、BDD和DDD
    • 接口幂等性设计
    • 权限设计
  • 分布式
    • 分布式算法
    • 分布式事务与一致性算法
    • 分布式事务框架
    • 常见分布式集群选举机制总结
    • 分布式锁实现
    • 分布式ID
    • 分布式缓存
    • 分布式存储系统
    • 分布式数据库
  • 数据结构与算法
    • 数据结构
      • 数据结构学习
      • 各种树的复杂度和原理
    • 算法
      • 常见算法
      • 递归算法
      • [Top K 之海量数据找出现次数最多或,不重复的](docs/tech/Algorithm/Top K之海量数据找出现次数最多或不重复的.md)
      • 256M的内存如何对16g的数组进行排序
      • 比较两个大文件的重复数据的算法
      • 负载均衡原理、种类和算法
  • 网关
    • API服务网关
      • Zuul2
      • Zuul1
      • Kong
    • 代理服务器
      • Nginx
      • Tengine
      • LittleProxy
      • Apache HTTP
    • 限流熔断
      • Sentinel
      • Resilience4j
      • Eureka
      • SnowJena
  • 模块化和类隔离
    • sofa-jarslink
    • Pandora
    • Java9
    • JarsLink
  • 网络和HTTP
    • HTTP客户端
      • Unirest
      • Feign
      • HttpClient
      • OkHttp
      • http-jersey
      • JDK NIO/BIO
      • HTTP和HTTPS、TCPIP、ajax、OSI七层协议、TCPIP四层协议
      • IP地址分类-内网IP
      • NAT和UDP穿孔打洞、HTTP隧道
      • DNS域名
      • 常用的DNS服务器
  • 缓存和KV数据库
    • Redis
      • Redis运维命令
      • Redis为什么默认16个数据库
      • Redis端口由来和命令前缀PF由来
      • 几款开源的图形化Redis客户端管理软件
      • Redis数据类型
      • Redis线程模型
      • Redis集群规范
      • Redis数据文件解析和内存分析
      • Bitmap介绍
      • Redis集群客户端命令
      • Redis快的秘诀
      • Redis的过期策略和内存淘汰策略
    • Memcached
    • Caffeine
    • JetCache
    • JCache
    • GuavaCache
    • ConcurrentLinkedHashMap
    • EhCache
    • Hazelcast
    • Codis
    • Tair
  • 注册中心和配置中心
    • 注册中心
      • ZooKeeper
      • Nacos
      • Etcd
      • Consul
      • ZKWeb
    • 配置中心
      • Apollo
      • Disconf
      • XDiamond
      • XXL-CONF
  • 系统监控
    • 进程监控
      • Prometheus
      • Zabbix
    • 在线诊断工具
      • JVM SandBox
      • Anthas
      • BTrace
      • Greys-Anatomy
      • HouseMD
  • 数据库
    • 数据库产品
      • MySQL
        • MySQL时区问题
      • Oracle
      • OceanBase
      • MongoDB
    • 数据库操作框架
      • DataSource
      • MyBatis
      • MyBatis-Plus
      • Hibernate
      • ThinkJD
      • JOOQ
    • 数据库中间件
      • MyCat
      • Druid
      • ShardingSphere
      • Zdal
    • 轻量级数据库
      • H2
      • SQLite
      • Derby
      • InfluxDB
    • 数据迁移
      • Yugong
    • Liquibase
    • Otter
    • 数据库工具
      • DataGrip
      • Navicat
      • PL/SQL Developer
      • PL/SQL
  • 大数据处理
    • 流处理
      • Flink
      • JStorm
      • Storm
      • Flume
      • Spark
      • Beam
      • Samza
      • Hadoop
      • HBase
      • druid-io
    • 搜索
      • Elasticsearch
      • Lucene
      • Solr
    • Spider爬虫
      • Jsoup
      • Crawler4j
  • SOFA
    • sofa-rpc
    • sofa-mesh
    • sofa-boot
    • sofa-bolt
    • sofa-ark
    • sofa-jarslink
  • Netflix
    • zuul2
    • zuul1
    • Eureka
    • Hystrix
    • Ribbon
    • Turbine
    • Archaius
    • Governator
  • Micronaut Framework
    • XXX
  • WebService
    • XXX
  • Web应用容器
    • Jetty
    • Tomcat
    • Undertow
    • JBoss
    • Jersey
    • QuickServer
    • WebLogic
    • WebSphere
  • Linux
    • 查看Linux基本属性信息
    • 查看Linux下CPU占用情况
    • 查看LINUX进程内存占用情况
    • 查看Linux下端口占用情况
  • Spring
    • SpringBoot
      • websocket
      • web
      • sqlite
      • mybatis-generator
      • tkmapper
      • swagger
      • webflux
      • starter
      • security
      • kafka
      • cache
      • activemq
      • actuator
      • admin
      • druid
      • jooq
      • kafka
      • multidatasource
    • SpringCloud
      • zuul
      • stream
      • web
      • sleuth
      • example
      • hystrix
      • eureka
      • consul
      • config
    • SpringMVC
      • kafka
      • jooq
      • example
      • annotation
      • activemq
    • SpringData
      • solr
      • redis
      • elasticsearch
  • 测试
    • 单元测试
      • JUnit
      • TestNG
      • Arquillian
    • Mock测试
      • Mockito
      • Spock
      • Moco框架
    • 压力测试
      • JMeter
      • LoadRunner
    • 自动化测试
      • Selenium
    • 基准测试
      • JMH
  • 物联网IoT
  • 运维
    • 日常遇到的问题
  • 操作系统OS
    • Linux
      • 操作系统
      • CPU工作原理
      • Linux环境变量修改
      • Linux用户空间与内核空间、地址空间
      • TCP参数
      • 多进程和多线程的区别
      • 操作系统-内存管理机制
    • MAC
      • MacOS使用
      • Mac软件安装
      • Homebrew
      • Darwin操作系统
      • Kap录屏软件
      • Mac让终端走代理的几种方法
    • Windows
      • Windows介绍
      • Batch批处理-CMD-MS-DOS
  • 其他语言
    • 编程语言
    • Golang
    • Python
    • Lua
    • Erlang
    • Ruby
    • C++/C
    • C#语言
  • 新技术
    • 区块链
      • Ethereum以太坊
      • Bitcoin
      • NFT
    • 机器学习
    • tensorflow
    • 人工智能技术
    • 蚂蚁金服共享智能技术实践:如何降低数据共享的难度
    • 人工智能
    • CloudComputing云计算
    • EdgeComputing边缘计算
    • GraphRAG
  • 前端开发
    • VueJS
    • Angular
    • Bootstrap
    • ECharts
    • RequireJS
    • zTree
    • Layui
    • JavaScript
  • 开发工具
    • 版本控制系统VCS
      • SVN学习
      • Git
        • Git常用命令
        • Git命令学习
        • Github学习
        • GitLab
        • Git-flow
        • Mac通过git统计代码行数
        • 巧用GithubAction同步代码到Gitee
        • 通过Git分支来规范代码上线流程
      • Mercurial
    • Gitbook
    • 集成开发环境IDE
      • IDEA
        • IDEA学习
        • IDEA插件
        • IDEA注释配置
        • IDEA热加载工具JRebel
        • [Mac下IntelliJ IDEA快捷键大全](docs/Tools/IDE/IDEA/Mac下IntelliJ IDEA快捷键大全.md)
      • Eclipse
        • Eclipse快捷键
        • Eclipse插件安装
        • 使用Eclipse进行远程调试
    • Nexus
    • 项目管理
      • Maven
        • Maven中optional和scope元素的使用
      • Gradle
      • Ant
      • Ivy
    • 代码扫描
      • SonarQube
      • PMD
      • FindBugs
      • Checkstyle
    • DevOps工具
      • Jenkins
    • Blog文章笔记写作系统
      • 写文章的模板
      • GitHubPages写博客方法
    • 文件格式
      • Pandoc标记语言转换工具
      • AsciiDoc文件
      • Markdown文件
        • Markdown语法
        • md模板
        • Markdown语法模板
        • Typora模板
  • 技术其他
    • 技术圈的反对种族歧视
    • 系统名词
  • Interview面试
    • 面试学习技术网站
    • 简历和面试
    • 面试指南
    • 备战面试
    • 面经
    • 常见的学习网站
      • 开源组织和公司开源项目地址和网站
      • 框架网站
      • 其他学习网站
      • 待学习
    • 开源项目
  • 读书和笔记
    • Java
      • [Effective Java中文版](docs/Book/Java/Effective Java中文版.md)
      • Java多线程编程核心技术
      • Java编程思想
      • 深入理解Java虚拟机JVM高级特性与最佳实践
      • 码出高效:Java开发手册
      • Java程序性能优化
    • DB数据库
      • 深入浅出MyBatis技术原理与实战
      • 高性能MySQL
    • 网络HTTP
      • 图解HTTP
      • TCPIP详解:卷一
    • Linux
      • 鸟哥的Linux私房菜
    • Netty
      • Netty权威指南
    • Spring
    • Redis
    • Python
      • Python之禅
    • JavaScript
      • JavaScript语言精粹
    • 数据结构算法
      • 大话数据结构
    • 分布式
      • 从PAXOS到ZOOKEEPER分布式一致性原理与实践
    • 并发
      • Java并发编程的艺术
      • Java并发编程之美
      • Java并发编程实战
      • 七周七并发模型
      • 多处理器编程的艺术
    • 架构设计
      • 代码整洁之道
      • 重构
      • 亿级网站架构核心技术
      • 可伸缩服务架构
      • 大型网站技术架构-核心原理与案例分析
      • 大型网站系统与Java中间件实践
      • [Design of Design](docs/Book/架构设计/Design of Design.md)
      • 人月神话-软件项目管理之道
      • 微服务架构设计模式
      • 深入浅出设计模式
      • 面向模式的软件架构
      • 没有银弹-软件工程的本质性与附属性工作
      • 设计模式-可复用面向对象软件的基础
    • Interview面试
      • 剑指Offer
      • 程序员面试宝典
      • 程序员面试金典
    • 技术其他
      • 具体数学
      • 人类网络·社会位置决定命运
      • 性能之巅
      • 浪潮之巅
      • 编写可读代码的艺术
    • 英语
    • 医学
      • 人体使用手册
      • 普通生物学
    • 历史
      • 国史大纲
      • 明朝那些事儿
      • 万历十五年
      • 中国通史
      • 第二次世界大战战史
      • 世界史:从史前到21世纪全球文明的互动
      • 全球通史:从史前史到21世纪
    • 股票
      • 炒股的智慧
    • 其他
      • 上帝掷骰子吗
      • 我不是教你诈
      • 拆掉思维力的墙
      • 最优输运理论专题
      • 说话之道
  • 其他学习
    • 英语
      • 英语学习
      • 英语单词
      • 计算机专业英语
    • 医学
      • 医学常识
      • 腰椎间盘突出治疗
    • 股票
      • 经济名词
      • 股票书籍
    • 历史
      • 中国历史
      • 中国近现代历史
    • 地理
    • 汉语文学
      • 文人
        • 王勃
        • 苏轼
        • 毛泽东诗词
      • 古诗词
        • 满江红·写怀
        • 满江红·登黄鹤楼有感
      • 名言名句
      • 近现代小说
        • 鬼吹灯
      • 三国演义
      • 上帝的指纹
      • 地球编年史
      • 橘子不是唯一的水果
      • 红楼梦
  • 日常常识
    • 搭建梯子VPN
      • VPN的使用
      • V2Ray
      • trojan
      • Shadowsocks影梭
      • SOCKS
      • 内网穿透
    • 电脑组装
      • 电脑DIY
      • CPU和主板
      • 内存和硬盘
      • 帧率FPS和分辨率和像素
    • 硬盘知识
    • 买房购房
    • 收房前预攒钱
    • 汽车
    • 信用卡分类
    • 生活小常识
    • 眼镜
    • 运动健身
    • 游戏Games
      • 常见的游戏
      • 游戏公司
      • 游戏名词
      • 魔兽争霸和魔兽世界
      • 英雄联盟LOL
      • 王者荣耀
    • 影视电影
      • 电影评分网站
      • 美剧
        • 冰与火之歌-权利的游戏
        • 哈利·波特七部
        • 指环王
        • 死亡笔记
      • 动漫
    • 圣经
      • 圣经介绍
    • 神话传说
      • 中国神话传说
      • 佛教
      • 希腊神话
      • 西游记
      • 封神演义
    • 城市介绍
      • 南京
      • 杭州
      • 河南
    • NFC
    • SIM卡的PIN码和PUK码
    • 不可能的物体
    • 中国三大电信公司
    • 八卦和六十四卦
    • 关内、关外、关中、关东
    • 切尔诺贝利核电站爆炸
    • 古代四大文明和七大奇迹
    • 各种单位换算
    • 工作总结和绩效考核
    • 手机OTG功能
    • 方舱医院
    • 移动通信标准和分类
    • 网络流行语
    • 美国常见的公司
    • 美国政治常识
    • 视频处理和图片处理软件
    • 运营知识
    • Google Cloud Platform免费申请
    • 饭圈词汇
    • 常识名词
  • 说明
  • 待办
    • ReadingList
Powered by GitBook
On this page
  • Kafka操作命令
  • 启动
  • 停止
  • 创建主题Topic
  • 列出主题列表
  • 查看主题详细信息
  • 查看topic指定分区offset的最大值或最小值
  • 修改主题配置
  • 为topic增加分区(不支持减partition)
  • 删除主题
  • 生产者Producer发送消息
  • 消费者Consumer消费主题
  • 查看有哪些消费者Group
  • 查看Group详情,查看消费者消费偏移量
  • 获取指定Consumer Group的位移信息(新的2.x的版本不能使用)
  • 修改消费组的offset
  • 查看内部主题__consumer_offsets(保存Consumer Group消费位移信息的Topic)
  • 如何增加__consumer_offsets的副本数?其他Topic主题也是一样
  • 设置修改Consumer Group的offset
  • 删除Group
  • 查看新消费者详情
  • 查看broker的配置
  • 新增修改broker的配置
  • 查看kafka的zookeeper上的数据

Was this helpful?

  1. MQ消息组件
  2. Kafka

Kafka命令

PreviousKafka部署NextKafka运维问题

Last updated 2 years ago

Was this helpful?


Kafka操作命令

启动

bin/zookeeper-server-start.sh config/zookeeper.properties &

bin/kafka-server-start.sh config/server.properties &

或者

nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties &

nohup sh bin/kafka-server-start.sh config/server.properties &

停止

bin/zookeeper-server-stop.sh

bin/kafka-server-stop.sh

创建主题Topic

#replication-factor 表示该topic需要在不同的broker中保存几份, partitions为几个分区

现在我们要创建一个含有6个Partition分区,每个分区3个备份的topic:
bin/kafka-topics.sh --create --partitions 6 --replication-factor 3  --topic topic03 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

老的版本使用zookeeper地址
bin/kafka-topics.sh --create --partitions 6 --replication-factor 3 --topic localhost.test.topic --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka

列出主题列表

bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka

查看主题详细信息

bin/kafka-topics.sh --describe --topic topic03 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

一次查询多个topic
bin/kafka-topics.sh --describe --topic k2.tomcat.log,k2.tomcat.log2 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094


bin/kafka-topics.sh --describe --topic topic03 --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka

一次查询多个topic
bin/kafka-topics.sh --describe --topic topic01,topic02,topic03 --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka

Topic:dream    PartitionCount:5    ReplicationFactor:2    Configs:
    Topic: topic01    Partition: 0    Leader: 1    Replicas: 1,2    Isr: 1,2
    Topic: topic01    Partition: 1    Leader: 2    Replicas: 2,3    Isr: 2,3
    Topic: topic01    Partition: 2    Leader: 3    Replicas: 3,1    Isr: 3,1
    Topic: topic01    Partition: 3    Leader: 1    Replicas: 1,3    Isr: 1,3
    Topic: topic01    Partition: 4    Leader: 2    Replicas: 2,1    Isr: 2,1

leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
Replicas:列出了所有的副本节点,不管节点是否在服务中.
Lsr:是正在服务中的节点.

查看topic指定分区offset的最大值或最小值

--time,为毫秒值,-time=-1时表示最大值latest,为-2时表示最小值earliest。该offset值是所写时间戳之后的第一条数据。结果为:

具体的时间戳
bin/kafka-run-class.sh kafka.tools.GetOffsetShell  -topic topicName --time 1656643531770 --broker-list broker1:9092,broker2:9092


查询offset的最大值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic topic03 --time -1 --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic topic03 --time -1 --partitions 0 --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094


查询offset的最小值
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --topic topic03 --time -2 --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

修改主题配置

bin/kafka-configs.sh --alter --topic topic03 --add-config max.message.bytes=20480000 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

# 这个是错误的语法,已经改成上面的kafka-configs.sh这种方式了
bin/kafka-topics.sh --alter --topic topic03 --config max.message.bytes=102400000 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

bin/kafka-topics.sh --alter --topic topic03 --config max.message.bytes=102400000 --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka

为topic增加分区(不支持减partition)

为topic增加partition

bin/kafka-topics.sh --alter --topic topic03 --partitions 3 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

bin/kafka-topics.sh --alter --topic topic03 --partitions 6 --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka


如何增加__consumer_offsets的副本数?其他Topic主题也是一样
可使用kafka-reassign-partitions.sh来增加__consumer_offsets的副本数,方法如下:

构造一JSON文件reassign.json:
    {
    "version":1,
    "partitions":[
    {"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]},
    {"topic":"__consumer_offsets","partition":1,"replicas":[2,3,1]},
    {"topic":"__consumer_offsets","partition":2,"replicas":[3,1,2]},
    {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]},
    ...
    {"topic":"__consumer_offsets","partition":100,"replicas":[2,3,1]}
    ]
    }

然后执行:
bin/kafka-reassign-partitions.sh --reassignment-json-file reassign.json --execute --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

“[1,2,3]”中的数字为broker.id值。

删除主题

bin/kafka-topics.sh --delete --topic topic03 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
bin/kafka-topics.sh --delete --topic topic03 --zookeeper 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183/kafka


kafka删除topic方法
1) bin/kafka-topics.sh --delete --zookeeper master:2181 --topic DreamTopic
如果删除后查看topic显示为:marked for deletion  则需要在每一台机器中的 config/server.properties 文件加入  delete.topic.enable=true,然后重启kafka

2) 删除kafka存储目录(server.properties文件log.dirs配置,默认为"/tmp/kafka-logs")相关topic目录删除zookeeper "/brokers/topics/"目录下相关topic节点

生产者Producer发送消息

bin/kafka-console-producer.sh --topic topic03 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

bin/kafka-console-producer.sh --topic topic03 --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

消费者Consumer消费主题

bin/kafka-console-consumer.sh --topic topic03 --from-beginning --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

bin/kafka-console-consumer.sh --topic topic03 --from-beginning --group test.group --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094


1) 从头开始 
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
bin/kafka-console-consumer.sh --topic test --offset earliest --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

2) 从尾部开始(默认)
bin/kafka-console-consumer.sh --topic test --offset latest --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

3) 指定分区 
bin/kafka-console-consumer.sh --topic test --offset latest --partition 1 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

4) 取指定个数 
bin/kafka-console-consumer.sh --topic test --offset latest --partition 1 --max-messages 1 --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

5) 新消费者(ver>=0.9) 
bin/kafka-console-consumer.sh --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

查看有哪些消费者Group

2) API方式(新)
bin/kafka-consumer-groups.sh --list --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094


1) 分ZooKeeper方式(老)
bin/kafka-consumer-groups.sh --list --zookeeper 127.0.0.1:2181/kafka

查看Group详情,查看消费者消费偏移量

bin/kafka-consumer-groups.sh --list --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
bin/kafka-consumer-groups.sh --group testgroup --describe --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

bin/kafka-consumer-groups.sh --list --zookeeper 10.1.128.60:2181,10.1.128.61:2181,10.1.128.62:2181
bin/kafka-consumer-groups.sh --group testgroup --describe --zookeeper 10.1.128.60:2181,10.1.128.61:2181,10.1.128.62:2181

获取指定Consumer Group的位移信息(新的2.x的版本不能使用)

需consumer.properties中设置exclude.internal.topics=false:
1) 0.11.0.0版本之前: 
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" 

2) 0.11.0.0版本以后(含): 
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 

修改消费组的offset

设置为最初偏移量:
bin/kafka-consumer-groups.sh --group testgroup --topic topic03 --reset-offsets --to-earliest --execute --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

设置最近偏移量
bin/kafka-consumer-groups.sh --group testgroup --topic topic03 --reset-offsets --to-latest --execute --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094


设置任意偏移量:
bin/kafka-consumer-groups.sh --group lengfeng.consumer.group --topic kafka_flink_mysql --reset-offsets --to-offset 3 --execute --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

查看内部主题__consumer_offsets(保存Consumer Group消费位移信息的Topic)

需consumer.properties中设置exclude.internal.topics=false:

1) 0.11.0.0之前版本 
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning 

2) 0.11.0.0之后版本(含) 
bin/kafka-console-consumer.sh --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094

老的版本使用zookeeper查询
bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

如何增加__consumer_offsets的副本数?其他Topic主题也是一样

可使用kafka-reassign-partitions.sh来增加__consumer_offsets的副本数,方法如下:

构造一JSON文件reassign.json:
    {
    "version":1,
    "partitions":[
    {"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]},
    {"topic":"__consumer_offsets","partition":1,"replicas":[2,3,1]},
    {"topic":"__consumer_offsets","partition":2,"replicas":[3,1,2]},
    {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]},
    ...
    {"topic":"__consumer_offsets","partition":100,"replicas":[2,3,1]}
    ]
    }

然后执行:
kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute
“[1,2,3]”中的数字为broker.id值。


__consumer_offsets
__consumer_offsets是kafka内置的Topic,在0.9.0.0之后的Kafka,将topic的offset 信息由之前存储在zookeeper上改为存储到内置的__consumer_offsets中。

server.properties中的配置项num.partitions和default.replication.factor对__consumer_offsets无效,而是受offsets.topic.num.partitions和offsets.topic.replication.factor两个控制。 

设置修改Consumer Group的offset

执行zkCli.sh进入zookeeper命令行界面,假设需将group为testgroup的topic的offset设置为2018,则:set /consumers/testgroup/offsets/test/0 2018

如果kakfa在zookeeper中的根目录不是“/”,而是“/kafka”,则: 

set /kafka/consumers/testgroup/offsets/test/0 2018 

另外,还可以使用kafka自带工具kafka-run-class.sh kafka.tools.UpdateOffsetsInZK修改,命令用法: 

kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic 

从用法提示可以看出,只能修改为earliest或latest,没有直接修改zookeeper灵活。 

删除Group

老版本的ZooKeeper方式可以删除Group,新版本则自动删除,当执行:
kafka-consumer-groups.sh --group test --delete --bootstrap-server 127.0.0.1:9092

输出如下提示: 

查看新消费者详情

# 仅支持offset存储在zookeeper上的:

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test 

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info --group CONSUMER_TOPICTEST_ALL_MSGTEST_CLUSTER_GROUP --topic topicTest --zookeeper 10.1.243.23:52181

查看broker的配置

bin/kafka-configs.sh --describe --all --broker 0 --bootstrap-server localhost:9092

新增修改broker的配置

bin/kafka-configs.sh --alter --add-config k3=v3 --broker 0 --bootstrap-server localhost:9092
bin/kafka-configs.sh --alter --add-config 'log.cleaner.threads.config=50' --entity-default --entity-type brokers  --bootstrap-server localhost:9092

查看kafka的zookeeper上的数据

  1. 查看Kakfa在zookeeper的根目录 ls /kafka

  2. 查看brokers ls /kafka/brokers

  3. 查看有哪些brokers(214和215等为server.properties中配置的broker.id值): ls /kafka/brokers/ids

  4. 查看broker 214,下列数据显示该broker没有设置JMX_PORT: get /kafka/brokers/ids/214

  5. 查看controller,下列数据显示broker 214为controller: get /kafka/controller

  6. 查看kafka集群的id: get /kafka/cluster/id

  7. 查看有哪些topics: ls /kafka/brokers/topics

  8. 查看topic下有哪些partitions: ls /kafka/brokers/topics/__consumer_offsets/partitions

  9. 查看“partition 0”的状态: get /kafka/brokers/topics/__consumer_offsets/partitions/0/state

  10. 如何增加__consumer_offsets的副本数? 可使用kafka-reassign-partitions.sh来增加__consumer_offsets的副本数,方法如下:

    构造一JSON文件reassign.json:

{ "version":1, "partitions":[ {"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]}, {"topic":"__consumer_offsets","partition":1,"replicas":[2,3,1]}, {"topic":"__consumer_offsets","partition":2,"replicas":[3,1,2]}, {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]}, ... {"topic":"__consumer_offsets","partition":100,"replicas":[2,3,1]} ] } 然后执行:

kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute “[1,2,3]”中的数字为broker.id值。

__consumer_offsets __consumer_offsets是kafka内置的Topic,在0.9.0.0之后的Kafka,将topic的offset 信息由之前存储在zookeeper上改为存储到内置的__consumer_offsets中。

server.properties中的配置项num.partitions和default.replication.factor对__consumer_offsets无效,而是受offsets.topic.num.partitions和offsets.topic.replication.factor两个控制。 

kafka不支持topic修改副本数量:

bin/kafka-topics.sh --alter --topic db.192_168_5_14_3319_wac_trinity.position --replication-factor 3 --partitions 1 --bootstrap-server 172.16.48.182:9011,172.16.48.182:9012,172.16.48.183:9011 Option "[replication-factor]" can't be used with option "[alter]"


Kafka操作命令
查看kafka的zookeeper上的数据
Kafka常用命令收录
Kafka操作命令