kafka基础

基本概念

Kafka 是一个分布式的基于发布/订阅模式的消息队列

举个例子,生产者消费者,生产者生产鸡蛋,消费者消费鸡蛋,生产者生产一个鸡蛋,消费者就消费一个鸡蛋,假设消费者消费鸡蛋的时候噎住了(系统宕机了),生产者还在生产鸡蛋,那新生产的鸡蛋就丢失了。再比如生产者很强劲(大交易量的情况),生产者1秒钟生产100个鸡蛋,消费者1秒钟只能吃50个鸡蛋,那要不了一会,消费者就吃不消了(消息堵塞,最终导致系统超时),消费者拒绝再吃了,”鸡蛋“又丢失了,这个时候我们放个篮子在它们中间,生产出来的鸡蛋都放到篮子里,消费者去篮子里拿鸡蛋,这样鸡蛋就不会丢失了,都在篮子里,而这个篮子就是”kafka“。 鸡蛋其实就是“数据流”,系统之间的交互都是通过“数据流”来传输的(就是tcp、https),也称为报文,也叫“消息”。 消息队列满了,其实就是篮子满了,”鸡蛋“ 放不下了,那赶紧多放几个篮子,其实就是kafka的扩容。

应用场景

  1. 消息
  2. 网站追踪活动
  3. 指标
  4. 日志聚合
  5. 流处理
  6. 时间采集
  7. 提交日志 以流处理为例: kafka中消息处理一般包含多个阶段。其中原始输入数据是从kafka主题消费的,然后汇总,丰富,或者以其他的方式处理转化为新主题,例如,一个推荐新闻文章,文章内容可能从“articles”主题获取;然后进一步处理内容,得到一个处理后的新内容,最后推荐给用户。这种处理是基于单个主题的实时数据流。

消息队列的两种模式

点对点模式

  1. 一对一,消费者主动拉取数据,消息收到后消息清除
  2. 消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。
  3. 消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。
  4. Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

发布/订阅模式

  1. 一对多,消费者消费数据之后不会清除消息
  2. 消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消
  3. 息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。

基本概念

  1. Producer :消息生产者,就是向 kafka broker 发消息的客户端;
  2. Consumer :消息消费者,向 kafka broker 取消息的客户端;
  3. Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负 责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所 有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
  4. Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
  5. Topic :Kafka将消息分门别类,每一类的消息称之为一个主题topic;
  6. Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上, 一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列;
  7. Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失,

producer:生产者,就是它来生产“鸡蛋”的。 consumer:消费者,生出的“鸡蛋”它来消费。 topic:你把它理解为标签,生产者每生产出来一个鸡蛋就贴上一个标签(topic),消费者可不是谁生产的“鸡蛋”都吃的,这样不同的生产者生产出来的“鸡蛋”,消费者就可以选择性的“吃”了。 broker:就是篮子了。

kafka有四个核心API:

  • 应用程序使用 Producer API 发布消息到1个或多个topic(主题)中。
  • 应用程序使用 Consumer API 来订阅一个或多个topic,并处理产生的消息。
  • 应用程序使用 Streams API 充当一个流处理器,从1个或多个topic消费输入流,并生产一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。例如,一个零售APP,接收销售和出货的输入流,统计数量或调整价格后输出。
  • Connector API 可构建或运行可重用的生产者或消费者,将topic连接到现有的应用程序或数据系统。例如,连接到关系数据库的连接器可以捕获表的每个变更。

在线安装

安装jdk环境

可以使用tar包安装,也可以使用rpm安装 这里演示使用rpm 暂时无法在文档外展示此内容

mkdir /usr/java

创建一个 /usr/java 目录

cp /root/jdk-8u301-linux-x64.rpm /usr/java/
chmod 755 /usr/java/jdk-8u301-linux-x64.rpm
rpm -ivh /usr/java/jdk-8u271-linux-x64.rpm
vim /etc/profile

source /etc/profile Kafka基于zookkeeper,所以首先得先安装zookkeeper

安装zookkeeper

下载地址:https://zookeeper.apache.org/releases.html 启动 zookkeeper服务 bin/zookeeper-server-start.sh config/zookeeper.properties

安装kafka wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz

启动Kafka:

bin/kafka-server-start.sh config/server.properties

Zookeeper 默认端口2181 Kafka默认端口8092

创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

--replication-factor: 用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。 --partitions:指定创建分区数

查看topic

bin/kafka-topics.sh --list --zookeeper localhost:2181

发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

设置多代理集群

cp config/server.properties config/server-1.properties

config/server-1.properties: broker.id=1 listeners= PLAINTEXT://:127.0.0.1:9093 log.dir=/tmp/kafka-logs-1

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的

启动新节点服务: bin/kafka-server-start.sh config/server-2.properties &

详细看示例:

启动消费者

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

结合PHP扩展示例

安装 PHP 扩展

rdkafka 安装需要依赖 librdkafka , 所以先安装 librdkafka git clone https://github.com/edenhill/librdkafka.git cd librdkafka ./configure make && make install

安装 php-rdkafka 扩展

git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka /www/server/php/74/bin/phpize ./configure --with-php-config=/www/server/php/74/bin/php-config make && make install

在 php-ini 加上

extension=rdkafka.so

相关文档: https://kafka.apachecn.org/intro.html https://github.com/arnaud-lb/php-rdkafka https://learnku.com/articles/44442 https://www.orchome.com/295 https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.installation.manual.html https://cwiki.apache.org/confluence/display/KAFKA/Clients

打 赏