<kbd id="5sdj3"></kbd>
<th id="5sdj3"></th>

  • <dd id="5sdj3"><form id="5sdj3"></form></dd>
    <td id="5sdj3"><form id="5sdj3"><big id="5sdj3"></big></form></td><del id="5sdj3"></del>

  • <dd id="5sdj3"></dd>
    <dfn id="5sdj3"></dfn>
  • <th id="5sdj3"></th>
    <tfoot id="5sdj3"><menuitem id="5sdj3"></menuitem></tfoot>

  • <td id="5sdj3"><form id="5sdj3"><menu id="5sdj3"></menu></form></td>
  • <kbd id="5sdj3"><form id="5sdj3"></form></kbd>

    Kafka原理分析之基礎篇

    共 11212字,需瀏覽 23分鐘

     ·

    2021-05-07 01:38

    點擊上方藍色字體,選擇“標星公眾號”

    優(yōu)質文章,第一時間送達

      作者 |  碼頭工人

    來源 |  urlify.cn/ZRFFZf

    76套java從入門到精通實戰(zhàn)課程分享

    一、Kafka

    Kafka是一個分布式的消息系統(tǒng)。

    二、解決問題

    消息系統(tǒng)通常被應用于異步處理、應用解耦、流量削峰、消息通信等場景。

    異步處理

    image-20210315230129937

    生產(chǎn)者將消息寫入消息隊列中,消費者異步拉取消息隊列消息,從而提升消息處理能力。

    應用解耦

    image-20210315230734421

    Kafka作為消息傳遞的媒介,各子系統(tǒng)只需要做系統(tǒng)責任內的事情。生產(chǎn)者-消費者模式,Kafka就是消息隊列。

    流量削峰

    image-20210315232052783

    正常情況下,上游服務(如報價、營銷等)常年流量較大,面對大流量時能夠較為從容地應對,但下游應用(如:交易、訂單等)由于常年流量較小,面對大流量時會因為準備不足,而導致系統(tǒng)被打垮,引發(fā)雪崩。

    為了應對這一問題,可以利用消息隊列作為臨時數(shù)據(jù)存儲節(jié)點,消費者根據(jù)自身消費能力,通過拉取的方式控制消費速度,達到流量削峰的目的。

    三、特性

    讀寫效率

    Kafka在面對大流量數(shù)據(jù)時,能夠高效地處理消息的存儲與查詢。通過軟件設計避免硬件讀取磁盤的性能瓶頸。

    網(wǎng)絡傳輸

    批量讀取消息,對消息進行批量壓縮,從而提升網(wǎng)絡利用率。

    并發(fā)能力

    Kafka支持消息分區(qū),每個分區(qū)內保證消息的順序性,多分區(qū)之間能夠支持并發(fā)操作,提升Kafka并發(fā)操作。

    持久化能力

    Kafka將消息持久化至硬盤。網(wǎng)絡傳輸不可靠,所以需要將數(shù)據(jù)進行持久化。其中利用了零拷貝、順序讀、順序寫、頁緩存等技術使Kafa具備高吞吐特性。

    可靠性

    支持分區(qū)多副本,Leader副本負責讀寫,F(xiàn)ollow副本只負責同步Leader副本數(shù)據(jù),實現(xiàn)消息冗余備份,提升Kafka容災能力。

    水平擴展

    多Producer、Broker、Consumer,均為分布式,多Consumer可以加入同一Consumer Group,每個分區(qū)只能分配一個Consumer,當Kafka服務端增加分區(qū)數(shù)量進行水平擴展時,可以向Consumer Group添加Consumer,提升消費能力。當Consumer Group中有Consumer出現(xiàn)故障下線時,能通過再平衡(Rebalance)對分區(qū)進行再分配。

    四、基本概念

    消息&批次

    消息

    (1)消息是Kafka的基本單位;

    (2)消息由key和value的byte數(shù)組構成;

    (3)key能夠根據(jù)策略將消息發(fā)送到指定分區(qū)。

    批次

    (1)為了提升效率,消息被分批寫入kafka,同一組消息必須屬于同一主題的同一分區(qū);

    (2)分批發(fā)送能夠降低網(wǎng)絡開銷,提升傳輸速度。

    主題&分區(qū)

    主題(Topic)是用于存儲消息分類關系的邏輯單元,可以看做存儲消息的集合。分區(qū)(partition)是Kafka數(shù)據(jù)存儲的基本單元,可以看做存儲消息的集合的子集。Kafka消息通過主題進行分類,同一Topic的不同分區(qū)(partition)會分配在不用的Broker上,分區(qū)機制提供橫向擴展的基礎,可以通過增加并在其上分配partition來提升Kafka的消息并行處理能力。

    image-20210417181748356

    日志

    Log基本概念

    (1)分區(qū)邏輯上對應一個Log,生產(chǎn)者將消息寫入分區(qū)實際是寫入分區(qū)對應的Log;

    (2)Log可以對應磁盤上的文件夾,其由多個Segment組成,每個Segment對應一個日志文件和索引文件;

    (3)當Segment大小超出限制時,就會創(chuàng)建新的Segment;

    (4)Kafka采用順序I/O,所以只會向最新的Segment追加數(shù)據(jù);

    (5)索引采用稀疏索引,運行時將其映射至內存中,提升索引速度。

    image-20210417191546608

    Log保存與壓縮

    日志保存

    (1)時間限制

    根據(jù)保留時間,當消息在kafka中保存的時間超過指定時間,就會被刪除。

    (2)大小限制

    根據(jù)Topic存儲大小,當Topic所占日志的大小大于一個閾值,則可以開始刪除最舊的消息。Kafka會啟動一個新的線程,定期檢查是否存在可以刪除的消息。

    日志壓縮

    很多場景中,Kafka消息的key與value值會不斷變化,就像數(shù)據(jù)庫中的數(shù)據(jù)會不斷被修改,消費者只會關心最新的key對應的value。如果開啟日志壓縮功能,Kafka會開啟線程,定時對相同key的消息進行合并,并保留最新的value值。

    Broker

    獨立的Kafka服務就是一個broker,broker主要的工作就是接受生產(chǎn)者發(fā)送來的消息,分配offset并保存到磁盤中。Broker除了接受生產(chǎn)者發(fā)送的消息,還處理消費者、其他Broker的請求,根據(jù)請求類型進行相應處理行和響應返回。正常情況下一臺機器對應一個broker。

    副本

    所謂副本就是對消息進程冗余備份,分布式系統(tǒng)在不同機器上相互保存對方數(shù)據(jù)。在Kafka中,每個分區(qū)(partition)可以有多個副本,每個副本中的消息是一樣的(在同一時刻,多臺機器之間的消息并不完全一致)。

    生產(chǎn)者

    生產(chǎn)者(Producer)的主要工作是生成消息。將消息發(fā)布根據(jù)規(guī)則推送到Topic的對應分區(qū)中。例如:(1)對key進行hash;(2)輪詢;(3)自定義。

    消費者

    消費者(Consumer)的主要工作消費消息。從對應分區(qū)中拉取Topic的消息進行消費。消費者需要通過offset記錄自己的消費位置。

    消費者組

    多個消費者(Consumer)構成消費者組(Consumer Group)。消費者組(Consumer Group)訂閱的主題(Topic)的每個分區(qū)只能被分配給,在同一個消費者組中的一個消費者處理。但一個消費者可以消費同一主題(Topic)的多個分區(qū)。

    image-20210418163922518

    消息傳遞模式

    image-20210417120121342

    kafka沒有消息推送,只有消息拉取。但消費者可以通過輪詢拉取的方式實現(xiàn)消息推送功能。

    Kafka架構概圖

    image-20210316000534417

    五、核心特性詳解

    消費者

    (1)消費者從訂閱的主題消費消息的偏移量保存至名字為"__consumer_offsets"的主題中;

    (2)推薦使用Kafka來存儲消費者偏移量,zookeeper不適合高并發(fā)。

    單消費者組

    多個消費同一主題的消費者只要將group_id設置相同,就可以組成消費者組。

    情況一:一個消費者組中,只有一個消費者。

    image-20210418213958556

    情況二:消費者組中有多個消費者。

    image-20210418213256470

    情況三:分區(qū)數(shù)與消費者組數(shù)相同。

    image-20210418213522999

    情況四:消費者組中消費者數(shù)量大于分區(qū)數(shù)。閑置的消費者不會接收消息。

    image-20210418214525504

    多消費者組

    一個主題對應多個消費者組,每個消費者組都能夠消費該主題的所有消息。

    image-20210418215624764

    心跳機制

    Kafka的心跳機制保證Consumer和Broker之間的健康,當Broker Coordinator正常時,Consumer才會發(fā)送心跳。

    再平衡機制

    再平衡是規(guī)定消費者組下消費者與主題的分區(qū)之間發(fā)生變化時如何分配的協(xié)議。

    再平衡觸發(fā)條件

    (1)消費組內消費者發(fā)生變化。(消費組數(shù)量變化,例如消費組宕機退出消費組)

    (2)主題對應分區(qū)數(shù)發(fā)生變化。(kafka只支持增加分區(qū))

    (3)訂閱主題發(fā)生變化。(消費組使用正則表達式訂閱主題,此時恰好新建了對應主題)

    情況一:正常情況,每個分區(qū)只能分配給一個消費者。

    image-20210419000612825

    情況二:消費者機器宕機,消費者退出消費組,觸發(fā)再平衡,重新給消費者組中的消費者分配分區(qū)。

    image-20210419000928251

    情況三:Broker機器宕機,導致分區(qū)3無法提供服務。如果分區(qū)有副本則觸發(fā)再平衡,如果沒有副本則消費者3閑置。

    image-20210419001510688

    情況四:使用正則表達式訂閱主題,當新增主題時,主題對應的分區(qū)會分配給當前消費者,會觸發(fā)再平衡。

    image-20210419003311991

    避免再平衡

    訂閱主題數(shù)和主題分區(qū)數(shù)發(fā)生變化,一般情況下是運維主動觸發(fā),正常情況下不需要避免再平衡。所以我們可以重點關注由消費者組消費者數(shù)量變化而引發(fā)的重平衡。

    在再平衡完成后,每個消費者實例會定時向Coodinator發(fā)送心跳請求。

    消費者判“死”條件
    消費者沒有定期地向Coordinator發(fā)送心跳請求

    (1)session.timeout.ms參數(shù)標識判定消費者死亡的時間閾值。參數(shù)默認值為10秒,即如果10秒內沒有收到Group下的某Consumer實例的心跳請求,則被判定該Consumer實例“死亡”,移出Group。

    (2)heartbeat.interval.ms參數(shù)標識心跳請求發(fā)送的頻率。值越小,Consumer實例發(fā)送心跳請求的頻率就越高。

    規(guī)定時間內沒有消費完poll方法返回的消息

    (1)max.poll.interval.ms參數(shù)標識Consumer實例調用poll方法的最大時間間隔。默認值是5分鐘,表示Comsumer如果在5分鐘內無法消費完poll方法返回的消息,則會被移出Group。

    避免消費者被判“死”
    避免被“條件1”判死

    session.timeout.ms >= 3 * heartbeat.interval.ms。保證Consumer被判死前至少經(jīng)過3輪心跳請求。

    例如:設置 session.timeout.ms = 6s;設置 heartbeat.interval.ms = 2s。

    避免被“條件2”判死

    盡可能將max.poll.interval.ms時間設置大一些??梢詫⑾M者實例中的最長耗時作為依據(jù),再此基礎之上擴大1-1.5倍。為業(yè)務處理留下充足的處理時間,避免由于消息消費時間過長而導致再平衡。

    位移管理

    位移主題

    Kafka中消費者根據(jù)消息的位移順序消費消息,消費者的位移由消費者管理,可以存儲在zookeeper中,可以存儲于Kafka主題__consumer_offse中hjmgbknjk.n,jvgnvmnn/.vt。sconsumer_offsets就是位移主題。

    引入原因

    (1)老版本的位移管理依托Zookeeper,會自動或手動的方式將位移數(shù)據(jù)提交至Zookeeper進行保存。當Consumer重啟后,它就能自動從Zookeeper中讀取位移數(shù)據(jù),從上次截止消費的地方繼續(xù)消費。這種設計是的Kafka Broker不需要保存位移數(shù)據(jù)。

    (2)但Zookeeper不適合高頻寫操作,所以在0.8.2.x版本后新版本的Consumer推出了全新的位移管理機制。將Consumer的位移數(shù)據(jù)作為一條普通的Kafka消息,提交到__consumer_offsets。

    (3)正情況下不需要修改它,也不可以隨意地向該主題寫消息,因為這會導致Kafka無法正常解析。

    消息格式

    (1)Key中包含GroupID、主題名、分區(qū)號;

    (2)Value中包含位移值。

    位移提交

    (1)Consumer需要向Kafka記錄自己的位移數(shù)據(jù),這個匯報過程稱為 提交位移(Committing Offsets)

    (2)Consumer 需要為分配給它的每個分區(qū)提交各自的位移數(shù)據(jù)

    (3)位移提交的由Consumer端負責的,Kafka只負責保管。__consumer_offsets

    (4)位移提交分為自動提交和手動提交

    (5)位移提交分為同步提交和異步提交

    自動提交

    (1)設置enable.auto.commit值為true;

    (2)通過auto.commit.interval.ms,可以設置自動提交的時間間隔,默認值為5秒;

    (3)Kafka會保證在開始調用poll方法時,提交上次poll返回的所有消息的位移信息。poll方法的邏輯是先提交上一批消息的位移,再處理下一批消息,因此它能保證不出現(xiàn)消費丟失的情況;

    (4)自動提交會出現(xiàn)消息重復消費。例:Consumer每 5s提交offset,當提交位移信息后3秒發(fā)生了再平衡,所有Consumer都會從上次提交的offset開始消費,但此時獲取的offset已經(jīng)是3秒前的offset了,所以我們又會重新消費再平衡前3秒的所有數(shù)據(jù)。我們只能夠縮小提交offset的時間窗口,但無法避免重復消費。

    手動提交

    1、同步提交

    (1)使用 KafkaConsumer#commitSync():會提交 KafkaConsumer#poll() 返回的最新offset;

    (2)該方法為同步操作,等待直到 offset 被成功提交才返回;

    1while (true) {
    2            ConsumerRecords<String, String> records =
    3                        consumer.poll(Duration.ofSeconds(1));
    4            process(records); // 處理消息
    5            try {
    6                        consumer.commitSync();
    7            } catch (CommitFailedException e) {
    8                        handle(e); // 處理提交失敗異常
    9            }

    (3)同步提交會使Consumer處于阻塞狀態(tài);

    (4)同步提交在出現(xiàn)異常時會自動重試。

    2、異步提交

    (1)使用異步提交規(guī)避Consumer阻塞;

    (2)異常(GC、網(wǎng)絡抖動)時使用同步提交進行重試。

     1try {
     2     while(true) {
     3          ConsumerRecords<String, String> records = 
     4                                    consumer.poll(Duration.ofSeconds(1));
     5          process(records); // 處理消息
     6          commitAysnc(); // 使用異步提交規(guī)避阻塞
     7     }
     8} catch(Exception e) {
     9            handle(e); // 處理異常
    10} finally {
    11    try {
    12        consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
    13    } finally {
    14       consumer.close();
    15    }

    分區(qū)

    分區(qū)(Partition)是Kafka數(shù)據(jù)的基本單元。同一個主題(topic)數(shù)據(jù)會被分散存儲到多個partion中,這些分區(qū)可以被分配到同一臺機器或不同機器上。優(yōu)點是有利于水平擴展,避免單臺機器在磁盤空間和性能上的限制,同時可以通過復制來增加數(shù)據(jù)冗余,從而提升容災能力。為了做到均勻分布,一般partition的數(shù)量一般是Broker Server數(shù)量的整數(shù)倍。

    副本機制

    副本機制的優(yōu)點

    分區(qū)擁有多個副本,提供冗余數(shù)據(jù),有利于確保kafka的高可用性。

    副本定義

    (1)每個主題可以分為多個分區(qū),每個分區(qū)下配置多個副本;

    (2)副本的本質是一個只能追加寫消息的提交日志;

    (3)同一分區(qū)下的所有副本保存相同的消息序列;

    (4)分區(qū)寫的不同副本分散保存在不同Broker上,應對Broker宕機時分區(qū)數(shù)據(jù)不可用的情況。

    image-20210501184109411

    副本角色

    同分區(qū)多副本,如何保證副本消息一致性?

    最常見的解決方案是基于領導者(Leader-based)的副本機制。

    image-20210501190758088

    1、副本分為兩類

    (1)領導者副本;

    (2)追隨者副本。

    2、在Kafka的副本機制與其他分布式系統(tǒng)不同

    (1)在kafka中,追隨者副本不對外提供服務。所有請求都必須由領導者副本來處理;

    (2)追隨者副本的唯一任務就是從領導者副本異步拉取消息,并寫入自己的提交日志中,從而實現(xiàn)與領導者副本的同步。

    3、領導者副本所處Broker宕機

    (1)Kafka依托Zookeeper提供的監(jiān)控功能能夠感知Broker宕機,并開啟一輪新的選舉;

    (2)老Leader副本重啟后,只能作為追隨者副本加入集群中。

    追隨者副本不對外提供服務的原因

    1、方便實現(xiàn)“Read-your-writes”

    (1)生產(chǎn)者使用API想Kafka寫入消息成功后,能夠立馬使用消費者API查看到剛才生產(chǎn)的信息。

    (2)如果允許追隨者副本對外提供服務,由于追隨者副本是異步的,因此就可能出現(xiàn)追隨者副本沒有從領導者副本拉取到最新消息的情況,就會出現(xiàn)無法立刻讀到最新寫入的消息。

    2、方便實現(xiàn)單調讀(Monotonic Reads)

    (1)什么是單調讀?對于消息消費者而言,消息不會時有時無。

    (2)如果允許追隨者副本對外提供服務,由于追隨者副本是異步的,多個副本從領導者副本拉取的消息不一定同步,就會出現(xiàn)多次請求讀取不同的追隨者副本的情況,數(shù)據(jù)讀取時有時無。如果讀取全由領導者副本來處理,那么Kafka就很實現(xiàn)單調讀一致性。

    同步副本(ISR)與非同步副本(OSR)

    由于追隨者副本需要異步去拉取領導者副本,那么我們就需要確定再怎么樣才算與領導者副本同步。

    Kafka引入了In-Sync Replicas,也就是ISR(同步)副本集合,該副本在Zookeeper上維護。如果存在于ISR中則意味著與領導者副本同步,相反則為非同步副本(OSR)

    同步副本的標準
    image-20210503235617510

    (1)replica.lag.time.max.ms參數(shù)值標識Follower副本能夠慢于Leader副本的最長時間間隔,默認值為10秒。

    (2)若Follower副本落后于Leader副本的最長連續(xù)時間間隔不超過該replica.lag.time.max.ms參數(shù)值設定的大小,則認定該Follower副本與Leader副本是同步的,否則認定為非同步,會將副本從ISR副本集合中移出(Follower副本的拉取速度慢于Leader副本寫入消息的速度,且時間間隔超過設定閾值)。

    (3)ISR是動態(tài)調整集合,非靜態(tài)不變的。當Follower副本追上進度時,就會重新被添加會ISR集合。

    HW

    高水位(HW是High Watermark的縮寫),表示一個特定消息的偏移量,消費者只能拉取到這個offset之前的數(shù)據(jù)。

    LEO

    LEO是Log End Offset的縮寫,表示當前日志文件下一條待寫入消息的offset

    image-20210503235536271

    Leader選舉

    少部分副本宕機

    (1)當Leader副本對應的broker宕機后,就會從Follower副本中選擇一個副本作為Leader;

    (2)當宕機的broker恢復后就會重新從leader中pull數(shù)據(jù)。

    全部副本宕機

    unclean.leader.election.enable 控制是否允許 Unclean 領導者選舉。

    (1)不開啟Unclean。等待ISR中的一個恢復,并選擇其當leader;(等待時間較長,可用性降低)

    (2)開啟Unclean。選擇第一個恢復的副本作為新的leader,無論是否是ISR副本。(開啟會造成數(shù)據(jù)丟失)

    (3)正常情況下建議不開啟,雖然犧牲了高可用性,但維護了數(shù)據(jù)一致性,避免消息丟失。

    為什么不少數(shù)服從多數(shù)?

    選擇Leader副本時如果需要超過半數(shù)的同步副本同意,算法所需的冗余同步副本較多。(一臺機器失敗,就需要3個同步副本)

    物理存儲

    存儲概述

    基本概念

    (1)Kafka使用日志文件保存生產(chǎn)者發(fā)送的消息;

    (2)每條消息都有一個offset值表示它在分區(qū)中的偏移量;

    (3)offset值是邏輯值并不是真實存在的物理地址。其類似于數(shù)據(jù)庫中的主鍵,唯一標識了數(shù)據(jù)庫表中的一條數(shù)據(jù)。而offset在Kafka中的某個分區(qū)唯一標識一條消息。

    (4)Log與分區(qū)一一對應,Log并不是一個文件而是一個文件夾;

    (5)文件夾以topicName_pratiitonID命名,分區(qū)消息全部都存儲在次文件夾下的日志文件中;

    (6)Kafka通過分段的方式將Log分為多個LogSegment,LogSegment是邏輯概念,對應磁盤上的Log目錄下的一個日志文件和索引文件;

    (7)日志文件的命名規(guī)則是[baseOffset].log,baseOffset是日志文件中第一條消息的offset;

    (8)Kafka日志是順序追加的;

    (9)每個日志文件都對應一個索引文件,索引文件使用稀疏索引的方式為文件日志中部分消息建立索引。

    (10)日志文件結構圖

    image-20210503153647283

    (11)Log示例

    image-20210504001658365

    創(chuàng)建了一個tp_demo_01的主題,其中存在6個partition,對應的每個partition下存在一個Topic-partition命名的消息日志。

    (12)LogSegment示例

    image-20210504001721206

    文件類別

    image-20210504001748640

    日志存儲

    索引

    為了提升消息查找的速度,Kafka從0.8版本開始,為每個日志文件添加對應的索引文件。IndexFile與MassageSet File共同組成了LogSegment。

    偏移量索引文件用于記錄消息偏移量與物理地址之間的映射關系。時間戳索引文件則根據(jù)時間戳查找對應的偏移量。

    索引文件中的索引項的格式:每個索引項為8字節(jié),分為兩部分,第一部分是相對offset(4字節(jié)),即相對于baseOffset的偏移量(baseOffset就是基準偏移量,日志文件的命名以基準偏移量來命名)。第二部分是物理地址(4字節(jié)),即其索引消息在日志文件中對應的position位置。通過這兩部分就能實現(xiàn)offset與物理地址之間的映射。

    消息壓縮
    image-20210503214424850
    偏移量索引
    image-20210503214049171

    舉例

    假設需要查找startOffset為1067。需要將offset=1067轉換成對應的物理地址,其過程是怎樣的?

    (1)將絕對offset轉化為相對offset,絕對offset減去baseOffset,得到相對offset=67;

    (2)通過相對offset查找索引文件,得到(58,1632)索引項(通過跳表的方式定位到某一個index文件,再通過二分法找到不大于相對offset的最大索引項);

    (3)從position為1632處開始順序查找,找到絕對offset=1067的消息;

    (4)最終會得到offset為1070的位置消息。(因為消息被壓縮,offset=1067這條消息被壓縮后一起構成offset=1070這條外層消息)。

    六、參考

    《Apache Kafka源碼剖析》

    《極客時間-Kafka核心技術與實戰(zhàn)》

    《拉鉤Java-Kafka》

    七、總結

    本文從場景、特性、基本概念、核心特性等多個維度較為詳細地闡述了Kafka的相關知識。關于kafka穩(wěn)定性與具體源碼實現(xiàn)會在進階篇中闡述。




    粉絲福利:Java從入門到入土學習路線圖

    ??????

    ??長按上方微信二維碼 2 秒


    感謝點贊支持下哈 

    瀏覽 41
    點贊
    評論
    收藏
    分享

    手機掃一掃分享

    分享
    舉報
    評論
    圖片
    表情
    推薦
    點贊
    評論
    收藏
    分享

    手機掃一掃分享

    分享
    舉報

    <kbd id="5sdj3"></kbd>
    <th id="5sdj3"></th>

  • <dd id="5sdj3"><form id="5sdj3"></form></dd>
    <td id="5sdj3"><form id="5sdj3"><big id="5sdj3"></big></form></td><del id="5sdj3"></del>

  • <dd id="5sdj3"></dd>
    <dfn id="5sdj3"></dfn>
  • <th id="5sdj3"></th>
    <tfoot id="5sdj3"><menuitem id="5sdj3"></menuitem></tfoot>

  • <td id="5sdj3"><form id="5sdj3"><menu id="5sdj3"></menu></form></td>
  • <kbd id="5sdj3"><form id="5sdj3"></form></kbd>
    91超碰人妻 | 北条麻妃熟女60分钟 | 高清一区二区三区 | 男人操女人免费网站 | 香蕉国产成人毛片 |