安装
解压放到/opt/kafka, 软链一个latest出来, 先要启动zookeeper. 可以使用独立的zookeeper服务, 也可以用kafka自带的, 在lib目录下带了zookeeper. 启动zookeeper
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
然后启动kafka
./bin/kafka-server-start.sh ./config/server.properties
此时数据日志在 /tmp/kafka-logs/ , 而应用日志的路径是 ./logs , 直接运行时, 会报 Cannot open file /opt/kafka/kafka_2.12-2.0.0/bin/../logs/kafkaServer-gc.log due to Permission denied 这样的错误.
修改数据日志路径
在 server.properties, 修改 log.dirs=/tmp/kafka-logs
修改应用日志路径
这个是在log4j.properties里指定的, 变量是 ${kafka.logs.dir}, 可以通过设置环境变量 LOG_DIR 修改, 例如创建如下的sh脚本, 使用-daemon参数后, 启动后进入后台执行
export LOG_DIR=/tmp/kafka-application-logs/echo 'Kafka application logs set to ' $LOG_DIR/somewhere/bin/kafka-server-start.sh -daemon /somewhere/config/server.properties
在命令行测试Kafka
创建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
读取topic列表
./bin/kafka-topics.sh --list --zookeeper localhost:2181test
另外, 可以配置为自动创建topic, 当发布的topic不存在时自动创建
发布消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testThis is a messageThis is another message
启动消费端
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginningThis is a messageThis is another message
应用场景
- 监控: 发送系统和应用程序健康相关的指标, 用独立的服务器收集和处理这些数据创建监控仪表盘并发送警告. LinkedIn还利用Apache Samza实现了一个能够实时处理事件的富调用图分析系统
- 传统的消息: 作为传统的消息队列实现消息的发布和订阅
- 分析: 为了更好地理解用户行为,改善用户体验,LinkedIn会将用户查看了哪个页面, 点击了哪些内容等信息发送到每个数据中心的Kafka集群上, 并通过Hadoop进行分析, 生成日常报告.
- 日志处理: 作为分布式应用程序或平台的构件, 大数据解决方案Pinot等产品将Kafka作为核心构件(分布式日志), 分布式数据库Espresso将其作为内部副本并改变传播层.