Kafka Producer 如何保證容錯和僅一次語義?
啟用 Flink 的 checkpointing 后,F(xiàn)linkKafkaProducer 可以提供精確一次的語義保證。
除了啟用 Flink 的 checkpointing,你也可以通過將適當(dāng)?shù)?semantic 參數(shù)傳遞給 FlinkKafkaProducer 來選擇三種不同的操作模式:
Semantic.NONE:Flink 不會有任何語義的保證,產(chǎn)生的記錄可能會丟失或重復(fù)。
Semantic.AT_LEAST_ONCE(默認(rèn)設(shè)置):可以保證不會丟失任何記錄(但是記錄可能會重復(fù))
Semantic.EXACTLY_ONCE:使用 Kafka 事務(wù)提供精確一次語義。無論何時,在使用事務(wù)寫入 Kafka 時,都要記得為所有消費 Kafka 消息的應(yīng)用程序設(shè)置所需的 isolation.level(read_committed 或 read_uncommitted - 后者是默認(rèn)值)。
Semantic.EXACTLY_ONCE 模式依賴于事務(wù)提交的能力。事務(wù)提交發(fā)生于觸發(fā) checkpoint 之前,以及從 checkpoint 恢復(fù)之后。如果從 Flink 應(yīng)用程序崩潰到完全重啟的時間超過了 Kafka 的事務(wù)超時時間,那么將會有數(shù)據(jù)丟失(Kafka 會自動丟棄超出超時時間的事務(wù))??紤]到這一點,請根據(jù)預(yù)期的宕機時間來合理地配置事務(wù)超時時間。
默認(rèn)情況下,Kafka broker 將 transaction.max.timeout.ms 設(shè)置為 15 分鐘。此屬性不允許為大于其值的 producer 設(shè)置事務(wù)超時時間。默認(rèn)情況下,F(xiàn)linkKafkaProducer 將 producer config 中的 transaction.timeout.ms 屬性設(shè)置為 1 小時,因此在使用 Semantic.EXACTLY_ONCE 模式之前應(yīng)該增加 transaction.max.timeout.ms 的值。
在 KafkaConsumer 的 read_committed 模式中,任何未結(jié)束(既未中止也未完成)的事務(wù)將阻塞來自給定 Kafka topic 的未結(jié)束事務(wù)之后的所有讀取數(shù)據(jù)。換句話說,在遵循如下一系列事件之后:
用戶啟動了 transaction1 并使用它寫了一些記錄
用戶啟動了 transaction2 并使用它編寫了一些其他記錄
用戶提交了 transaction2
即使 transaction2 中的記錄已提交,在提交或中止 transaction1 之前,消費者也不會看到這些記錄。這有 2 層含義:
首先,在 Flink 應(yīng)用程序的正常工作期間,用戶可以預(yù)料 Kafka 主題中生成的記錄的可見性會延遲,相當(dāng)于已完成 checkpoint 之間的平均時間。
其次,在 Flink 應(yīng)用程序失敗的情況下,此應(yīng)用程序正在寫入的供消費者讀取的主題將被阻塞,直到應(yīng)用程序重新啟動或配置的事務(wù)超時時間過去后,才恢復(fù)正常。此標(biāo)注僅適用于有多個 agent 或者應(yīng)用程序?qū)懭胪?Kafka 主題的情況。
注意:Semantic.EXACTLY_ONCE 模式為每個 FlinkKafkaProducer 實例使用固定大小的 KafkaProducer 池。每個 checkpoint 使用其中一個 producer。如果并發(fā) checkpoint 的數(shù)量超過池的大小,F(xiàn)linkKafkaProducer 將拋出異常,并導(dǎo)致整個應(yīng)用程序失敗。請合理地配置最大池大小和最大并發(fā) checkpoint 數(shù)量。
注意:Semantic.EXACTLY_ONCE 會盡一切可能不留下任何逗留的事務(wù),否則會阻塞其他消費者從這個 Kafka topic 中讀取數(shù)據(jù)。但是,如果 Flink 應(yīng)用程序在第一次 checkpoint 之前就失敗了,那么在重新啟動此類應(yīng)用程序后,系統(tǒng)中不會有先前池大小(pool size)相關(guān)的信息。因此,在第一次 checkpoint 完成前對 Flink 應(yīng)用程序進(jìn)行縮容,且并發(fā)數(shù)縮容倍數(shù)大于安全系數(shù) FlinkKafkaProducer.SAFE_SCALE_DOWN_FACTOR 的值的話,是不安全的。
