批流一體數(shù)據(jù)集成工具ChunJun同步Hive事務(wù)表原理詳解及實戰(zhàn)分享
本期我們帶大家回顧一下無倦同學(xué)的直播分享《Chunjun同步Hive事務(wù)表詳解》

Hive事務(wù)表的結(jié)構(gòu)及原理
Hive是基于Hadoop的一個數(shù)據(jù)倉庫工具,用來進行數(shù)據(jù)提取、轉(zhuǎn)化、加載,這是一種可以存儲、查詢和分析存儲在Hadoop中的大規(guī)模數(shù)據(jù)的機制。Hive數(shù)據(jù)倉庫工具能將結(jié)構(gòu)化的數(shù)據(jù)文件映射為一張數(shù)據(jù)庫表,并提供SQL查詢功能,能將SQL語句轉(zhuǎn)變成MapReduce任務(wù)來執(zhí)行。
在分享Hive事務(wù)表的具體內(nèi)容前,我們先來了解下HIve 事務(wù)表在 HDFS 存儲上的一些限制。
Hive雖然支持了具有ACID語義的事務(wù),但是沒有像在MySQL中使用那樣方便,有很多局限性,具體限制如下:
尚不支持BEGIN,COMMIT和ROLLBACK,所有語言操作都是自動提交的
僅支持ORC文件格式(STORED AS ORC)
?默認(rèn)情況下事務(wù)配置為關(guān)閉,需要配置參數(shù)開啟使用
表必須是分桶表(Bucketed)才可以使用事務(wù)功能
表必須內(nèi)部表,外部表無法創(chuàng)建事務(wù)表
表參數(shù)transactional必須為true
外部表不能成為ACID表,不允許從非ACID會話讀取/寫入ACID表
以下矩陣包括可以使用Hive創(chuàng)建的表的類型、是否支持ACID屬性、所需的存儲格式以及關(guān)鍵的SQL操作。
了解完Hive事務(wù)表的限制,現(xiàn)在我們具體了解下Hive事務(wù)表的內(nèi)容。
01
事務(wù)表文件名字詳解
基礎(chǔ)目錄:
$partition/base_$wid/$bucket
增量目錄:
$partition/delta_$wid_$wid_$stid/$bucket?
參數(shù)目錄:
$partition/delete_delta_$wid_$wid_$stid/$bucket?

02
事務(wù)表文件內(nèi)容詳解
$ orc-tools data bucket_00000
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":0,"currentTransaction":1,"row":{"id":1,"name":"Jerry","age":18}}
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":1,"row":{"id":2,"name":"Tom","age":19}}
{"operation":0,"originalTransaction":1,"bucket":536870912,"rowId":2,"currentTransaction":1,"row":{"id":3,"name":"Kate","age":20}}
1.operation 0 表示插入、1 表示更新,2 表示刪除。由于使用了 split-update,UPDATE 是不會出現(xiàn)的。
2.originalTransaction是該條記錄的原始寫事務(wù) ID:
a、對于 INSERT 操作,該值和 currentTransaction 是一致的;
b、對于 DELETE,則是該條記錄第一次插入時的寫事務(wù) ID。
3.bucket 是一個 32 位整型,由 BucketCodec 編碼,各個二進制位的含義為:
a、1-3 位:編碼版本,當(dāng)前是 001;
b、4 位:保留;
c、5-16 位:分桶 ID,由 0 開始。分桶 ID 是由 CLUSTERED BY 子句所指定的字段、以及分桶的數(shù)量決定的。該值和 bucket_N 中的 N 一致;
d、17-20 位:保留;
e、21-32 位:語句 ID;
舉例來說,整型 536936448 的二進制格式為 00100000000000010000000000000000,即它是按版本 1 的格式編碼的,分桶 ID 為 1。
4.rowId 是一個自增的唯一 ID,在寫事務(wù)和分桶的組合中唯一;
5.currentTransaction 當(dāng)前的寫事務(wù) ID;
6.row 具體數(shù)據(jù)。對于 DELETE 語句,則為 null。
03
更新 Hive 事務(wù)表數(shù)據(jù)
UPDATE employee SET age = 21 WHERE id = 2;
這條語句會先查詢出所有符合條件的記錄,獲取它們的 row_id 信息,然后分別創(chuàng)建 delete 和 delta 目錄:
/user/hive/warehouse/employee/delta_0000001_0000001_0000/bucket_00000
/user/hive/warehouse/employee/delete_delta_0000002_0000002_0000/bucket_00000?(update)
/user/hive/warehouse/employee/delta_0000002_0000002_0000/bucket_00000?(update)
delete_delta_0000002_0000002_0000/bucket_00000
包含了刪除的記錄:
{"operation":2,"originalTransaction":1,"bucket":536870912,"rowId":1,"currentTransaction":2,"row":null}
delta_0000002_0000002_0000/bucket_00000
包含更新后的數(shù)據(jù):
{"operation":0,"originalTransaction":2,"bucket":536870912,"rowId":0,"currentTransaction":2,"row":{"id":2,"name":"Tom","salary":21}}
04
Row_ID 信息怎么查?

05
事務(wù)表壓縮(Compact)
隨著寫操作的積累,表中的 delta 和 delete 文件會越來越多,事務(wù)表的讀取過程中需要合并所有文件,數(shù)量一多勢必會影響效率,此外小文件對 HDFS 這樣的文件系統(tǒng)也不夠友好,因此Hive 引入了壓縮(Compaction)的概念,分為?Minor?和?Major?兩類。
●?Minor
Minor Compaction 會將所有的 delta 文件壓縮為一個文件,delete 也壓縮為一個。壓縮后的結(jié)果文件名中會包含寫事務(wù) ID 范圍,同時省略掉語句 ID。
壓縮過程是在 Hive Metastore 中運行的,會根據(jù)一定閾值自動觸發(fā)。我們也可以使用如下語句人工觸發(fā):
ALTER TABLE dtstack COMPACT 'MINOR'。
●?Major
Major Compaction 會將所有的 delta 文件,delete 文件壓縮到一個 base 文件。壓縮后的結(jié)果文件名中會包含所有寫事務(wù) ID 的最大事務(wù) ID。
壓縮過程是在 Hive Metastore 中運行的,會根據(jù)一定閾值自動觸發(fā)。我們也可以使用如下語句人工觸發(fā):
ALTER TABLE dtstack COMPACT 'MAJOR'。
06
文件內(nèi)容詳解
ALTER TABLE employee COMPACT 'minor';
語句執(zhí)行前:
/user/hive/warehouse/employee/delta_0000001_0000001_0000
/user/hive/warehouse/employee/delta_0000002_0000002_0000? ?(insert 創(chuàng)建, mary的數(shù)據(jù))
/user/hive/warehouse/employee/delete_delta_0000002_0000002_0001 (update)
/user/hive/warehouse/employee/delta_0000002_0000002_0001 (update)
語句執(zhí)行后:
/user/hive/warehouse/employee/delete_delta_0000001_0000002
/user/hive/warehouse/employee/delta_0000001_0000002
07
讀 Hive 事務(wù)表
我們可以看到 ACID 事務(wù)表中會包含三類文件,分別是?base、delta、以及 delete。文件中的每一行數(shù)據(jù)都會以 row__id 作為標(biāo)識并排序。從 ACID 事務(wù)表中讀取數(shù)據(jù)就是對這些文件進行合并,從而得到最新事務(wù)的結(jié)果。這一過程是在 OrcInputFormat 和 OrcRawRecordMerger 類中實現(xiàn)的,本質(zhì)上是一個合并排序的算法。
以下列文件為例,產(chǎn)生這些文件的操作為:
1. 插入三條記錄
2. 進行一次 Major Compaction
3. 然后更新兩條記錄。
1-0-0-1 是對 originalTransaction - bucketId - rowId - currentTra

08
合并算法
對所有數(shù)據(jù)行按照 (originalTransaction, bucketId, rowId) 正序排列,(currentTransaction) 倒序排列,即:
originalTransaction-bucketId-rowId-currentTransaction
(base_1)1-0-0-1
(delete_2)1-0-1-2# 被跳過(DELETE)
(base_1)1-0-1-1 # 被跳過(當(dāng)前記錄的 row_id(1) 和上條數(shù)據(jù)一樣)
(delete_2)1-0-2-2 # 被跳過(DELETE)
(base_1)1-0-2-1 # 被跳過(當(dāng)前記錄的 row_id(2) 和上條數(shù)據(jù)一樣)
(delta_2)2-0-0-2
(delta_2)2-0-1-2
獲取第一條記錄;
?1. ?如果當(dāng)前記錄的 row_id 和上條數(shù)據(jù)一樣,則跳過;
?2. 如果當(dāng)前記錄的操作類型為 DELETE,也跳過;
通過以上兩條規(guī)則,對于 1-0-1-2 和 1-0-1-1,這條記錄會被跳過;
如果沒有跳過,記錄將被輸出給下游;
重復(fù)以上過程。
合并過程是流式的,即 Hive 會將所有文件打開,預(yù)讀第一條記錄,并將 row__id 信息存入到 ReaderKey 類型中。


ChunJun讀寫Hive事務(wù)表實戰(zhàn)
了解完Hive事務(wù)表的基本原理后,我們來為大家分享如何在ChunJun中讀寫Hive事務(wù)表。
01
事務(wù)表數(shù)據(jù)準(zhǔn)備
-- 創(chuàng)建事務(wù)表
create table dtstack(
? ? id int,
? ? name string,
? ? age int
)
stored as orc
TBLPROPERTIES('transactional'='true');
-- 插入 10 條測試數(shù)據(jù)
insert into dtstack (id, name, age)
values (1, "aa", 11), (2, "bb", 12), (3, "cc", 13), (4, "dd", 14), (5, "ee", 15),
? ? ? ?(6, "ff", 16), (7, "gg", 17), (8, "hh", 18), (9, "ii", 19), (10, "jj", 20);
02
配置 ChunJun json 腳本


03
提交任務(wù)(讀寫事務(wù)表)
# 啟動 Session??
/root/wujuan/flink-1.12.7/bin/yarn-session.sh -t $ChunJun_HOME -d
#提交 Yarn Session 任務(wù)
# 讀取事務(wù)表
/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job? /root/wujuan/ChunJun/ChunJun-examples/json/hive3/hive3_transaction_stream.json -confProp {\"yarn.application.id\":\"application_1650792512832_0134\"}
# 寫入事務(wù)表
/root/wujuan/ChunJun/bin/ChunJun-yarn-session.sh -job? /root/wujuan/ChunJun/ChunJun-examples/json/hive3/stream_hive3_transaction.json -confProp {\"yarn.application.id\":\"application_1650792512832_0134\"}
根據(jù)上一行結(jié)果替換 yarn.application.id
ChunJun 讀寫Hive事務(wù)表源碼分析
壓縮器是在 Metastore 境內(nèi)運行的一組后臺程序,用于支持 ACID 系統(tǒng)。它由 Initiator、 Worker、 Cleaner、 AcidHouseKeeperService 和其他一些組成。
01
Compactor
●?Delta File Compaction
在不斷的對表修改中,會創(chuàng)建越來越多的delta文件,需要這些文件需要被壓縮以保證性能。有兩種類型的壓縮,即(minor)小壓縮和(major)大壓縮:
minor 需要一組現(xiàn)有的delta文件,并將它們重寫為每個桶的一個delta文件
major 需要一個或多個delta文件和桶的基礎(chǔ)文件,并將它們改寫成每個桶的新基礎(chǔ)文件。major 需要更久,但是效果更好
所有的壓縮工作都是在后臺進行的,并不妨礙對數(shù)據(jù)的并發(fā)讀寫。在壓縮之后系統(tǒng)會等待,直到所有舊文件的讀都結(jié)束,然后刪除舊文件。
●Initiator
這個模塊負責(zé)發(fā)現(xiàn)哪些表或分區(qū)要進行壓縮。這應(yīng)該在元存儲中使用hive.compactor.initiator.on來啟用。??每個 Compact 任務(wù)處理一個分區(qū)(如果表是未分區(qū)的,則處理整個表)。如果某個分區(qū)的連續(xù)壓實失敗次數(shù)超過 hive.compactor.initiator.failed.compacts.threshold,這個分區(qū)的自動壓縮調(diào)度將停止。
●?Worker
每個Worker處理一個壓縮任務(wù)。?一個壓縮是一個MapReduce作業(yè),其名稱為以下形式。<hostname>-compactor-<db>.<table>.<partition>。?每個Worker將作業(yè)提交給集群(如果定義了hive.compactor.job.queue),并等待作業(yè)完成。hive.compactor.worker.threads決定了每個Metastore中Worker的數(shù)量。?Hive倉庫中的Worker總數(shù)決定了并發(fā)壓縮的最大數(shù)量。
●?Cleaner
這個進程是在壓縮后,在確定不再需要delta文件后,將其刪除。
●?AcidHouseKeeperService
這個進程尋找那些在hive.txn.timeout時間內(nèi)沒有心跳的事務(wù)并中止它們。系統(tǒng)假定發(fā)起交易的客戶端停止心跳后崩潰了,它鎖定的資源應(yīng)該被釋放。
●?SHOW COMPACTIONS
該命令顯示當(dāng)前運行的壓實和最近的壓實歷史(可配置保留期)的信息。這個歷史顯示從HIVE-12353開始可用。
●?Compact 重點配置
02
如何 debug Hive
1. debug hive client
hive --debug
2. debug hive metastore
hive --service metastore --debug:port=8881,mainSuspend=y,childSuspend=n --hiveconf hive.root.logger=DEBUG,console

3. debug hive mr 任務(wù)

03
讀寫過濾和CompactorMR排序的關(guān)鍵代碼


04
Minor&Major 合并源碼(CompactorMR Map 類)


ChunJun 文件系統(tǒng)未來規(guī)劃
最后為大家介紹ChunJun 文件系統(tǒng)未來規(guī)劃:
●?基于 FLIP-27 優(yōu)化文件系統(tǒng)
批流統(tǒng)一實現(xiàn),簡單的線程模型,分片和讀數(shù)據(jù)分離。
●?Hive 的分片優(yōu)化
分片更精細化,粒度更細,充分發(fā)揮并發(fā)能力
●?完善 Exactly Once 語義
加強異常情況健壯性。
●?HDFS 文件系統(tǒng)的斷點續(xù)傳
根據(jù)分區(qū),文件個數(shù),文件行數(shù)等確定端點位置,狀態(tài)存儲在 checkpoint 里面。
●?實時采集文件
實時監(jiān)控目錄下的多個追加文件。
●?文件系統(tǒng)格式的通用性
JSON、CSV、Text、XM、EXCELL 統(tǒng)一抽取公共包。