日B视频 亚洲,啪啪啪网站一区二区,91色情精品久久,日日噜狠狠色综合久,超碰人妻少妇97在线,999青青视频,亚洲一区二卡,让本一区二区视频,日韩网站推荐

0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

利用KoP如何將Pulsar數(shù)據(jù)快速且無縫接入Apache Doris

電子工程師 ? 來源:OSC開源社區(qū) ? 作者:OSC開源社區(qū) ? 2022-08-08 15:13 ? 次閱讀
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

KoP 架構(gòu)介紹

KoP 是 Kafka on Pulsar 的簡寫,顧名思義就是如何在 Pulsar 上實(shí)現(xiàn)對 Kafka 數(shù)據(jù)的讀寫。KoP 將 Kafka 協(xié)議處理插件引入 Pulsar Broker 來實(shí)現(xiàn) Apache Pulsar 對 Apache Kafka 協(xié)議的支持。將 KoP 協(xié)議處理插件添加到現(xiàn)有 Pulsar 集群后,用戶不用修改代碼就可以將現(xiàn)有的 Kafka 應(yīng)用程序和服務(wù)遷移到 Pulsar。

Apache Pulsar 主要特點(diǎn)如下:

利用企業(yè)級多租戶特性簡化運(yùn)營。

避免數(shù)據(jù)搬遷,簡化操作。

利用 Apache BookKeeper 和分層存儲持久保留事件流。

利用 Pulsar Functions 進(jìn)行無服務(wù)器化事件處理。

KoP 架構(gòu)如下圖,通過圖可以看到 KoP 引入一個新的協(xié)議處理插件,該協(xié)議處理插件利用 Pulsar 的現(xiàn)有組件(例如 Topic 發(fā)現(xiàn)、分布式日志庫-ManagedLedger、cursor 等)來實(shí)現(xiàn) Kafka 傳輸協(xié)議。

Routine Load 訂閱 Pulsar 數(shù)據(jù)思路

Apache Doris Routine Load 支持了將 Kafka 數(shù)據(jù)接入 Apache Doris,并保障了數(shù)據(jù)接入過程中的事務(wù)性操作。Apache Pulsar 定位為一個云原生時代企業(yè)級的消息發(fā)布和訂閱系統(tǒng),已經(jīng)在很多線上服務(wù)使用。那么 Apache Pulsar 用戶如何將數(shù)據(jù)接入 Apache Doris 呢,答案是通過 KoP 實(shí)現(xiàn)。

由于 KoP 直接在 Pulsar 側(cè)提供了對 Kafka 的兼容,那么對于 Apache Doris 來說可以像使用 Kafka 一樣使用 Plusar。整個過程對于 Apache Doris 來說無需任務(wù)改變,就能將 Pulsar 數(shù)據(jù)接入 Apache Doris,并且可以獲得 Routine Load 的事務(wù)性保障。

--------------------------

| Apache Doris |

| --------------- |

| | Routine Load | |

| --------------- |

--------------------------

|Kafka Protocol(librdkafka)

------------v--------------

| --------------- |

| | KoP | |

| --------------- |

| Apache Pulsar |

--------------------------

操作實(shí)踐

Pulsar Standalone 安裝環(huán)境準(zhǔn)備:

JDK 安裝:略

下載 Pulsar 二進(jìn)制包,并解壓:

#下載

wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz

#解壓并進(jìn)入安裝目錄

tar xvfz apache-pulsar-2.10.0-bin.tar.gz

cd apache-pulsar-2.10.0

組件編譯和安裝

1. 下載 KoP 源碼

git clone https://github.com/streamnative/kop.git

cd kop

2. 編譯 KoP 項(xiàng)目

mvn clean install -DskipTests

3. protocols 配置:在解壓后的 apache-pulsar 目錄下創(chuàng)建 protocols文 件夾,并把編譯好的 nar 包復(fù)制到 protocols 文件夾中。

mkdir apache-pulsar-2.10.0/protocols

# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols

cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols

4. 添加后的結(jié)果查看

[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/

pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar

KoP 配置添加

1. 在 standalone.conf 或者 broker.conf 添加如下配置

#kop適配的協(xié)議

messagingProtocols=kafka

#kop 的NAR文件路徑

protocolHandlerDirectory=。/protocols

#是否允許自動創(chuàng)建topic

allowAutoTopicCreationType=partitioned

2. 添加如下服務(wù)監(jiān)聽配置

# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0

kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.

# If it’s not configured, it will be the same with `kafkaListeners` config by default

kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092

brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

brokerDeleteInactiveTopicsEnabled=false

當(dāng)出現(xiàn)如下錯誤:

java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.

添加如下配置,開啟 transactionCoordinatorEnabled

kafkaTransactionCoordinatorEnabled=true

transactionCoordinatorEnabled=true

Pulsar 啟動

#前臺啟動

#bin/pulsar standalone

#后臺啟動

pulsar-daemon start standalone

創(chuàng)建 Doris 數(shù)據(jù)庫和建表

#進(jìn)入Doris

mysql -u root -h 127.0.0.1 -P 9030

# 創(chuàng)建數(shù)據(jù)庫

create database pulsar_doris;

#切換數(shù)據(jù)庫

use pulsar_doris;

#創(chuàng)建clicklog表

CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog

`clickTime` DATETIME NOT NULL COMMENT “點(diǎn)擊時間”,

`type` String NOT NULL COMMENT “點(diǎn)擊類型”,

`id` VARCHAR(100) COMMENT “唯一id”,

`user` VARCHAR(100) COMMENT “用戶名稱”,

`city` VARCHAR(50) COMMENT “所在城市”

DUPLICATE KEY(`clickTime`, `type`)

DISTRIBUTED BY HASH(`type`) BUCKETS 1

PROPERTIES (

“replication_allocation” = “tag.location.default: 1”

);

創(chuàng)建 Routine Load 任務(wù)

CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog

COLUMNS(clickTime,id,type,user)

PROPERTIES

“desired_concurrent_number”=“3”,

“max_batch_interval” = “20”,

“max_batch_rows” = “300000”,

“max_batch_size” = “209715200”,

“strict_mode” = “false”,

“format” = “json”

FROM KAFKA

“kafka_broker_list” = “127.0.0.1:9092”,

“kafka_topic” = “test”,

“property.group.id” = “doris”

);

上述命令中的參數(shù)解釋如下:

pulsar_doris :Routine Load 任務(wù)所在的數(shù)據(jù)庫

load_from_pulsar_test:Routine Load 任務(wù)名稱

clicklog:Routine Load 任務(wù)的目標(biāo)表,也就是配置 Routine Load 任務(wù)將數(shù)據(jù)導(dǎo)入到 Doris 哪個表中。

strict_mode:導(dǎo)入是否為嚴(yán)格模式,這里設(shè)置為 False。

format:導(dǎo)入數(shù)據(jù)的類型,這里配置為 Json。

kafka_broker_list:Kafka Broker 服務(wù)的地址

kafka_broker_list:Kafka Topic 名稱,也就是同步哪個 Topic 上的數(shù)據(jù)。

property.group.id:消費(fèi)組 ID

數(shù)據(jù)導(dǎo)入和測試

1. 數(shù)據(jù)導(dǎo)入 構(gòu)造一個 ClickLog 的數(shù)據(jù)結(jié)構(gòu),并調(diào)用 Kafka 的 Producer 發(fā)送 5000 萬條數(shù)據(jù)到 Pulsar。 ClickLog 數(shù)據(jù)結(jié)構(gòu)如下:

public class ClickLog {

private String id;

private String user;

private String city;

private String clickTime;

private String type;

。.. //省略getter和setter

}

消息構(gòu)造和發(fā)送的核心代碼邏輯如下:

String strDateFormat = “yyyy-MM-dd HHss”;

@Autowired

private Producer producer;

try {

for(int j =0 ; j《50000;j++){

int batchSize = 1000;

for(int i = 0 ; i《batchSize ;i++){

ClickLog clickLog = new ClickLog();

clickLog.setId(UUID.randomUUID().toString());

SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);

clickLog.setClickTime(simpleDateFormat.format(new Date()));

clickLog.setType(“webset”);

clickLog.setUser(“user”+ new Random().nextInt(1000) +i);

producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));

}

}

} catch (Exception e) {

e.printStackTrace();

}

2. ROUTINE LOAD 任務(wù)查看執(zhí)行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;命令,查看導(dǎo)入任務(wù)的狀態(tài)。

mysql》 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test G;

*************************** 1. row ***************************

Id: 87873

Name: load_from_pulsar_test

CreateTime: 2022-05-31 1234

PauseTime: NULL

EndTime: NULL

DbName: default_cluster:pulsar_doris

TableName: clicklog1

State: RUNNING

DataSourceType: KAFKA

CurrentTaskNum: 1

JobProperties: {“partitions”:“*”,“columnToColumnExpr”:“clickTime,id,type,user”,“maxBatchIntervalS”:“20”,“whereExpr”:“*”,“dataFormat”:“json”,“timezone”:“Europe/London”,“send_batch_parallelism”:“1”,“precedingFilter”:“*”,“mergeType”:“APPEND”,“format”:“json”,“json_root”:“”,“maxBatchSizeBytes”:“209715200”,“exec_mem_limit”:“2147483648”,“strict_mode”:“false”,“jsonpaths”:“”,“deleteCondition”:“*”,“desireTaskConcurrentNum”:“3”,“maxErrorNum”:“0”,“strip_outer_array”:“false”,“currentTaskConcurrentNum”:“1”,“execMemLimit”:“2147483648”,“num_as_string”:“false”,“fuzzy_parse”:“false”,“maxBatchRows”:“300000”}

DataSourceProperties: {“topic”:“test”,“currentKafkaPartitions”:“0”,“brokerList”:“127.0.0.1:9092”}

CustomProperties: {“group.id”:“doris”,“kafka_default_offsets”:“OFFSET_END”,“client.id”:“doris.client”}

Statistic: {“receivedBytes”:5739001913,“runningTxns”:[],“errorRows”:0,“committedTaskNum”:168,“l(fā)oadedRows”:50000000,“l(fā)oadRowsRate”:23000,“abortedTaskNum”:1,“errorRowsAfterResumed”:0,“totalRows”:50000000,“unselectedRows”:0,“receivedBytesRate”:2675000,“taskExecuteTimeMs”:2144799}

Progress: {“0”:“51139566”}

Lag: {“0”:0}

ReasonOfStateChanged:

ErrorLogUrls:

OtherMsg:

1 row in set (0.00 sec)

ERROR:

No query specified

從上面結(jié)果可以看到 totalRows 為 50000000,errorRows 為 0。說明數(shù)據(jù)不丟不重的導(dǎo)入 Apache Doris 了。

3. 數(shù)據(jù)統(tǒng)計驗(yàn)證執(zhí)行如下命令統(tǒng)計表中的數(shù)據(jù),發(fā)現(xiàn)統(tǒng)計的結(jié)果也是 50000000,符合預(yù)期。

mysql》 select count(*) from clicklog;

+----------+

| count(*) |

+----------+

| 50000000 |

+----------+

1 row in set (3.73 sec)

mysql》

通過 KoP 我們實(shí)現(xiàn)了將 Apache Pulsar 數(shù)據(jù)無縫接入 Apache Doris ,無需對 Routine Load 任務(wù)進(jìn)行任何修改,并保障了數(shù)據(jù)導(dǎo)入過程中的事務(wù)性。與此同時,Apache Doris 社區(qū)已經(jīng)啟動了 Apache Pulsar 原生導(dǎo)入支持的設(shè)計,相信在不久后就可以直接訂閱 Pulsar 中的消息數(shù)據(jù),并保證數(shù)據(jù)導(dǎo)入過程中的 Exactly-Once 語義。

審核編輯:郭婷

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 服務(wù)器
    +關(guān)注

    關(guān)注

    14

    文章

    10371

    瀏覽量

    91771
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4977

    瀏覽量

    74417

原文標(biāo)題:如何將Pulsar數(shù)據(jù)快速且無縫接入Apache Doris

文章出處:【微信號:OSC開源社區(qū),微信公眾號:OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請注明出處。

收藏 人收藏
加入交流群
微信小助手二維碼

掃碼添加小助手

加入工程師交流群

    評論

    相關(guān)推薦
    熱點(diǎn)推薦

    AD7623:16位、1.33 MSPS PulSAR? ADC的深度解析

    的16位、1.33 MSPS PulSAR? ADC——AD7623,它在醫(yī)療儀器、高速數(shù)據(jù)采集、數(shù)字信號處理等眾多領(lǐng)域都有著廣泛的應(yīng)用。 文件下載: AD7623.pdf 一、AD7623的特性亮點(diǎn)
    的頭像 發(fā)表于 04-03 09:35 ?430次閱讀

    探索AD7699:16位8通道500 kSPS PulSAR ADC的卓越性能與應(yīng)用

    通道、500 kSPS的PulSAR ADC,以其出色的性能和豐富的功能,成為眾多應(yīng)用場景中的理想選擇。本文深入剖析AD7699的特性、工作原理、應(yīng)用以及設(shè)計要點(diǎn),為電子工程師們提供全面的參考。 文件
    的頭像 發(fā)表于 04-02 11:20 ?493次閱讀

    16位6MSPS PulSAR差分ADC AD7625:高性能數(shù)據(jù)轉(zhuǎn)換的理想之選

    16位6MSPS PulSAR差分ADC AD7625:高性能數(shù)據(jù)轉(zhuǎn)換的理想之選 一、引言 在電子設(shè)計領(lǐng)域,模擬到數(shù)字的轉(zhuǎn)換是一項(xiàng)至關(guān)重要的技術(shù)。ADC(模擬 - 數(shù)字轉(zhuǎn)換器)作為連接模擬世界和數(shù)
    的頭像 發(fā)表于 04-01 15:35 ?186次閱讀

    電子工程師必看:AD7903 雙差分 16 位 1 MSPS PulSAR ADC 深度解析

    、AD7903 的特性亮點(diǎn) 1. 高精度與高速度 AD7903 具有 16 位分辨率無失碼,這意味著它能夠提供非常精確的數(shù)字輸出。其吞吐量高達(dá) 1 MSPS,能夠快速模擬信
    的頭像 發(fā)表于 03-30 12:10 ?223次閱讀

    EM儲能網(wǎng)關(guān) ZWS智慧儲能云應(yīng)用(25) — 如何將電站高效接入省站平臺?

    隨著儲能電站的增長,各省市正在逐步出臺儲能監(jiān)管政策,要求業(yè)主將電站接入省站平臺。ZLG智慧儲能云平臺,支持省站平臺一鍵接入,助力儲能終端業(yè)主將電站快速接入省站平臺。簡介隨著新能源儲能電
    的頭像 發(fā)表于 03-11 11:38 ?276次閱讀
    EM儲能網(wǎng)關(guān) ZWS智慧儲能云應(yīng)用(25) — <b class='flag-5'>如何將</b>電站高效<b class='flag-5'>接入</b>省站平臺?

    工業(yè)數(shù)據(jù)中臺支持接入MySQL數(shù)據(jù)庫嗎

    工業(yè)數(shù)據(jù)中臺完全支持接入MySQL數(shù)據(jù)庫 ,通過數(shù)據(jù)同步、集成與治理等技術(shù)手段,能夠充分發(fā)揮MySQL在
    的頭像 發(fā)表于 12-04 11:23 ?516次閱讀
    工業(yè)<b class='flag-5'>數(shù)據(jù)</b>中臺支持<b class='flag-5'>接入</b>MySQL<b class='flag-5'>數(shù)據(jù)</b>庫嗎

    如何將GCC項(xiàng)目導(dǎo)入NuEclipse?

    如何將GCC項(xiàng)目導(dǎo)入NuEclipse?
    發(fā)表于 09-01 07:04

    請問編譯程序時如何將數(shù)據(jù)放入Flash固定地址?

    編譯程序時如何將數(shù)據(jù)放入Flash固定地址?
    發(fā)表于 08-29 06:40

    編譯程序時如何將數(shù)據(jù)放入Flash固定地址?

    編譯程序時如何將數(shù)據(jù)放入Flash固定地址?
    發(fā)表于 08-27 13:16

    請問如何將ISP寫入Nuvoton 8051 MCU系列?

    如何將ISP寫入Nuvoton 8051 MCU系列?
    發(fā)表于 08-18 07:34

    臺灣偉斯掃碼槍通過RS232轉(zhuǎn)Profinet網(wǎng)關(guān)接入西門子1200 PLC的完整指南

    在工業(yè)自動化領(lǐng)域,設(shè)備間的無縫通信是提升生產(chǎn)效率的關(guān)鍵。本文詳細(xì)解析如何將臺灣偉斯掃碼槍通過RS232轉(zhuǎn)Profinet網(wǎng)關(guān)接入西門子1200系列PLC,實(shí)現(xiàn)
    的頭像 發(fā)表于 08-05 14:29 ?1056次閱讀
    臺灣偉斯掃碼槍通過RS232轉(zhuǎn)Profinet網(wǎng)關(guān)<b class='flag-5'>接入</b>西門子1200 PLC的完整指南

    工業(yè)自動化通信方案:臺灣偉斯掃碼槍通過RS232轉(zhuǎn)Profinet網(wǎng)關(guān)接入西門子S7-1200 PLC系統(tǒng)詳解

    在工業(yè)自動化領(lǐng)域,設(shè)備間的通信如同神經(jīng)網(wǎng)絡(luò)的信號傳遞,需要精準(zhǔn)高效的連接方式。本文聚焦如何將臺灣偉斯掃碼槍通過RS232轉(zhuǎn)Profinet網(wǎng)關(guān)接入西門子S7-1200 PLC系統(tǒng),
    的頭像 發(fā)表于 08-04 18:25 ?1169次閱讀
    工業(yè)自動化通信方案:臺灣偉斯掃碼槍通過RS232轉(zhuǎn)Profinet網(wǎng)關(guān)<b class='flag-5'>接入</b>西門子S7-1200 PLC系統(tǒng)詳解

    使用NVIDIA GPU加速Apache Spark中Parquet數(shù)據(jù)掃描

    隨著各行各業(yè)的企業(yè)數(shù)據(jù)規(guī)模不斷增長,Apache Parquet 已經(jīng)成為了一種主流數(shù)據(jù)存儲格式。Apache Parquet 是一種列式存儲格式,專為高效的大規(guī)模
    的頭像 發(fā)表于 07-23 10:52 ?1247次閱讀
    使用NVIDIA GPU加速<b class='flag-5'>Apache</b> Spark中Parquet<b class='flag-5'>數(shù)據(jù)</b>掃描

    Modbus TCP轉(zhuǎn)Profibus網(wǎng)關(guān)如何快速把流量計接入到DCS?

    在工業(yè)自動化領(lǐng)域,設(shè)備間的協(xié)議互通往往如同語言不通的對話者,需要一位“翻譯官”才能實(shí)現(xiàn)高效協(xié)作。本文圍繞Modbus TCP轉(zhuǎn)Profibus網(wǎng)關(guān)的應(yīng)用,解析如何通過這一技術(shù)橋梁,流量計數(shù)據(jù)
    的頭像 發(fā)表于 07-07 16:50 ?688次閱讀
    Modbus TCP轉(zhuǎn)Profibus網(wǎng)關(guān)如何<b class='flag-5'>快速</b>把流量計<b class='flag-5'>接入</b>到DCS?

    如何將K230 image轉(zhuǎn)成jpg?

    如何將K230image轉(zhuǎn)成jpg,我想獲取圖片,傳回我自己的系統(tǒng) 你好,可以使用img.to_jpeg()來對其進(jìn)行壓縮。
    發(fā)表于 06-12 06:41
    桑植县| 诸城市| 耿马| 房产| 莱阳市| 阳东县| 闽侯县| 朝阳区| 吴桥县| 阿克| 湖州市| 阿拉善右旗| 类乌齐县| 北碚区| 东莞市| 乌拉特后旗| 锦州市| 洛南县| 广东省| 习水县| 扶风县| 秭归县| 湘潭市| 玛纳斯县| 仁怀市| 宜丰县| 辽宁省| 灵武市| 四平市| 封开县| 光泽县| 廊坊市| 浦县| 中西区| 和林格尔县| 南召县| 辉县市| 凤翔县| 富蕴县| 和平县| 栾城县|