日B视频 亚洲,啪啪啪网站一区二区,91色情精品久久,日日噜狠狠色综合久,超碰人妻少妇97在线,999青青视频,亚洲一区二卡,让本一区二区视频,日韩网站推荐

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

Kafka高性能背后的技術(shù)原理

小林coding ? 來源:君哥聊技術(shù) ? 2024-10-23 09:37 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

以下文章來源于君哥聊技術(shù),作者朱晉君

Kafka 是一款性能非常優(yōu)秀的消息隊列,每秒處理的消息體量可以達(dá)到千萬級別。

今天來聊一聊 Kafka 高性能背后的技術(shù)原理,也是面試常問的一個知識考點。

1 批量發(fā)送

Kafka 收發(fā)消息都是批量進(jìn)行處理的。我們看一下 Kafka 生產(chǎn)者發(fā)送消息的代碼:

privateFuturedoSend(ProducerRecordrecord,Callbackcallback){
TopicPartitiontp=null;
try{
//省略前面代碼
CallbackinterceptCallback=newInterceptorCallback<>(callback,this.interceptors,tp);
//把消息追加到之前緩存的這一批消息上
RecordAccumulator.RecordAppendResultresult=accumulator.append(tp,timestamp,serializedKey,
serializedValue,headers,interceptCallback,remainingWaitMs);
//積累到設(shè)置的緩存大小,則發(fā)送出去
if(result.batchIsFull||result.newBatchCreated){
log.trace("Wakingupthesendersincetopic{}partition{}iseitherfullorgettinganewbatch",record.topic(),partition);
this.sender.wakeup();
}
returnresult.future;
//handlingexceptionsandrecordtheerrors;
//forAPIexceptionsreturntheminthefuture,
//forotherexceptionsthrowdirectly
}catch/**省略catch代碼*/
}

從代碼中可以看到,生產(chǎn)者調(diào)用 doSend 方法后,并不會直接把消息發(fā)送出去,而是把消息緩存起來,緩存消息量達(dá)到配置的批量大小后,才會發(fā)送出去。

注意:從上面 accumulator.append 代碼可以看到,一批消息屬于同一個 topic 下面的同一個 partition。

Broker 收到消息后,并不會把批量消息解析成單條消息后落盤,而是作為批量消息進(jìn)行落盤,同時也會把批量消息直接同步給其他副本。

消費者拉取消息,也不會按照單條進(jìn)行拉取,而是按照批量進(jìn)行拉取,拉取到一批消息后,再解析成單條消息進(jìn)行消費。

使用批量收發(fā)消息,減輕了客戶端和 Broker 的交互次數(shù),提升了 Broker 處理能力。

2 消息壓縮

如果消息體比較大,Kafka 消息吞吐量要達(dá)到千萬級別,網(wǎng)卡支持的網(wǎng)絡(luò)傳輸帶寬會是一個瓶頸。Kafka 的解決方案是消息壓縮。發(fā)送消息時,如果增加參數(shù) compression.type,就可以開啟消息壓縮:

publicstaticvoidmain(String[]args){
Propertiesprops=newProperties();
props.put("bootstrap.servers","localhost:9092");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//開啟消息壓縮
props.put("compression.type","gzip");
Producerproducer=newKafkaProducer<>(props);

ProducerRecordrecord=newProducerRecord<>("my_topic","key1","value1");

producer.send(record,newCallback(){
@Override
publicvoidonCompletion(RecordMetadatametadata,Exceptionexception){
if(exception!=null){
logger.error("sendingmessage error:", e);
}else{
logger.info("sendingmessage successful, Offset:", metadata.offset());
}
}
});

producer.close();
}

如果 compression.type 的值設(shè)置為 none,則不開啟壓縮。那消息是在什么時候進(jìn)行壓縮呢?前面提到過,生產(chǎn)者緩存一批消息后才會發(fā)送,在發(fā)送這批消息之前就會進(jìn)行壓縮,代碼如下:

publicRecordAppendResultappend(TopicPartitiontp,
longtimestamp,
byte[]key,
byte[]value,
Header[]headers,
Callbackcallback,
longmaxTimeToBlock)throwsInterruptedException{
//...
try{
//...
buffer=free.allocate(size,maxTimeToBlock);
synchronized(dq){
//...
RecordAppendResultappendResult=tryAppend(timestamp,key,value,headers,callback,dq);
if(appendResult!=null){
//Somebodyelsefoundusabatch,returntheonewewaitedfor!Hopefullythisdoesn'thappenoften...
returnappendResult;
}
//這批消息緩存已滿,這里進(jìn)行壓縮
MemoryRecordsBuilderrecordsBuilder=recordsBuilder(buffer,maxUsableMagic);
ProducerBatchbatch=newProducerBatch(tp,recordsBuilder,time.milliseconds());
FutureRecordMetadatafuture=Utils.notNull(batch.tryAppend(timestamp,key,value,headers,callback,time.milliseconds()));

dq.addLast(batch);
incomplete.add(batch);

//Don'tdeallocatethisbufferinthefinallyblockasit'sbeingusedintherecordbatch
buffer=null;

returnnewRecordAppendResult(future,dq.size()>1||batch.isFull(),true);
}
}finally{
if(buffer!=null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}

上面的 recordsBuilder 方法最終調(diào)用了下面 MemoryRecordsBuilder 的構(gòu)造方法。

publicMemoryRecordsBuilder(ByteBufferOutputStreambufferStream,
bytemagic,
CompressionTypecompressionType,
TimestampTypetimestampType,
longbaseOffset,
longlogAppendTime,
longproducerId,
shortproducerEpoch,
intbaseSequence,
booleanisTransactional,
booleanisControlBatch,
intpartitionLeaderEpoch,
intwriteLimit){
//省略其他代碼
this.appendStream=newDataOutputStream(compressionType.wrapForOutput(this.bufferStream,magic));
}

上面的 wrapForOutput 方法會根據(jù)配置的壓縮算法進(jìn)行壓縮或者選擇不壓縮。目前 Kafka 支持的壓縮算法包括:gzip、snappy、lz4,從 2.1.0 版本開始,Kafka 支持 Zstandard 算法。

在 Broker 端,會解壓 header 做一些校驗,但不會解壓消息體。消息體的解壓是在消費端,消費者拉取到一批消息后,首先會進(jìn)行解壓,然后進(jìn)行消息處理。

因為壓縮和解壓都是耗費 CPU 的操作,所以在開啟消息壓縮時,也要考慮生產(chǎn)者和消費者的 CPU 資源情況。

有了消息批量收集和壓縮,kafka 生產(chǎn)者發(fā)送消息的過程如下圖:

33f1d514-90dd-11ef-a511-92fbcf53809c.png

3 磁盤順序讀寫

順序讀寫省去了尋址的時間,只要一次尋址,就可以連續(xù)讀寫。

在固態(tài)硬盤上,順序讀寫的性能是隨機(jī)讀寫的好幾倍。而在機(jī)械硬盤上,尋址時需要移動磁頭,這個機(jī)械運動會花費很多時間,因此機(jī)械硬盤的順序讀寫性能是隨機(jī)讀寫的幾十倍。

Kafka 的 Broker 在寫消息數(shù)據(jù)時,首先為每個 Partition 創(chuàng)建一個文件,然后把數(shù)據(jù)順序地追加到該文件對應(yīng)的磁盤空間中,如果這個文件寫滿了,就再創(chuàng)建一個新文件繼續(xù)追加寫。這樣大大減少了尋址時間,提高了讀寫性能。

4 PageCache

Linux 系統(tǒng)中,所有文件 IO 操作都要通過 PageCache,PageCache 是磁盤文件在內(nèi)存中建立的緩存。當(dāng)應(yīng)用程序讀寫文件時,并不會直接讀寫磁盤上的文件,而是操作 PageCache。

33fa2b24-90dd-11ef-a511-92fbcf53809c.png

應(yīng)用程序?qū)懳募r,都先會把數(shù)據(jù)寫入 PageCache,然后操作系統(tǒng)定期地將 PageCache 的數(shù)據(jù)寫到磁盤上。如下圖:

34021e6a-90dd-11ef-a511-92fbcf53809c.png

而應(yīng)用程序在讀取文件數(shù)據(jù)時,首先會判斷數(shù)據(jù)是否在 PageCache 中,如果在則直接讀取,如果不在,則讀取磁盤,并且將數(shù)據(jù)緩存到 PageCache。

3409b486-90dd-11ef-a511-92fbcf53809c.png

Kafka 充分利用了 PageCache 的優(yōu)勢,當(dāng)生產(chǎn)者生產(chǎn)消息的速率和消費者消費消息的速率差不多時,Kafka 基本可以不用落盤就能完成消息的傳輸。

5 零拷貝

Kafka Broker 將消息發(fā)送給消費端時,即使命中了 PageCache,也需要將 PageCache 中的數(shù)據(jù)先復(fù)制到應(yīng)用程序的內(nèi)存空間,然后從應(yīng)用程序的內(nèi)存空間復(fù)制到 Socket 緩存區(qū),將數(shù)據(jù)發(fā)送出去。如下圖:

341c0708-90dd-11ef-a511-92fbcf53809c.png

Kafka 采用了零拷貝技術(shù)把數(shù)據(jù)直接從 PageCache 復(fù)制到 Socket 緩沖區(qū)中,這樣數(shù)據(jù)不用復(fù)制到用戶態(tài)的內(nèi)存空間,同時 DMA 控制器直接完成數(shù)據(jù)復(fù)制,不需要 CPU 參與。如下圖:

34240c00-90dd-11ef-a511-92fbcf53809c.png

Java 零拷貝技術(shù)采用 FileChannel.transferTo() 方法,底層調(diào)用了 sendfile 方法。

6 mmap

Kafka 的日志文件分為數(shù)據(jù)文件(.log)和索引文件(.index),Kafka 為了提高索引文件的讀取性能,對索引文件采用了 mmap 內(nèi)存映射,將索引文件映射到進(jìn)程的內(nèi)存空間,這樣讀取索引文件就不需要從磁盤進(jìn)行讀取。如下圖:

3440d9d4-90dd-11ef-a511-92fbcf53809c.png

7 總結(jié)

本文介紹了 Kafka 實現(xiàn)高性能用到的關(guān)鍵技術(shù),這些技術(shù)可以為我們學(xué)習(xí)和工作提供參考。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4977

    瀏覽量

    74420
  • 消息隊列
    +關(guān)注

    關(guān)注

    0

    文章

    34

    瀏覽量

    3308
  • kafka
    +關(guān)注

    關(guān)注

    0

    文章

    55

    瀏覽量

    5595

原文標(biāo)題:面試官:你說說 Kafka 為什么是高性能的?

文章出處:【微信號:小林coding,微信公眾號:小林coding】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關(guān)推薦
    熱點推薦

    泰克30+GHz高性能示波器的關(guān)鍵技術(shù)

    泰克公司最近宣布首款經(jīng)驗證采用 IBM 8HP 硅鍺 (SiGe) BiCMOS 特殊工藝技術(shù)設(shè)計的新型示波器平臺ASIC各項技術(shù)指標(biāo)優(yōu)于規(guī)定要求,實現(xiàn)了新型高性能示波器的設(shè)計目標(biāo),使多通道帶寬達(dá)
    發(fā)表于 07-24 07:47

    基于閃存存儲的Apache Kafka性能提升方法

    作者:Dennis Lattka我是美光科技的首席存儲解決方案工程師Dennis Lattka。這個頭銜的真正含義是,我要致力于確定如何利用閃存存儲改善工作負(fù)載應(yīng)用的性能和結(jié)果。為此,我決定對大數(shù)
    發(fā)表于 07-24 06:58

    基于發(fā)布與訂閱的消息系統(tǒng)Kafka

    Kafka權(quán)威指南》——初識 Kafka
    發(fā)表于 03-05 13:46

    Kafka基礎(chǔ)入門文檔

    kafka系統(tǒng)入門教程(原理、配置、集群搭建、Java應(yīng)用、Kafka-manager)
    發(fā)表于 03-12 07:22

    Kafka集群環(huán)境的搭建

    1、環(huán)境版本版本:kafka2.11,zookeeper3.4注意:這里zookeeper3.4也是基于集群模式部署。2、解壓重命名tar -zxvf
    發(fā)表于 01-05 17:55

    大數(shù)據(jù)開發(fā)最火技術(shù)Kafka背后的“黑科技”

    、低延遲等方面有很突出的表現(xiàn)。這篇文章不同于其他介紹Kafka使用或?qū)崿F(xiàn)的文章,只是談?wù)?b class='flag-5'>Kafka用了什么“黑科技”使他在性能方面有這么突出的表現(xiàn)。消息順序?qū)懭氪疟P磁盤大多數(shù)都還是機(jī)械結(jié)構(gòu)(SSD不在
    的頭像 發(fā)表于 10-22 17:53 ?1391次閱讀
    大數(shù)據(jù)開發(fā)最火<b class='flag-5'>技術(shù)</b><b class='flag-5'>Kafka</b><b class='flag-5'>背后</b>的“黑科技”

    Kafka的概念及Kafka的宕機(jī)

    問題要從一次Kafka的宕機(jī)開始說起。 筆者所在的是一家金融科技公司,但公司內(nèi)部并沒有采用在金融支付領(lǐng)域更為流行的 RabbitMQ ,而是采用了設(shè)計之初就為日志處理而生的 Kafka ,所以我一直
    的頭像 發(fā)表于 08-27 11:21 ?3156次閱讀
    <b class='flag-5'>Kafka</b>的概念及<b class='flag-5'>Kafka</b>的宕機(jī)

    Kafka如何做到那么高的性能

    有人說:他曾在一臺配置較好的機(jī)子上對 Kafka 進(jìn)行性能壓測,壓測結(jié)果是 Kafka 單個節(jié)點的極限處理能力接近每秒 2000萬 條消息,吞吐量達(dá)到每秒 600MB。
    的頭像 發(fā)表于 09-14 17:03 ?1836次閱讀

    Kafka 的簡介

    ,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間的訪問性能 高吞吐率。即使在非常廉價的機(jī)器上也能做到單機(jī)支持每秒100K條消息的傳輸 支持Kafka Server間的消息分區(qū),及分布式消費,同時保證每個
    的頭像 發(fā)表于 07-03 11:10 ?1444次閱讀
    <b class='flag-5'>Kafka</b> 的簡介

    Kafka中學(xué)習(xí)高性能系統(tǒng)如何設(shè)計

    相信各位小伙伴之前或多或少接觸過消息隊列,比較知名的包含 Rocket MQ 和 Kafka,在京東內(nèi)部使用的是自研的消息中間件 JMQ,從 JMQ2 升級到 JMQ4 的也是帶來了性能上的明顯提升,并且 JMQ4 的底層也是參考 Ka
    的頭像 發(fā)表于 07-17 11:25 ?1430次閱讀
    從<b class='flag-5'>Kafka</b>中學(xué)習(xí)<b class='flag-5'>高性能</b>系統(tǒng)如何設(shè)計

    Kafka架構(gòu)技術(shù)Kafka的架構(gòu)和客戶端API設(shè)計

    Kafka 給自己的定位是事件流平臺(event stream platform)。因此在消息隊列中經(jīng)常使用的 "消息"一詞,在 Kafka 中被稱為 "事件"。
    的頭像 發(fā)表于 10-10 15:41 ?3263次閱讀
    <b class='flag-5'>Kafka</b>架構(gòu)<b class='flag-5'>技術(shù)</b>:<b class='flag-5'>Kafka</b>的架構(gòu)和客戶端API設(shè)計

    golang中使用kafka的綜合指南

    kafka是一個比較流行的分布式、可拓展、高性能、可靠的流處理平臺。在處理kafka的數(shù)據(jù)時,這里有確保處理效率和可靠性的多種最佳實踐。本文將介紹這幾種實踐方式,并通過sarama實現(xiàn)他們。
    的頭像 發(fā)表于 11-30 11:18 ?1563次閱讀

    華為云 FlexusX 實例下的 Kafka 集群部署實踐與性能優(yōu)化

    前言 華為云 FlexusX 實例,以創(chuàng)新的柔性算力技術(shù),為 Kafka 集群部署帶來前所未有的性能飛躍。其靈活的 CPU 與內(nèi)存配比,結(jié)合智能調(diào)度與加速技術(shù),讓
    的頭像 發(fā)表于 01-07 17:23 ?936次閱讀
    華為云 FlexusX 實例下的 <b class='flag-5'>Kafka</b> 集群部署實踐與<b class='flag-5'>性能</b>優(yōu)化

    SDX75:5G-A高性能路由器背后的機(jī)遇與困境

    5G-A高性能路由器背后的機(jī)遇與困境
    的頭像 發(fā)表于 05-24 17:03 ?1377次閱讀
    SDX75:5G-A<b class='flag-5'>高性能</b>路由器<b class='flag-5'>背后</b>的機(jī)遇與困境

    Kafka生產(chǎn)環(huán)境應(yīng)用方案

    Apache Kafka作為分布式流處理平臺,在現(xiàn)代大數(shù)據(jù)架構(gòu)中扮演著消息中間件的核心角色。本文將從運維工程師的角度,詳細(xì)介紹Kafka在生產(chǎn)環(huán)境中的部署方案、配置優(yōu)化、監(jiān)控運維等關(guān)鍵技術(shù)。通過實戰(zhàn)案例和代碼示例,幫助運維團(tuán)隊構(gòu)
    的頭像 發(fā)表于 07-09 09:56 ?695次閱讀
    略阳县| 白玉县| 淅川县| 长子县| 榆林市| 井研县| 罗平县| 偃师市| 临澧县| 邯郸县| 郁南县| 临武县| 秀山| 偃师市| 乌鲁木齐市| 松江区| 白朗县| 齐齐哈尔市| 冕宁县| 呼图壁县| 千阳县| 阿克陶县| 桐柏县| 华坪县| 澄城县| 榆中县| 西乌珠穆沁旗| 海南省| 宁城县| 潞城市| 海淀区| 桃园市| 金川县| 宜兰市| 攀枝花市| 青岛市| 扶余县| 广宁县| 普陀区| 钟山县| 庄河市|