国产精品天干天干,亚洲毛片在线,日韩gay小鲜肉啪啪18禁,女同Gay自慰喷水

歡迎光臨散文網(wǎng) 會員登陸 & 注冊

批流一體數(shù)據(jù)集成工具ChunJun同步Hive事務(wù)表原理詳解及實戰(zhàn)分享

2022-08-04 14:09 作者:袋鼠云  | 我要投稿

本期我們帶大家回顧一下無倦同學(xué)的直播分享《Chunjun同步Hive事務(wù)表詳解》


Hive事務(wù)表的結(jié)構(gòu)及原理


Hive是基于Hadoop的一個數(shù)據(jù)倉庫工具,用來進行數(shù)據(jù)提取、轉(zhuǎn)化、加載,這是一種可以存儲、查詢和分析存儲在Hadoop中的大規(guī)模數(shù)據(jù)的機制。Hive數(shù)據(jù)倉庫工具能將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表,并提供SQL查詢功能,能將SQL語句轉(zhuǎn)變成MapReduce任務(wù)來執(zhí)行。


在分享Hive事務(wù)表的具體內(nèi)容前,我們先來了解下HIve 事務(wù)表在 HDFS 存儲上的一些限制。


Hive雖然支持了具有ACID語義的事務(wù),但是沒有像在MySQL中使用那樣方便,有很多局限性,具體限制如下:

  • 尚不支持BEGIN,COMMIT和ROLLBACK,所有語言操作都是自動提交的

  • 僅支持ORC文件格式(STORED AS ORC)

  • ?默認(rèn)情況下事務(wù)配置為關(guān)閉,需要配置參數(shù)開啟使用

  • 表必須是分桶表(Bucketed)才可以使用事務(wù)功能

  • 表必須內(nèi)部表,外部表無法創(chuàng)建事務(wù)表

  • 表參數(shù)transactional必須為true

  • 外部表不能成為ACID表,不允許從非ACID會話讀取/寫入ACID表

以下矩陣包括可以使用Hive創(chuàng)建的表的類型、是否支持ACID屬性、所需的存儲格式以及關(guān)鍵的SQL操作。


了解完Hive事務(wù)表的限制,現(xiàn)在我們具體了解下Hive事務(wù)表的內(nèi)容。

01

事務(wù)表文件名字詳解

基礎(chǔ)目錄:

$partition/base_$wid/$bucket


增量目錄:

$partition/delta_$wid_$wid_$stid/$bucket?


參數(shù)目錄:

$partition/delete_delta_$wid_$wid_$stid/$bucket?


02

事務(wù)表文件內(nèi)容詳解

$ orc-tools data bucket_00000

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"name":"Jerry","age":18}}

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"name":"Tom","age":19}}

{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":1,"row":{"id":3,"name":"Kate","age":20}}


1.operation 0 表示插入、1 表示更新,2 表示刪除。由于使用了 split-update,UPDATE 是不會出現(xiàn)的。


2.originalTransaction是該條記錄的原始寫事務(wù) ID:

a、對于 INSERT 操作,該值和 currentTransaction 是一致的;

b、對于 DELETE,則是該條記錄第一次插入時的寫事務(wù) ID。


3.bucket 是一個 32 位整型,由 BucketCodec 編碼,各個二進制位的含義為:

a、1-3 位:編碼版本,當(dāng)前是 001;

b、4 位:保留;

c、5-16 位:分桶 ID,由 0 開始。分桶 ID 是由 CLUSTERED BY 子句所指定的字段、以及分桶的數(shù)量決定的。該值和 bucket_N 中的 N 一致;

d、17-20 位:保留;

e、21-32 位:語句 ID;

舉例來說,整型 536936448 的二進制格式為 00100000000000010000000000000000,即它是按版本 1 的格式編碼的,分桶 ID 為 1。


4.rowId 是一個自增的唯一 ID,在寫事務(wù)和分桶的組合中唯一;


5.currentTransaction 當(dāng)前的寫事務(wù) ID;


6.row 具體數(shù)據(jù)。對于 DELETE 語句,則為 null。

03

更新 Hive 事務(wù)表數(shù)據(jù)

UPDATE employee SET age = 21 WHERE id = 2;


這條語句會先查詢出所有符合條件的記錄,獲取它們的 row_id 信息,然后分別創(chuàng)建 delete 和 delta 目錄:


/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000


/user/hive/warehouse/employee/delete_delta_0000002_0000002_0000/bucket_00000?(update)


/user/hive/warehouse/employee/delta_0000002_0000002_0000/bucket_00000?(update)


delete_delta_0000002_0000002_0000/bucket_00000

包含了刪除的記錄:

{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":2,"row":null}


delta_0000002_0000002_0000/bucket_00000

包含更新后的數(shù)據(jù):

{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"id":2,"name":"Tom","salary":21}}


04

Row_ID 信息怎么查?


05

事務(wù)表壓縮(Compact)

隨著寫操作的積累,表中的 delta 和 delete 文件會越來越多,事務(wù)表的讀取過程中需要合并所有文件,數(shù)量一多勢必會影響效率,此外小文件對 HDFS 這樣的文件系統(tǒng)也不夠友好,因此Hive 引入了壓縮(Compaction)的概念,分為?Minor??Major?兩類。


●?Minor

Minor Compaction 會將所有的 delta 文件壓縮為一個文件,delete 也壓縮為一個。壓縮后的結(jié)果文件名中會包含寫事務(wù) ID 范圍,同時省略掉語句 ID。


壓縮過程是在 Hive Metastore 中運行的,會根據(jù)一定閾值自動觸發(fā)。我們也可以使用如下語句人工觸發(fā):

ALTER TABLE dtstack COMPACT 'MINOR'。


●?Major

Major Compaction 會將所有的 delta 文件,delete 文件壓縮到一個 base 文件。壓縮后的結(jié)果文件名中會包含所有寫事務(wù) ID 的最大事務(wù) ID。


壓縮過程是在 Hive Metastore 中運行的,會根據(jù)一定閾值自動觸發(fā)。我們也可以使用如下語句人工觸發(fā):

ALTER TABLE dtstack COMPACT 'MAJOR'。


06

文件內(nèi)容詳解

ALTER TABLE employee COMPACT 'minor';


語句執(zhí)行前:

/user/hive/warehouse/employee/delta_0000001_0000001_0000

/user/hive/warehouse/employee/delta_0000002_0000002_0000? ?(insert 創(chuàng)建, mary的數(shù)據(jù))

/user/hive/warehouse/employee/delete_delta_0000002_0000002_0001 (update)

/user/hive/warehouse/employee/delta_0000002_0000002_0001 (update)


語句執(zhí)行后:

/user/hive/warehouse/employee/delete_delta_0000001_0000002

/user/hive/warehouse/employee/delta_0000001_0000002


07

讀 Hive 事務(wù)表

我們可以看到 ACID 事務(wù)表中會包含三類文件,分別是?base、delta、以及 delete。文件中的每一行數(shù)據(jù)都會以 row__id 作為標(biāo)識并排序。從 ACID 事務(wù)表中讀取數(shù)據(jù)就是對這些文件進行合并,從而得到最新事務(wù)的結(jié)果。這一過程是在 OrcInputFormat 和 OrcRawRecordMerger 類中實現(xiàn)的,本質(zhì)上是一個合并排序的算法。


以下列文件為例,產(chǎn)生這些文件的操作為:

1. 插入三條記錄

2. 進行一次 Major Compaction

3. 然后更新兩條記錄。


1-0-0-1 是對 originalTransaction - bucketId - rowId - currentTra


08

合并算法

對所有數(shù)據(jù)行按照 (originalTransaction, bucketId, rowId) 正序排列,(currentTransaction) 倒序排列,即:

originalTransaction-bucketId-rowId-currentTransaction

(base_1)1-0-0-1

(delete_2)1-0-1-2# 被跳過(DELETE)

(base_1)1-0-1-1 # 被跳過(當(dāng)前記錄的 row_id(1) 和上條數(shù)據(jù)一樣)

(delete_2)1-0-2-2 # 被跳過(DELETE)

(base_1)1-0-2-1 # 被跳過(當(dāng)前記錄的 row_id(2) 和上條數(shù)據(jù)一樣)

(delta_2)2-0-0-2

(delta_2)2-0-1-2


獲取第一條記錄;

?1. ?如果當(dāng)前記錄的 row_id 和上條數(shù)據(jù)一樣,則跳過;

?2. 如果當(dāng)前記錄的操作類型為 DELETE,也跳過;

通過以上兩條規(guī)則,對于 1-0-1-2 和 1-0-1-1,這條記錄會被跳過;

如果沒有跳過,記錄將被輸出給下游;

重復(fù)以上過程。


合并過程是流式的,即 Hive 會將所有文件打開,預(yù)讀第一條記錄,并將 row__id 信息存入到 ReaderKey 類型中。

ChunJun讀寫Hive事務(wù)表實戰(zhàn)


了解完Hive事務(wù)表的基本原理后,我們來為大家分享如何在ChunJun中讀寫Hive事務(wù)表。

01

事務(wù)表數(shù)據(jù)準(zhǔn)備

-- 創(chuàng)建事務(wù)表

create table dtstack(

? ? id int,

? ? name string,

? ? age int

)

stored as orc

TBLPROPERTIES('transactional'='true');


-- 插入 10 條測試數(shù)據(jù)

insert into dtstack (id, name, age)

values (1, "aa", 11), (2, "bb", 12), (3, "cc", 13), (4, "dd", 14), (5, "ee", 15),

? ? ? ?(6, "ff", 16), (7, "gg", 17), (8, "hh", 18), (9, "ii", 19), (10, "jj", 20);


02

配置 ChunJun json 腳本



03

提交任務(wù)(讀寫事務(wù)表)

# 啟動 Session??

/root/wujuan/flink-1.12.7/bin/yarn-session.sh -t $ChunJun_HOME -d


#提交 Yarn Session 任務(wù)

# 讀取事務(wù)表

/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job? /root/wujuan/ChunJun/ChunJun-examples/json/hive3/hive3_transaction_stream.json -confProp {\"yarn.application.id\":\"application_1650792512832_0134\"}


# 寫入事務(wù)表

/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job? /root/wujuan/ChunJun/ChunJun-examples/json/hive3/stream_hive3_transaction.json -confProp {\"yarn.application.id\":\"application_1650792512832_0134\"}


根據(jù)上一行結(jié)果替換 yarn.application.id


ChunJun 讀寫Hive事務(wù)表源碼分析


壓縮器是在 Metastore 境內(nèi)運行的一組后臺程序,用于支持 ACID 系統(tǒng)。它由 Initiator、 Worker、 Cleaner、 AcidHouseKeeperService 和其他一些組成。

01

Compactor

●?Delta File Compaction

在不斷的對表修改中,會創(chuàng)建越來越多的delta文件,需要這些文件需要被壓縮以保證性能。有兩種類型的壓縮,即(minor)小壓縮和(major)大壓縮:

  • minor 需要一組現(xiàn)有的delta文件,并將它們重寫為每個桶的一個delta文件

  • major 需要一個或多個delta文件和桶的基礎(chǔ)文件,并將它們改寫成每個桶的新基礎(chǔ)文件。major 需要更久,但是效果更好

所有的壓縮工作都是在后臺進行的,并不妨礙對數(shù)據(jù)的并發(fā)讀寫。在壓縮之后系統(tǒng)會等待,直到所有舊文件的讀都結(jié)束,然后刪除舊文件。


Initiator

這個模塊負責(zé)發(fā)現(xiàn)哪些表或分區(qū)要進行壓縮。這應(yīng)該在元存儲中使用hive.compactor.initiator.on來啟用。??每個 Compact 任務(wù)處理一個分區(qū)(如果表是未分區(qū)的,則處理整個表)。如果某個分區(qū)的連續(xù)壓實失敗次數(shù)超過 hive.compactor.initiator.failed.compacts.threshold,這個分區(qū)的自動壓縮調(diào)度將停止。


●?Worker

每個Worker處理一個壓縮任務(wù)。?一個壓縮是一個MapReduce作業(yè),其名稱為以下形式。<hostname>-compactor-<db>.<table>.<partition>。?每個Worker將作業(yè)提交給集群(如果定義了hive.compactor.job.queue),并等待作業(yè)完成。hive.compactor.worker.threads決定了每個Metastore中Worker的數(shù)量。?Hive倉庫中的Worker總數(shù)決定了并發(fā)壓縮的最大數(shù)量。


●?Cleaner

這個進程是在壓縮后,在確定不再需要delta文件后,將其刪除。


●?AcidHouseKeeperService

這個進程尋找那些在hive.txn.timeout時間內(nèi)沒有心跳的事務(wù)并中止它們。系統(tǒng)假定發(fā)起交易的客戶端停止心跳后崩潰了,它鎖定的資源應(yīng)該被釋放。


●?SHOW COMPACTIONS

該命令顯示當(dāng)前運行的壓實和最近的壓實歷史(可配置保留期)的信息。這個歷史顯示從HIVE-12353開始可用。


●?Compact 重點配置


02

如何 debug Hive

1. debug hive client

hive --debug


2. debug hive metastore

hive --service metastore --debug:port=8881,mainSuspend=y,childSuspend=n --hiveconf hive.root.logger=DEBUG,console


3. debug hive mr 任務(wù)


03

讀寫過濾和CompactorMR排序的關(guān)鍵代碼


04

Minor&Major 合并源碼(CompactorMR Map 類)

ChunJun 文件系統(tǒng)未來規(guī)劃


最后為大家介紹ChunJun 文件系統(tǒng)未來規(guī)劃:


●?基于 FLIP-27 優(yōu)化文件系統(tǒng)

批流統(tǒng)一實現(xiàn),簡單的線程模型,分片和讀數(shù)據(jù)分離。


●?Hive 的分片優(yōu)化

分片更精細化,粒度更細,充分發(fā)揮并發(fā)能力


●?完善 Exactly Once 語義

加強異常情況健壯性。


●?HDFS 文件系統(tǒng)的斷點續(xù)傳

根據(jù)分區(qū),文件個數(shù),文件行數(shù)等確定端點位置,狀態(tài)存儲在 checkpoint 里面。


●?實時采集文件

實時監(jiān)控目錄下的多個追加文件。


●?文件系統(tǒng)格式的通用性

JSON、CSV、Text、XM、EXCELL 統(tǒng)一抽取公共包。


批流一體數(shù)據(jù)集成工具ChunJun同步Hive事務(wù)表原理詳解及實戰(zhàn)分享的評論 (共 條)

分享到微博請遵守國家法律
博罗县| 偏关县| 麦盖提县| 铁岭县| 东辽县| 河源市| 肇源县| 扬州市| 张北县| 清涧县| 中江县| 梁山县| 萝北县| 阿合奇县| 红桥区| 志丹县| 湘乡市| 白山市| 青岛市| 页游| 古蔺县| 乾安县| 桐城市| 呼图壁县| 鹿泉市| 丹江口市| 芒康县| 池州市| 台北县| 凌海市| 苍溪县| 长白| 德清县| 习水县| 滦南县| 隆林| 神池县| 塔城市| 尚志市| 玉屏| 澳门|