Kafka has a lot of configuration options. Here we pick a few interesting ones and verify them; see the official documentation here.
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092 # listen on port 9092 on all interfaces
advertised.listeners=PLAINTEXT://kafka:9092 # the address registered in ZooKeeper
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181 # ZooKeeper ensemble
log.dirs=/tmp/kafka-logs
Since the Kafka in this example runs in Docker and is attached to multiple networks (i.e. multiple interfaces), it listens on 0.0.0.0.
docker container inspect kafka
"Networks": {
"bridge": {
"IPAMConfig": null,
"Links": null,
"Aliases": null,
"NetworkID": "e3554b6f538add748eada670007eaf2bd76de26b944e8d3db0f99a8cf2a43435",
"EndpointID": "ceac7cc287def9c2241be645e0b7b739d907d645ff5e607428bcebccbea89ebe",
"Gateway": "172.17.0.1",
"IPAddress": "172.17.0.2",
"IPPrefixLen": 16,
"IPv6Gateway": "",
"GlobalIPv6Address": "",
"GlobalIPv6PrefixLen": 0,
"MacAddress": "02:42:ac:11:00:02",
"DriverOpts": null
},
"docker_default": {
"IPAMConfig": {},
"Links": null,
"Aliases": [
"f7195078ce1b",
"kafka"
],
"NetworkID": "b127a7674eb41468039d4d181b40397ccf710755594c591adb528d51cb558a90",
"EndpointID": "0f6ef9aab765ff783818da7ee2bb92c432ccaf9eb2337b2a2bff77b369eb6211",
"Gateway": "172.18.0.1",
"IPAddress": "172.18.0.5",
"IPPrefixLen": 16,
"IPv6Gateway": "",
"GlobalIPv6Address": "",
"GlobalIPv6PrefixLen": 0,
"MacAddress": "02:42:ac:12:00:05",
"DriverOpts": {}
}
}
ifconfig
eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500
inet 172.17.0.2 netmask 255.255.0.0 broadcast 172.17.255.255
ether 02:42:ac:11:00:02 txqueuelen 0 (Ethernet)
RX packets 95619 bytes 140011940 (140.0 MB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 21005 bytes 1171563 (1.1 MB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
eth1: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500
inet 172.18.0.5 netmask 255.255.0.0 broadcast 172.18.255.255
ether 02:42:ac:12:00:05 txqueuelen 0 (Ethernet)
RX packets 272 bytes 25462 (25.4 KB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 441 bytes 34869 (34.8 KB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
lo: flags=73<UP,LOOPBACK,RUNNING> mtu 65536
inet 127.0.0.1 netmask 255.0.0.0
loop txqueuelen 1000 (Local Loopback)
RX packets 28 bytes 1954 (1.9 KB)
RX errors 0 dropped 0 overruns 0 frame 0
TX packets 28 bytes 1954 (1.9 KB)
TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
Start the broker and check:
apt install screen net-tools
screen -s /bin/bash -S kafka
bin/kafka-server-start.sh config/server.properties # Ctrl-a then d to detach
netstat -tunlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:9092 0.0.0.0:* LISTEN 5762/java
Run in ZooKeeper's zkCli.sh:
get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka:9092"],"jmx_port":-1,"port":9092,"host":"kafka","version":5,"timestamp":"1647406227247"}
If we comment out advertised.listeners and restart, Kafka fails at startup, because 0.0.0.0 is a non-routable address that other machines cannot use:
[2022-03-16 14:54:18,511] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: advertised.listeners cannot use the nonroutable meta-address 0.0.0.0. Use a routable IP address.
at scala.Predef$.require(Predef.scala:337)
at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:2136)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1992)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1466)
at kafka.Kafka$.buildServer(Kafka.scala:67)
at kafka.Kafka$.main(Kafka.scala:87)
at kafka.Kafka.main(Kafka.scala)
Change listeners and restart:
listeners=PLAINTEXT://kafka:9092
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 172.17.0.2:9092 0.0.0.0:* LISTEN 7591/java
This is rather odd: the broker bound to 172.17.0.2, yet nslookup returns a different address. One guess is that the JVM resolves the name via /etc/hosts (where Docker may have written the default-bridge address), while nslookup queries Docker's embedded DNS at 127.0.0.11 directly:
apt install dnsutils
nslookup kafka
Server: 127.0.0.11
Address: 127.0.0.11#53
Non-authoritative answer:
Name: kafka
Address: 172.18.0.5
As expected, the query fails from zoo1 (the name resolves to 172.18.0.5, but the broker is listening on 172.17.0.2):
root@zoo1:~/kafka_2.13-3.1.0# bin/kafka-configs.sh --bootstrap-server kafka:9092 --entity-type brokers --entity-name 0 --describe
Dynamic configs for broker 0 are:
[2022-03-16 07:13:39,000] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (kafka/172.18.0.5:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
In addition, Kafka supports dynamic configuration:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --delete-config advertised.listeners
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config advertised.listeners=PLAINTEXT://172.18.0.5:9092
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --describe
Dynamic configs for broker 0 are:
advertised.listeners=PLAINTEXT://172.18.0.5:9092 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:advertised.listeners=PLAINTEXT://172.18.0.5:9092, STATIC_BROKER_CONFIG:advertised.listeners=PLAINTEXT://kafka:9092}
Check on ZooKeeper:
[zk: localhost:2181(CONNECTED) 1] get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://172.18.0.5:9092"],"jmx_port":-1,"port":9092,"host":"172.18.0.5","version":5,"timestamp":"1647415769434"}
Below is the official description of the configuration update modes:
- read-only: Requires a broker restart for update
- per-broker: May be updated dynamically for each broker
- cluster-wide: May be updated dynamically as a cluster-wide default. May also be updated as a per-broker value for testing.
All configs that are configurable at cluster level may also be configured at per-broker level (e.g. for testing). If a config value is defined at different levels, the following order of precedence is used:
- Dynamic per-broker config stored in ZooKeeper
- Dynamic cluster-wide default config stored in ZooKeeper
- Static broker config from server.properties
- Kafka default, see broker configs
controller.*
metadata.*
node.id
process.roles
Skipped; these are KRaft-mode settings and are hardly needed for ZooKeeper-based Kafka.
Because messages are essentially immutable (final), Kafka stores them as a log, similar to MySQL's binlog, except that MySQL has other structures besides the binlog.
The out-of-the-box configuration of version 3.1.0 contains:
log.retention.hours=168
Also configurable are:
log.retention.minutes
log.retention.ms
These control the retention of Kafka's message log. If we configure the broker so that logs older than 10 s may be deleted:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 --alter --add-config log.retention.ms=10000
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092 #ctrl + c to stop
Inspect the log:
bin/kafka-dump-log.sh --files /tmp/kafka-logs/quickstart-events-0/00000000000000000000.log --print-data-log
The .log file is then deleted: it is first renamed with a .deleted suffix and eventually removed.
The directory name's suffix denotes the partition, and the file name is the offset of the first message in the segment.
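The naming rule is easy to sketch (an illustration of the layout described above, not Kafka's actual code):

```java
public class SegmentName {
    // Directory: <topic>-<partition>, e.g. quickstart-events-0
    static String logDir(String topic, int partition) {
        return topic + "-" + partition;
    }
    // Segment file: base offset of its first message, zero-padded to 20 digits
    static String segmentFile(long baseOffset) {
        return String.format("%020d.log", baseOffset);
    }
    public static void main(String[] args) {
        // prints quickstart-events-0/00000000000000000000.log
        System.out.println(logDir("quickstart-events", 0) + "/" + segmentFile(0));
    }
}
```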
default.replication.factor
Number of replicas (the default for automatically created topics).
min.insync.replicas
The minimum number of in-sync replicas; it must be used together with the producer's acks setting to guarantee message durability. Let's create a topic with 3 replicas:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic my-topic --partitions 2 --replication-factor 3
It can now be seen on the other nodes (kafkf3 in this example):
root@kafkf3:/# ls /tmp/kafka-logs/
cleaner-offset-checkpoint my-topic-1
log-start-offset-checkpoint recovery-point-offset-checkpoint
meta.properties replication-offset-checkpoint
my-topic-0
Set the minimum to 2, then send some messages:
bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --alter --add-config min.insync.replicas=2
bin/kafka-console-producer.sh --topic my-topic --producer-property acks=all --bootstrap-server localhost:9092 #ctrl + c to stop
>replica
>replica2
>[2022-03-18 01:34:08,313] WARN [Producer clientId=console-producer] Got error produce response with correlation id 7 on topic-partition my-topic-0, retrying (2 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
[2022-03-18 01:34:08,422] WARN [Producer clientId=console-producer] Got error produce response with correlation id 8 on topic-partition my-topic-0, retrying (1 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
[2022-03-18 01:34:08,528] WARN [Producer clientId=console-producer] Got error produce response with correlation id 9 on topic-partition my-topic-0, retrying (0 attempts left). Error: NOT_ENOUGH_REPLICAS (org.apache.kafka.clients.producer.internals.Sender)
[2022-03-18 01:34:08,638] ERROR Error when sending message to topic my-topic with key: null, value: 8 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
kafkf3 was stopped while sending the 'replica' message, and kafka2 was stopped while sending 'replica2'; with only the leader left in sync, the second write was rejected, and only the 'replica' message is present in the log on kafka:
root@kafka:~/kafka_2.13-3.1.0# bin/kafka-dump-log.sh --files /tmp/kafka-logs/my-topic-1/00000000000000000000.log --print-data-log
Dumping /tmp/kafka-logs/my-topic-1/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 1 isTransactional: false isControl: false position: 0 CreateTime: 1647538318648 size: 75 magic: 2 compresscodec: none crc: 684786291 isvalid: true
| offset: 0 CreateTime: 1647538318648 keySize: -1 valueSize: 7 sequence: -1 headerKeys: [] payload: replica
If the producer uses acks=1 instead, the leader alone is enough:
root@kafka:~/kafka_2.13-3.1.0# bin/kafka-console-producer.sh --topic my-topic --producer-property acks=1 --bootstrap-server localhost:9092
>replica3
root@kafka:~/kafka_2.13-3.1.0# bin/kafka-dump-log.sh --files /tmp/kafka-logs/my-topic-0/00000000000000000000.log --print-data-log
Dumping /tmp/kafka-logs/my-topic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 2 isTransactional: false isControl: false position: 0 CreateTime: 1647539123622 size: 76 magic: 2 compresscodec: none crc: 2948014261 isvalid: true
| offset: 0 CreateTime: 1647539123622 keySize: -1 valueSize: 8 sequence: -1 headerKeys: [] payload: replica3
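The same pair of behaviours can be reproduced from code instead of the console producer. A minimal Java producer sketch with acks=all (topic name and bootstrap address follow the examples above; not from the original post):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class AcksDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // acks=all: the leader only acknowledges once min.insync.replicas
        // replicas (itself included) have the record
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // mirrors the "(2 attempts left)" retries above
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "replica"), (metadata, e) -> {
                if (e instanceof NotEnoughReplicasException) {
                    // fewer in-sync replicas than min.insync.replicas: rejected
                    System.err.println("rejected: " + e.getMessage());
                } else if (e == null) {
                    System.out.printf("acked at %s-%d@%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
        } // close() flushes outstanding sends
    }
}
```

With acks=1 instead, the callback fires as soon as the leader alone has written the record, which is exactly why 'replica3' above succeeded with only one live replica.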
num.partitions
The default partition count when a topic is auto-created; above, the partition count was specified explicitly at topic creation.
auto.leader.rebalance.enable
leader.imbalance.check.interval.seconds
leader.imbalance.per.broker.percentage
The primary purpose of partitioning is to balance load, but partitions may be added or removed at runtime for scaling, failure handling, and so on.
A common approach, seen in Redis, is to create far more partitions than actual nodes up front, keep the partition count fixed, and assign partitions to nodes dynamically; the routing table is then simply the partition-to-node mapping.
Kafka's partitioning is more flexible: the user picks the partition count at topic creation, the count can be increased at runtime (without affecting old messages), and leadership is rebalanced according to the three settings above.
Rebalancing and routing can involve consensus problems, which are challenging; we won't go deeper here.
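To make the routing idea concrete, here is a sketch of key-based partition routing in the spirit of Kafka's default partitioner (the real one applies murmur2 to the serialized key bytes; plain hashCode is used here for brevity):

```java
public class Router {
    static int partitionFor(String key, int numPartitions) {
        // mask the sign bit so the modulo result is non-negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
    public static void main(String[] args) {
        // With a fixed partition count a key always lands on the same partition,
        // regardless of how partitions move between brokers. Increasing the
        // partition count changes the mapping, which is why old messages stay
        // where they were written.
        System.out.println(partitionFor("order-42", 2));
    }
}
```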
Different consumer groups usually need to process messages differently; for example, group A sends SMS while group B sends email.
Each consumer group therefore needs to subscribe to all messages of a topic. This multi-consumer-group mode of working is called fan-out (see DDIA).
A partition, however, can only be consumed by one consumer within a group, so if a group has more consumers than partitions, some consumers will sit idle. This mode is called load balancing.
Consequently, messages in the same partition are consumed in partition order.
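A minimal consumer sketch illustrating both modes (the group names group-sms/group-email are made up for the example):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class GroupDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Run one instance with group-sms and one with group-email: each group
        // sees every message (fan-out). Run two instances with the same group id:
        // the topic's partitions are split between them (load balancing).
        props.put(ConsumerConfig.GROUP_ID_CONFIG, args.length > 0 ? args[0] : "group-sms");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> r : records)
                    System.out.printf("%s-%d@%d: %s%n",
                        r.topic(), r.partition(), r.offset(), r.value());
            }
        }
    }
}
```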
offsets.retention.minutes
offsets.topic.num.partitions
offsets.topic.replication.factor
Kafka has an internal topic, __consumer_offsets, that records which messages each consumer group has consumed:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
__consumer_offsets
my-topic
quickstart-events
bin/kafka-console-consumer.sh --topic quickstart-events --bootstrap-server localhost:9092 --group test_group
root@kafka:~/kafka_2.13-3.1.0# bin/kafka-consumer-groups.sh --describe --group test_group --bootstrap-server localhost:9092
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test_group quickstart-events 0 9 9 0 console-consumer-54931ba4-ba57-4dcb-badc-a11bf87e6b2f /172.20.0.2 console-consumer
root@kafka:~/kafka_2.13-3.1.0# bin/kafka-consumer-groups.sh --reset-offsets --group test_group --bootstrap-server localhost:9092 --execute --topic quickstart-events --to-earliest
GROUP TOPIC PARTITION NEW-OFFSET
test_group quickstart-events 0 2
root@kafka:~/kafka_2.13-3.1.0# bin/kafka-consumer-groups.sh --describe --group test_group --bootstrap-server localhost:9092
Consumer group 'test_group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test_group quickstart-events 0 2 9 7 - - -
Running the consumer command above again shows the earlier messages once more.
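The offset reset can also be done programmatically; a sketch equivalent to --reset-offsets --to-earliest (topic, partition and group taken from the example above):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ResetOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("quickstart-events", 0);
            consumer.assign(List.of(tp));          // manual assignment, no group rebalance
            consumer.seekToBeginning(List.of(tp)); // lazy: resolved by position() below
            long offset = consumer.position(tp);
            // write the new offset into __consumer_offsets for test_group;
            // as with the CLI, only do this while the group has no active members
            consumer.commitSync(Map.of(tp, new OffsetAndMetadata(offset)));
            System.out.println("test_group reset to offset " + offset);
        }
    }
}
```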
A small side note: because min.insync.replicas=2 had been set above as a broker-wide default, the console consumer could not consume any messages; deleting that config fixed it (presumably because offset commits to __consumer_offsets could no longer satisfy the in-sync requirement).
unclean.leader.election.enable
Whether a replica outside the ISR may be elected partition leader; defaults to false. For easier demonstration, create a topic with 2 replicas:
bin/kafka-topics.sh --create --topic two-replica-topic --bootstrap-server localhost:9092 --config unclean.leader.election.enable=true --replication-factor 2
Check the leader:
root@kafka:~/kafka_2.13-3.1.0# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092,kafkf3:9092 --topic two-replica-topic
Topic: two-replica-topic TopicId: nwYIqUb2SBi_kiF14n35pg PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824,unclean.leader.election.enable=true
Topic: two-replica-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Stop broker 1:
root@kafka:~/kafka_2.13-3.1.0# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092,kafkf3:9092 --topic two-replica-topic
Topic: two-replica-topic TopicId: nwYIqUb2SBi_kiF14n35pg PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824,unclean.leader.election.enable=true
Topic: two-replica-topic Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2
Start a consumer and a producer, produce and consume one message, then stop broker 2 and start broker 1:
root@kafka:~/kafka_2.13-3.1.0# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092,kafkf3:9092 --topic two-replica-topic
Topic: two-replica-topic TopicId: nwYIqUb2SBi_kiF14n35pg PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824,unclean.leader.election.enable=true
Topic: two-replica-topic Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1
Stop the consumer and publish a second message; starting the consumer again with --from-beginning yields only the second message, i.e. the first one was lost in the unclean election. Start broker 2 again and check its log (its divergent record has been truncated to follow the new leader):
root@kafkf3:/# ~/kafka_2.13-3.1.0/bin/kafka-dump-log.sh --print-data-log --files /tmp/kafka-logs/two-replica-topic-0/00000000000000000000.log
Dumping /tmp/kafka-logs/two-replica-topic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 3 isTransactional: false isControl: false position: 0 CreateTime: 1648388037650 size: 69 magic: 2 compresscodec: none crc: 1785415631 isvalid: true
| offset: 0 CreateTime: 1648388037650 keySize: -1 valueSize: 1 sequence: -1 headerKeys: [] payload: b
broker.rack
Configures the broker's rack/data center; it is taken into account by the partition/replica assignment algorithm.
*.purgatory.purge.interval.requests
See the demo project.
This also shows that the producer API is asynchronous: if the producer dies before a message is actually sent, that data is lost.
If the producers form a cluster, producing in a particular order is generally not required (beyond the natural order of a business flow). If there is a single producer machine, or messages from the same machine must stay ordered, then ordering has to be preserved on the sending side.
Several producer configs relate to ordering:
retries
enable.idempotence
max.in.flight.requests.per.connection
According to the official documentation, messages seem to be ordered by the order of the send() calls as long as the asynchronous calls succeed; the configs above exist to preserve that order when asynchronous calls fail. The safest approach, however, is for the application layer to send the next message only once it has an explicit offset back for the previous one, as sketched below.
After obtaining the offset, it can serve as the message order. Making consumers consume strictly in order faces the same problems and is worth digging into further.
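A sketch of that safest variant: blocking on the returned future so the next message is only sent once the previous offset is known, combined with the ordering-related configs from the list above (topic and address as in the earlier examples):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class OrderedProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // idempotence implies acks=all and retries > 0
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // one in-flight request: a retry cannot overtake an earlier batch
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 3; i++) {
                // get() blocks until the broker assigns an offset;
                // only then is the next message sent
                RecordMetadata meta = producer.send(
                    new ProducerRecord<>("my-topic", "event-" + i)).get();
                System.out.printf("event-%d stored at offset %d%n", i, meta.offset());
            }
        }
    }
}
```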
When using an event-driven style, different business events sometimes already carry a certain order when they are produced (e.g. the natural order of a business flow). In that case, each event can be tagged with a timestamp that acts as an increasing version; consumers then use the timestamp to guard the resource, provided clock skew stays within an acceptable range.
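A minimal sketch of such a timestamp-based guard on the consumer side (the class and field names are made up for illustration):

```java
/** Fail-fast guard: apply an event only if its timestamp (used as a version) is newer. */
class VersionedResource {
    private long version = Long.MIN_VALUE;
    private String state = "";

    synchronized boolean apply(long eventTimestamp, String newState) {
        if (eventTimestamp <= version) {
            return false; // stale or duplicate event: reject (fail-fast)
        }
        version = eventTimestamp;
        state = newState;
        return true;
    }
}
```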
Besides this version-guard mechanism (fail-fast), the consumer can also do TCP-like handling: when it notices that a preceding event has not arrived yet, it buffers the later event that arrived first.
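A sketch of that TCP-like buffering, assuming producer-assigned, gapless sequence numbers (the Event type and the numbers are made up for illustration):

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.function.Consumer;

// Hypothetical event carrying a gapless, producer-assigned sequence number.
record Event(long seq, String payload) {}

/** Holds early arrivals until all of their predecessors have been delivered. */
class ReorderBuffer {
    private final PriorityQueue<Event> pending =
        new PriorityQueue<>(Comparator.comparingLong(Event::seq));
    private long nextSeq; // the sequence number we expect to deliver next
    private final Consumer<Event> downstream;

    ReorderBuffer(long firstSeq, Consumer<Event> downstream) {
        this.nextSeq = firstSeq;
        this.downstream = downstream;
    }

    void onEvent(Event e) {
        if (e.seq() < nextSeq) return; // duplicate of an already-delivered event
        pending.add(e);                // buffer the out-of-order arrival
        // flush every event whose predecessors have all arrived
        while (!pending.isEmpty() && pending.peek().seq() == nextSeq) {
            downstream.accept(pending.poll());
            nextSeq++;
        }
    }
}

public class ReorderDemo {
    public static void main(String[] args) {
        ReorderBuffer buf = new ReorderBuffer(0, e -> System.out.println("deliver " + e));
        buf.onEvent(new Event(1, "second")); // early: buffered
        buf.onEvent(new Event(0, "first"));  // fills the gap: both delivered in order
    }
}
```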