Kafka面試題系列(基礎篇)
點擊上方藍色字體,選擇“設為星標”

Kafka的用途有哪些?使用場景如何?
消息系統(tǒng):Kafka 和傳統(tǒng)的消息系統(tǒng)(也稱作消息中間件)都具備系統(tǒng)解耦、冗余存儲、流量削峰、緩沖、異步通信、擴展性、可恢復性等功能。與此同時,Kafka 還提供了大多數(shù)消息系統(tǒng)難以實現(xiàn)的消息順序性保障及回溯消費的功能。
存儲系統(tǒng):Kafka 把消息持久化到磁盤,相比于其他基于內(nèi)存存儲的系統(tǒng)而言,有效地降低了數(shù)據(jù)丟失的風險。也正是得益于 Kafka 的消息持久化功能和多副本機制,我們可以把 Kafka 作為長期的數(shù)據(jù)存儲系統(tǒng)來使用,只需要把對應的數(shù)據(jù)保留策略設置為“永久”或啟用主題的日志壓縮功能即可。
流式處理平臺:Kafka 不僅為每個流行的流式處理框架提供了可靠的數(shù)據(jù)來源,還提供了一個完整的流式處理類庫,比如窗口、連接、變換和聚合等各類操作。
Kafka中的ISR、AR又代表什么?ISR的伸縮又指什么
分區(qū)中的所有副本統(tǒng)稱為 AR(Assigned Replicas)。所有與 leader 副本保持一定程度同步的副本(包括 leader 副本在內(nèi))組成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的一個子集。
ISR的伸縮:
leader 副本負責維護和跟蹤 ISR 集合中所有 follower 副本的滯后狀態(tài),當 follower 副本落后太多或失效時,leader 副本會把它從 ISR 集合中剔除。如果 OSR 集合中有 follower 副本“追上”了 leader 副本,那么 leader 副本會把它從 OSR 集合轉移至 ISR 集合。默認情況下,當 leader 副本發(fā)生故障時,只有在 ISR 集合中的副本才有資格被選舉為新的 leader,而在 OSR 集合中的副本則沒有任何機會(不過這個原則也可以通過修改相應的參數(shù)配置來改變)。
replica.lag.time.max.ms :這個參數(shù)的含義是 Follower 副本能夠落后 Leader 副本的最長時間間隔,當前默認值是 10 秒。
unclean.leader.election.enable:是否允許 Unclean 領導者選舉。開啟 Unclean 領導者選舉可能會造成數(shù)據(jù)丟失,但好處是,它使得分區(qū) Leader 副本一直存在,不至于停止對外提供服務,因此提升了高可用性。
Kafka中的HW、LEO、LSO、LW等分別代表什么?
HW 是 High Watermark 的縮寫,俗稱高水位,它標識了一個特定的消息偏移量(offset),消費者只能拉取到這個 offset 之前的消息。
LSO是LogStartOffset,一般情況下,日志文件的起始偏移量 logStartOffset 等于第一個日志分段的 baseOffset,但這并不是絕對的,logStartOffset 的值可以通過 DeleteRecordsRequest 請求(比如使用 KafkaAdminClient 的 deleteRecords()方法、使用 kafka-delete-records.sh 腳本、日志的清理和截斷等操作進行修改。
如上圖所示,它代表一個日志文件,這個日志文件中有9條消息,第一條消息的 offset(LogStartOffset)為0,最后一條消息的 offset 為8,offset 為9的消息用虛線框表示,代表下一條待寫入的消息。日志文件的 HW 為6,表示消費者只能拉取到 offset 在0至5之間的消息,而 offset 為6的消息對消費者而言是不可見的。
LEO 是 Log End Offset 的縮寫,它標識當前日志文件中下一條待寫入消息的 offset,上圖中 offset 為9的位置即為當前日志文件的 LEO,LEO 的大小相當于當前日志分區(qū)中最后一條消息的 offset 值加1。分區(qū) ISR 集合中的每個副本都會維護自身的 LEO,而 ISR 集合中最小的 LEO 即為分區(qū)的 HW,對消費者而言只能消費 HW 之前的消息。
LW 是 Low Watermark 的縮寫,俗稱“低水位”,代表 AR 集合中最小的 logStartOffset 值。副本的拉取請求(FetchRequest,它有可能觸發(fā)新建日志分段而舊的被清理,進而導致 logStartOffset 的增加)和刪除消息請求(DeleteRecordRequest)都有可能促使 LW 的增長。
Kafka中是怎么體現(xiàn)消息順序性的?
可以通過分區(qū)策略體現(xiàn)消息順序性。
分區(qū)策略有輪詢策略、隨機策略、按消息鍵保序策略。
按消息鍵保序策略:一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面,由于每個分區(qū)下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略
CopyListpartitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
Kafka中的分區(qū)器、序列化器、攔截器是否了解?它們之間的處理順序是什么?
序列化器:生產(chǎn)者需要用序列化器(Serializer)把對象轉換成字節(jié)數(shù)組才能通過網(wǎng)絡發(fā)送給 Kafka。而在對側,消費者需要用反序列化器(Deserializer)把從 Kafka 中收到的字節(jié)數(shù)組轉換成相應的對象。
分區(qū)器:分區(qū)器的作用就是為消息分配分區(qū)。如果消息 ProducerRecord 中沒有指定 partition 字段,那么就需要依賴分區(qū)器,根據(jù) key 這個字段來計算 partition 的值。
Kafka 一共有兩種攔截器:生產(chǎn)者攔截器和消費者攔截器。
生產(chǎn)者攔截器既可以用來在消息發(fā)送前做一些準備工作,比如按照某個規(guī)則過濾不符合要求的消息、修改消息的內(nèi)容等,也可以用來在發(fā)送回調(diào)邏輯前做一些定制化的需求,比如統(tǒng)計類工作。
消費者攔截器主要在消費到消息或在提交消費位移時進行一些定制化的操作。
消息在通過 send() 方法發(fā)往 broker 的過程中,有可能需要經(jīng)過攔截器(Interceptor)、序列化器(Serializer)和分區(qū)器(Partitioner)的一系列作用之后才能被真正地發(fā)往 broker。攔截器(下一章會詳細介紹)一般不是必需的,而序列化器是必需的。消息經(jīng)過序列化之后就需要確定它發(fā)往的分區(qū),如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分區(qū)器的作用,因為 partition 代表的就是所要發(fā)往的分區(qū)號。
處理順序 :攔截器->序列化器->分區(qū)器
KafkaProducer 在將消息序列化和計算分區(qū)之前會調(diào)用生產(chǎn)者攔截器的 onSend() 方法來對消息進行相應的定制化操作。
然后生產(chǎn)者需要用序列化器(Serializer)把對象轉換成字節(jié)數(shù)組才能通過網(wǎng)絡發(fā)送給 Kafka。
最后可能會被發(fā)往分區(qū)器為消息分配分區(qū)。
Kafka生產(chǎn)者客戶端的整體結構是什么樣子的?

整個生產(chǎn)者客戶端由兩個線程協(xié)調(diào)運行,這兩個線程分別為主線程和 Sender 線程(發(fā)送線程)。
在主線程中由 KafkaProducer 創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。
Sender 線程負責從 RecordAccumulator 中獲取消息并將其發(fā)送到 Kafka 中。
RecordAccumulator 主要用來緩存消息以便 Sender 線程可以批量發(fā)送,進而減少網(wǎng)絡傳輸?shù)馁Y源消耗以提升性能。
Kafka生產(chǎn)者客戶端中使用了幾個線程來處理?分別是什么?
整個生產(chǎn)者客戶端由兩個線程協(xié)調(diào)運行,這兩個線程分別為主線程和 Sender 線程(發(fā)送線程)。在主線程中由 KafkaProducer 創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也稱為消息收集器)中。Sender 線程負責從 RecordAccumulator 中獲取消息并將其發(fā)送到 Kafka 中。
Kafka的舊版Scala的消費者客戶端的設計有什么缺陷?
老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一個分布式的協(xié)調(diào)服務框架,Kafka 重度依賴它實現(xiàn)各種各樣的協(xié)調(diào)管理。將位移保存在 ZooKeeper 外部系統(tǒng)的做法,最顯而易見的好處就是減少了 Kafka Broker 端的狀態(tài)保存開銷。
ZooKeeper 這類元框架其實并不適合進行頻繁的寫更新,而 Consumer Group 的位移更新卻是一個非常頻繁的操作。這種大吞吐量的寫操作會極大地拖慢 ZooKeeper 集群的性能
“消費組中的消費者個數(shù)如果超過topic的分區(qū),那么就會有消費者消費不到數(shù)據(jù)”這句話是否正確?如果正確,那么有沒有什么hack的手段?
一般來說如果消費者過多,出現(xiàn)了消費者的個數(shù)大于分區(qū)個數(shù)的情況,就會有消費者分配不到任何分區(qū)。
開發(fā)者可以繼承AbstractPartitionAssignor實現(xiàn)自定義消費策略,從而實現(xiàn)同一消費組內(nèi)的任意消費者都可以消費訂閱主題的所有分區(qū):
Copypublic class BroadcastAssignor extends AbstractPartitionAssignor{
@Override
public String name() {
return "broadcast";
}
private Map> consumersPerTopic(
MapconsumerMetadata) {
(具體實現(xiàn)請參考RandomAssignor中的consumersPerTopic()方法)
}
@Override
public Map> assign(
MappartitionsPerTopic,
Mapsubscriptions) {
Map> consumersPerTopic =
consumersPerTopic(subscriptions);
Map> assignment = new HashMap<>();
//Java8
subscriptions.keySet().forEach(memberId ->
assignment.put(memberId, new ArrayList<>()));
//針對每一個主題,為每一個訂閱的消費者分配所有的分區(qū)
consumersPerTopic.entrySet().forEach(topicEntry->{
String topic = topicEntry.getKey();
Listmembers = topicEntry.getValue();
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null || members.isEmpty())
return;
Listpartitions = AbstractPartitionAssignor
.partitions(topic, numPartitionsForTopic);
if (!partitions.isEmpty()) {
members.forEach(memberId ->
assignment.get(memberId).addAll(partitions));
}
});
return assignment;
}
}
注意組內(nèi)廣播的這種實現(xiàn)方式會有一個嚴重的問題—默認的消費位移的提交會失效。
消費者提交消費位移時提交的是當前消費到的最新消息的offset還是offset+1?
在舊消費者客戶端中,消費位移是存儲在 ZooKeeper 中的。而在新消費者客戶端中,消費位移存儲在 Kafka 內(nèi)部的主題__consumer_offsets 中。
當前消費者需要提交的消費位移是offset+1
有哪些情形會造成重復消費?
Rebalance
一個consumer正在消費一個分區(qū)的一條消息,還沒有消費完,發(fā)生了rebalance(加入了一個consumer),從而導致這條消息沒有消費成功,rebalance后,另一個consumer又把這條消息消費一遍。消費者端手動提交
如果先消費消息,再更新offset位置,導致消息重復消費。消費者端自動提交
設置offset為自動提交,關閉kafka時,如果在close之前,調(diào)用 consumer.unsubscribe() 則有可能部分offset沒提交,下次重啟會重復消費。生產(chǎn)者端
生產(chǎn)者因為業(yè)務問題導致的宕機,在重啟之后可能數(shù)據(jù)會重發(fā)
那些情景下會造成消息漏消費?
自動提交
設置offset為自動定時提交,當offset被自動定時提交時,數(shù)據(jù)還在內(nèi)存中未處理,此時剛好把線程kill掉,那么offset已經(jīng)提交,但是數(shù)據(jù)未處理,導致這部分內(nèi)存中的數(shù)據(jù)丟失。生產(chǎn)者發(fā)送消息
發(fā)送消息設置的是fire-and-forget(發(fā)后即忘),它只管往 Kafka 中發(fā)送消息而并不關心消息是否正確到達。不過在某些時候(比如發(fā)生不可重試異常時)會造成消息的丟失。這種發(fā)送方式的性能最高,可靠性也最差。消費者端
先提交位移,但是消息還沒消費完就宕機了,造成了消息沒有被消費。自動位移提交同理acks沒有設置為all
如果在broker還沒把消息同步到其他broker的時候宕機了,那么消息將會丟失
KafkaConsumer是非線程安全的,那么怎么樣實現(xiàn)多線程消費?
線程封閉,即為每個線程實例化一個 KafkaConsumer 對象

一個線程對應一個 KafkaConsumer 實例,我們可以稱之為消費線程。一個消費線程可以消費一個或多個分區(qū)中的消息,所有的消費線程都隸屬于同一個消費組。
消費者程序使用單或多線程獲取消息,同時創(chuàng)建多個消費線程執(zhí)行消息處理邏輯。
獲取消息的線程可以是一個,也可以是多個,每個線程維護專屬的 KafkaConsumer 實例,處理消息則交由特定的線程池來做,從而實現(xiàn)消息獲取與消息處理的真正解耦。具體架構如下圖所示:
兩個方案對比:
簡述消費者與消費組之間的關系
Consumer Group 下可以有一個或多個 Consumer 實例。這里的實例可以是一個單獨的進程,也可以是同一進程下的線程。在實際場景中,使用進程更為常見一些。
Group ID 是一個字符串,在一個 Kafka 集群中,它標識唯一的一個 Consumer Group。
Consumer Group 下所有實例訂閱的主題的單個分區(qū),只能分配給組內(nèi)的某個 Consumer 實例消費。這個分區(qū)當然也可以被其他的 Group 消費。
當你使用kafka-topics.sh創(chuàng)建(刪除)了一個topic之后,Kafka背后會執(zhí)行什么邏輯?
在執(zhí)行完腳本之后,Kafka 會在 log.dir 或 log.dirs 參數(shù)所配置的目錄下創(chuàng)建相應的主題分區(qū),默認情況下這個目錄為/tmp/kafka-logs/。
在 ZooKeeper 的/brokers/topics/目錄下創(chuàng)建一個同名的實節(jié)點,該節(jié)點中記錄了該主題的分區(qū)副本分配方案。示例如下:
Copy[zk: localhost:2181/kafka(CONNECTED) 2] get /brokers/topics/topic-create
{"version":1,"partitions":{"2":[1,2],"1":[0,1],"3":[2,1],"0":[2,0]}}
topic的分區(qū)數(shù)可不可以增加?如果可以怎么增加?如果不可以,那又是為什么?
可以增加,使用 kafka-topics 腳本,結合 --alter 參數(shù)來增加某個主題的分區(qū)數(shù),命令如下:
Copybin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic--partitions <新分區(qū)數(shù)>
當分區(qū)數(shù)增加時,就會觸發(fā)訂閱該主題的所有 Group 開啟 Rebalance。
首先,Rebalance 過程對 Consumer Group 消費過程有極大的影響。在 Rebalance 過程中,所有 Consumer 實例都會停止消費,等待 Rebalance 完成。這是 Rebalance 為人詬病的一個方面。
其次,目前 Rebalance 的設計是所有 Consumer 實例共同參與,全部重新分配所有分區(qū)。其實更高效的做法是盡量減少分配方案的變動。
最后,Rebalance 實在是太慢了。
topic的分區(qū)數(shù)可不可以減少?如果可以怎么減少?如果不可以,那又是為什么?
不支持,因為刪除的分區(qū)中的消息不好處理。如果直接存儲到現(xiàn)有分區(qū)的尾部,消息的時間戳就不會遞增,如此對于 Spark、Flink 這類需要消息時間戳(事件時間)的組件將會受到影響;如果分散插入現(xiàn)有的分區(qū),那么在消息量很大的時候,內(nèi)部的數(shù)據(jù)復制會占用很大的資源,而且在復制期間,此主題的可用性又如何得到保障?與此同時,順序性問題、事務性問題,以及分區(qū)和副本的狀態(tài)機切換問題都是不得不面對的。
創(chuàng)建topic時如何選擇合適的分區(qū)數(shù)?
在 Kafka 中,性能與分區(qū)數(shù)有著必然的關系,在設定分區(qū)數(shù)時一般也需要考慮性能的因素。對不同的硬件而言,其對應的性能也會不太一樣。
可以使用Kafka 本身提供的用于生產(chǎn)者性能測試的 kafka-producer- perf-test.sh 和用于消費者性能測試的 kafka-consumer-perf-test.sh來進行測試。
增加合適的分區(qū)數(shù)可以在一定程度上提升整體吞吐量,但超過對應的閾值之后吞吐量不升反降。如果應用對吞吐量有一定程度上的要求,則建議在投入生產(chǎn)環(huán)境之前對同款硬件資源做一個完備的吞吐量相關的測試,以找到合適的分區(qū)數(shù)閾值區(qū)間。
分區(qū)數(shù)的多少還會影響系統(tǒng)的可用性。如果分區(qū)數(shù)非常多,如果集群中的某個 broker 節(jié)點宕機,那么就會有大量的分區(qū)需要同時進行 leader 角色切換,這個切換的過程會耗費一筆可觀的時間,并且在這個時間窗口內(nèi)這些分區(qū)也會變得不可用。
分區(qū)數(shù)越多也會讓 Kafka 的正常啟動和關閉的耗時變得越長,與此同時,主題的分區(qū)數(shù)越多不僅會增加日志清理的耗時,而且在被刪除時也會耗費更多的時間。
文章不錯?點個【在看】吧!??



