在直播、電商等業(yè)務(wù)場景中存在著大量實時數(shù)據(jù),這些數(shù)據(jù)對業(yè)務(wù)發(fā)展至關(guān)重要。而在處理實時數(shù)據(jù)時,我們也遇到了諸多挑戰(zhàn),比如實時數(shù)據(jù)開發(fā)門檻高、運(yùn)維成本高以及資源浪費(fèi)等。
此外,實時數(shù)據(jù)處理比離線數(shù)據(jù)更復(fù)雜,需要應(yīng)對多流 JOIN、維度表變化等技術(shù)難題,并確保系統(tǒng)的穩(wěn)定性和數(shù)據(jù)的準(zhǔn)確性。本文將分享基于 Apache Doris 的實時數(shù)倉架構(gòu)在不同業(yè)務(wù)場景的實踐經(jīng)驗,以及該架構(gòu)帶來的收益。
存儲實時數(shù)倉架構(gòu)背景
首先介紹存儲實時數(shù)倉架構(gòu)的背景。
01 實時數(shù)據(jù)數(shù)倉鏈路

目前實時數(shù)據(jù)主要使用 Flink 作為中轉(zhuǎn)工具,Kafka 作為 Flink 的邏輯表,實現(xiàn)數(shù)據(jù)在不同數(shù)據(jù)分層之間的流轉(zhuǎn)。Kafka 本身沒有邏輯表,無法像 Hive 那樣清晰地進(jìn)行開發(fā)過程。
實時數(shù)據(jù)和離線數(shù)據(jù)的內(nèi)容生產(chǎn)量級會有比較大落差,主要原因在于實時數(shù)據(jù)開發(fā)成本、運(yùn)維成本以及資源成本,尤其是前兩者相較離線開發(fā)更高,因此盡管有一部分實時數(shù)據(jù)的需求,我們經(jīng)常會想辦法將其降級。
02 Flink 數(shù)倉問題與挑戰(zhàn)
-
開發(fā)門檻高:Flink 是有狀態(tài)的一套數(shù)據(jù)流引擎,具有狀態(tài)的增量特性,需要更清晰的底層認(rèn)知,特別是在多流 JOIN 等場景下。增量的狀態(tài),導(dǎo)致無法像 Hive 那樣把全量的數(shù)據(jù)狀態(tài)存到內(nèi)存里,進(jìn)一步進(jìn)行簡單的數(shù)據(jù)操作。
實時數(shù)據(jù)涉及的數(shù)據(jù)存儲量較大,需要使用多種計算引擎,如 OLTP 引擎(MySQL、PostgreSQL)、OLAP 引擎(ClickHouse、Doris)、KV 存儲(Abase、Tier、Redis)等,以適應(yīng)不同的計算需求,這也增加了開發(fā)難度。另外,由于其增量狀態(tài),也讓測試變得困難。
-
開發(fā)運(yùn)維成本高:復(fù)雜的多流 JOIN 操作經(jīng)常需要存儲大量狀態(tài)數(shù)據(jù),這可能會導(dǎo)致穩(wěn)定性問題,尤其是在處理連續(xù)直播等情況下。
在多個業(yè)務(wù)線的平臺中,一些發(fā)展中的業(yè)務(wù)線由于需要不斷進(jìn)行業(yè)務(wù)創(chuàng)新,業(yè)務(wù)口徑隨之變化,而 Flink 作為增量狀態(tài)存儲的系統(tǒng),遇到狀態(tài)不可恢復(fù)的問題是不可避免的。當(dāng)數(shù)據(jù)口徑變更時,直接上線可能會由于狀態(tài)結(jié)構(gòu)改變而無法進(jìn)行數(shù)據(jù)恢復(fù)。
-
資源浪費(fèi):在實時場景中,資源浪費(fèi)是很常見的情況,雖然資源浪費(fèi)不是核心問題,但是目前各個公司都有治理的需求。
對于一個任務(wù)來說,比如在大促活動剛開始的時候,會有大的潮汐洪峰,但過了幾分鐘之后,流量會迅速地遞減退變,為了保證穩(wěn)定性,我們需要保持高資源位,來穩(wěn)定地進(jìn)行 24x7 的運(yùn)行,這就會導(dǎo)致資源浪費(fèi)。
03 目標(biāo)與愿景
我們希望找到一個架構(gòu),能在三個方面做出提升:
- 降低開發(fā)門檻,為終極目標(biāo)。 通過降低門檻,提升效率,希望能夠達(dá)到類似于離線開發(fā)一樣的效率。同時,解決實時領(lǐng)域復(fù)雜的方案設(shè)計問題,比如多表 JOIN 和維度表實時變更。維表實施變更之后,相應(yīng)的值如何迅速進(jìn)行更正,這也是一大業(yè)務(wù)痛點(diǎn)。從而更好地應(yīng)對創(chuàng)新業(yè)務(wù)口徑經(jīng)常變更的情況。
- 提高開發(fā)效率,只需開發(fā) SQL,無需關(guān)心底層運(yùn)維設(shè)置,實現(xiàn)單一職責(zé)化。 由于 Flink 狀態(tài)中間數(shù)據(jù)不可查,如何進(jìn)行更快速更高效的數(shù)據(jù)測試也非常關(guān)鍵,畢竟不是把數(shù)據(jù)開發(fā)好就夠了,還要保障數(shù)據(jù)的準(zhǔn)確性。實時數(shù)據(jù)的錯誤,可能會造成主播、電商或平臺三方的資損。
- 資源成本節(jié)省。 Flink 任務(wù)是常駐任務(wù)會有大量的資源消耗,我們希望通過架構(gòu)優(yōu)化降低資源成本。
存儲實時數(shù)倉架構(gòu)體系
接下來介紹實時數(shù)倉的運(yùn)轉(zhuǎn)方式。
01 存儲實時數(shù)倉架構(gòu)

上圖中簡明地展示了目前運(yùn)行架構(gòu)。
左側(cè)是我們所采用的一套已較為成熟的架構(gòu),主要用于一些成熟業(yè)務(wù)。數(shù)據(jù)存儲方面使用了 Kafka 的邏輯表形式。雖然這種邏輯表缺少字段和約束,并且數(shù)據(jù)的可查性也不是很好,但卻負(fù)責(zé)了一半以上的實時數(shù)據(jù)開發(fā)。
右側(cè)的架構(gòu)則更為簡單,類似于離線 Hive,采用了 Doris 存儲架構(gòu)。通過 OLAP 引擎和秒級調(diào)度,實現(xiàn)了數(shù)據(jù)分層,可以復(fù)用離線開發(fā)的內(nèi)容,使實時數(shù)據(jù)開發(fā)變得更加清晰簡潔。整體架構(gòu)的核心是調(diào)度引擎(秒級調(diào)度)加上 OLAP 引擎。
02 存儲實時數(shù)倉架構(gòu)生態(tài)

這個架構(gòu)看似簡單,但實際上有著復(fù)雜的生態(tài)系統(tǒng)在支撐。
這套架構(gòu)已經(jīng)運(yùn)行多年,但仍需要相應(yīng)的生態(tài)系統(tǒng)配合,比如數(shù)據(jù)質(zhì)量檢查平臺和數(shù)據(jù)質(zhì)量保障措施。另外,數(shù)據(jù)治理也是必不可少的,特別是在處理大量數(shù)據(jù)表、數(shù)據(jù)模型和數(shù)據(jù)任務(wù)時。
應(yīng)用數(shù)據(jù)開發(fā)方面,可以通過 Doris 引擎進(jìn)行數(shù)據(jù)生產(chǎn),但如何對外提供數(shù)據(jù)則需要考慮不同的透出形式。我們通過數(shù)倉表直接透出,也可以通過 ETL 數(shù)據(jù)集成將數(shù)據(jù)導(dǎo)入到 KV 存儲,以滿足一些高 QPS 的場景需求。
此外,從數(shù)倉模型、數(shù)據(jù)開發(fā)、開發(fā)規(guī)范到指標(biāo)體系的建設(shè)也是必要的。
這套架構(gòu)在宏觀上與離線系統(tǒng)有類似之處。
03 一站式研發(fā)平臺

我們提供了一站式的數(shù)據(jù)開發(fā)服務(wù)。首先是注冊數(shù)據(jù)源,然后通過簡單的 SQL 語句即可輕松地進(jìn)行任務(wù)開發(fā)。
開發(fā)完成后,通過一些配置,實現(xiàn)版本管理、上線、Review、數(shù)據(jù)回溯、告警、大盤等一系列操作。
04 調(diào)度引擎挑戰(zhàn)
實時生態(tài)系統(tǒng)非常復(fù)雜,實踐中會遇到一些困難。
實時場景核心有兩套引擎:調(diào)度引擎和 OLAP 引擎。
調(diào)度引擎面臨的挑戰(zhàn)主要有以下三方面:
4-1: T+0 調(diào)度支持
原本我們計劃直接復(fù)用離線調(diào)度引擎,但實際落地時發(fā)現(xiàn)了一些問題。比如,離線調(diào)度通常是 T+1 的,業(yè)務(wù)時間的替換可能是不符合準(zhǔn)實時開發(fā)要求的,準(zhǔn)實時或?qū)崟r開發(fā)需要 T+0 的日期參數(shù),一些重跑和依賴調(diào)度能力等都需要重新構(gòu)建。
T+1 離線調(diào)度對延時的容忍度較高,稍微延遲幾分鐘是可以接受的,并且離線調(diào)度引擎會采用打散任務(wù)的策略來處理這種情況。比如,在 0 點(diǎn)的時候,系統(tǒng)會將一些任務(wù)進(jìn)行打散,部分任務(wù)稍晚執(zhí)行,這在離線環(huán)境中非常常見。
但是,在實時場景下,這種延遲是不允許的。另外,實時場景和離線場景的數(shù)據(jù)量差異很大,實例存儲的數(shù)據(jù)量可能有兩、三個數(shù)量級以上的差距。
比如天級任務(wù)每天只有一個實例,小時級任務(wù)有幾十個,而分鐘級任務(wù)則有上千個實例,相差了兩個數(shù)量級以上了,而秒級任務(wù)相差的數(shù)量級會更大。這種數(shù)據(jù)量的差異對存儲和調(diào)度造成挑戰(zhàn)。
4-2: 實時數(shù)據(jù)容易晚到
因為要處理當(dāng)天或小時內(nèi)的數(shù)據(jù),而數(shù)據(jù)的到達(dá)可能會有延遲。在這里,類似 Flink 中的 watermark 概念變得非常重要,調(diào)度引擎需要支持類似的機(jī)制來容忍數(shù)據(jù)的晚到,并保證數(shù)據(jù)的完整性。
4-3: 調(diào)度間隔
這是一個非常嚴(yán)格的要求,比如 15 秒間隔的任務(wù)可能因數(shù)據(jù)量的關(guān)系需要 16 秒完成,這也是需要解決的難題之一。
針對 T+0 調(diào)度中的三個難題,我們采取了相應(yīng)的解決方案:
- 首先,支持了 T+0 參數(shù)替換功能,提供了高級的運(yùn)算法則,可以進(jìn)行秒級或分鐘級的時間偏移。
- 其次,對調(diào)度引擎進(jìn)行了深度改造,實現(xiàn)了水平擴(kuò)展,支持多個 scheduler,使得調(diào)度引擎可以橫向無限擴(kuò)展。
- 調(diào)度間隔。這是一個非常嚴(yán)格的要求,比如 15 秒間隔的任務(wù)可能因數(shù)據(jù)量的關(guān)系需要 16 秒完成,這也是需要解決的難題之一。
- 另外,針對數(shù)據(jù)容易晚到的問題,我們采取了數(shù)據(jù)補(bǔ)償機(jī)制,即定時進(jìn)行數(shù)據(jù)補(bǔ)償操作來確保數(shù)據(jù)的完整性。例如,對于一個分鐘級時效的任務(wù),每分鐘執(zhí)行一次后,我們會在數(shù)據(jù)可能晚到的情況下進(jìn)行定時補(bǔ)償,以覆蓋完整數(shù)據(jù)。
針對任務(wù)跑的時間長于調(diào)度間隔的問題,我們提出了 MisFire 處理策略,這個策略源自于 Quartz 的一些思想。 針對不同的情況,有多種處理方式。最簡單的是任務(wù)并行,這也是離線開發(fā)的默認(rèn)方式。
另外一種方式是任務(wù)串行,特別適用于實時數(shù)據(jù)場景,避免數(shù)據(jù)亂序?qū)е聰?shù)據(jù)不準(zhǔn)確。
還有一種方式是數(shù)據(jù)跳過,如果出現(xiàn)任務(wù)積壓的情況,系統(tǒng)會自動跳過一些任務(wù)實例,以確保任務(wù)能夠相對健康地運(yùn)行。比如說,當(dāng)任務(wù)積壓了幾百個實例時,下一次運(yùn)行時會將相應(yīng)的實例 Kill 掉,然后繼續(xù)運(yùn)行最新的實例。具體的處理方式需要根據(jù)業(yè)務(wù)場景來確定。
05 Doris 引擎挑戰(zhàn)
前面介紹了調(diào)度引擎面臨的挑戰(zhàn)和解決方案,接下來看一下 OLAP 引擎。OLAP 引擎主要面臨以下三方面挑戰(zhàn):
-
跨機(jī)房容災(zāi)能力:準(zhǔn)實時領(lǐng)域跟服務(wù)端的一些情況有些類似,即在穩(wěn)定性方面有著高的要求。一旦出現(xiàn)主播跟播時在線人數(shù)突然跳零,就會導(dǎo)致主播的一些話術(shù)無法及時組織和應(yīng)變,進(jìn)而產(chǎn)生嚴(yán)重的資損。
因此,我們需要跨機(jī)房容災(zāi)的能力,來應(yīng)對單機(jī)房故障帶來的整體服務(wù)不可用,以及實時數(shù)據(jù)無法對外提供的問題。
-
讀寫隔離能力:這涉及到 Doris 平臺上的操作。我們同時進(jìn)行數(shù)據(jù)的生產(chǎn)和消費(fèi),但在數(shù)據(jù)最初階段,缺乏有效的隔離措施,而這對數(shù)據(jù)的穩(wěn)定性是至關(guān)重要的。
-
跨集群 ETL 能力:我們對不同業(yè)務(wù)場景有著嚴(yán)格的重要等級要求,會將數(shù)據(jù)分散到多個集群中,比如 A 業(yè)務(wù)集群、B 業(yè)務(wù)集群和 C 業(yè)務(wù)集群等。
B 或 C 都是交易類的依賴訂單流的數(shù)據(jù),會有公共數(shù)倉的建設(shè),這些公共數(shù)倉的建設(shè)如果無法實現(xiàn)從 B 集群同步到 C 集群,就會導(dǎo)致不同業(yè)務(wù)線或集群之間的重復(fù)建設(shè),無論從人力還是資源方面都會給我們帶來負(fù)擔(dān)。
特別是對于涉及交易類數(shù)據(jù)的集群,這種同步工作顯得尤為重要。因此,跨集群 ETL 是我們數(shù)倉建設(shè)中非常核心的一個能力。

針對上述問題,一一進(jìn)行解決。
- 首先,關(guān)于多機(jī)房容災(zāi)能力的問題,在三個機(jī)房中每個機(jī)房都有一張表的情況下,每張表有三個副本,其中一個副本分?jǐn)傇谝粋€機(jī)房,從生產(chǎn)端的 MQ 數(shù)據(jù)寫入到 Doris 后,經(jīng)過中間加工端再到消費(fèi)端,最終形成了數(shù)據(jù)服務(wù)的全鏈路高可用性。在單個機(jī)房掛掉時,無論是生產(chǎn)還是消費(fèi),都會有同機(jī)房優(yōu)先和跨機(jī)房降級策略來保障高效性和穩(wěn)定性。
- 讀寫隔離機(jī)制較為簡單,將讀寫流量分流到不同的集群組上。
- 跨集群讀寫采用兩種機(jī)制:一種是借助 Spark 將數(shù)據(jù)源格式讀到 Yarn 集群,再同步到不同集群;另一種是在 Doris 內(nèi)部使用 Doris 原生能力將集群數(shù)據(jù)同步到另一個集群。兩種方式各有優(yōu)勢,Spark on Doris 相對更加穩(wěn)定且不消耗 Doris 計算資源,而第二種方案效率更高,根據(jù)業(yè)務(wù)場景和時效性訴求選擇不同的跨集群讀寫方式。
存儲實時數(shù)倉架構(gòu)實踐
接下來簡要介紹一些實際的應(yīng)用場景。
01 Flink 鏈路

Flink 鏈路如上圖所示,第一條鏈路看起來比較復(fù)雜,需要執(zhí)行多條流的 JOIN 操作。
使用基于存儲的實時數(shù)倉架構(gòu)后,整體結(jié)構(gòu)變得更加簡潔,雖然數(shù)據(jù)來源仍為多條流,但實際上在一張表里進(jìn)行了 JOIN 操作。整體涉及了四五個甚至更多流式 JOIN,流式 JOIN 復(fù)雜度大家都比較了解。不過,實際負(fù)責(zé)的 JOIN 可能僅有三個。開發(fā)成本和后期維護(hù)成本都大幅降低。
02 實時榜單解決方案
另一個是實時榜單解決方案。

針對這種場景,我們進(jìn)行了解決方案的抽象,并在存儲數(shù)倉中實施了一個方案。
最初的方案是基于 Flink 的,出現(xiàn)了一些問題,于是后期遷移到了基于 Doris 的存儲數(shù)倉方案。這套方案的特點(diǎn)是元數(shù)據(jù)定義比較清晰。
元數(shù)據(jù)由實時表從 MQ 中的字段解析而來,解析后對其進(jìn)行了一些元數(shù)據(jù)定義,即對榜單場景業(yè)務(wù)邏輯進(jìn)行抽象,比如會定義周期、原子指標(biāo)以及如何加工這些原子指標(biāo)。
另外,還定義了榜單如何進(jìn)行分區(qū),分區(qū)可以根據(jù)實體類型來確定,例如對商家、視頻或直播進(jìn)行排名。通過簡單的配置,能夠快速創(chuàng)建出相應(yīng)的 Flink 任務(wù)。
在業(yè)務(wù)實際運(yùn)營中,有許多類似的榜單場景,這樣的榜單場景過多導(dǎo)致出現(xiàn)了兩個問題。
首先,榜單場景過多導(dǎo)致任務(wù)量激增,這會給資源治理帶來較多困難。特別是對于實時流處理,需要 24 小時全天候運(yùn)行,任務(wù)量增加會讓資源治理問題變得更加嚴(yán)峻。
其次,報警運(yùn)維也是一個挑戰(zhàn),實時任務(wù)報警頻率高,甚至一個任務(wù)可能隨時都會產(chǎn)生警報。而隨著任務(wù)數(shù)量的增加,報警更加頻繁。此外,由于大量任務(wù)消費(fèi)同一個消息隊列,會放大流量,給 HDFS 帶來額外負(fù)擔(dān)。
另外,電商領(lǐng)域的大型促銷活動常常伴隨著長周期狀態(tài),這種長周期計算會對 Flink 的大狀態(tài)穩(wěn)定性產(chǎn)生影響,同時也使回溯變得困難。為應(yīng)對這些問題,運(yùn)維人員經(jīng)常需要在零點(diǎn)進(jìn)行操作,只有在這個時間點(diǎn)才重新計算,相對來說狀態(tài)比較小,回溯壓力也比較小。

基于上述痛點(diǎn),我們將 Flink 架構(gòu)遷移到了存儲數(shù)倉架構(gòu),使得運(yùn)維工作變得更加高效。相比 Flink,在榜單場景下資源量和報警量都有下降。并且解決了長周期計算的難題。由于狀態(tài)保存在 Doris 的表中,長周期計算變得更加靈活。
存儲實時數(shù)倉架構(gòu)規(guī)劃
最后分享我們在未來要做的一些工作。
首先是對解決方案的封裝。我們已經(jīng)封裝了一個榜單業(yè)務(wù)場景,還有許多其他場景,比如 DMP、標(biāo)簽和中間層數(shù)據(jù)等,這些場景都可以被打包成解決方案。除了模式和方法論的封裝之外,還有存儲架構(gòu)的封裝。
在存儲架構(gòu)方面,不斷演進(jìn)自研的數(shù)據(jù)湖產(chǎn)品,擴(kuò)展更多的存儲架構(gòu)。
另外是智能化運(yùn)維整合,實時數(shù)據(jù)的穩(wěn)定性對開發(fā)和運(yùn)維人員壓力是非常大的,我們希望整合一些規(guī)則和算法,實現(xiàn)自動化處理部分場景,剩下的做推薦化預(yù)案,從而提升 MTTR,提升故障恢復(fù)的時效性并降低成本。
以上就是本次分享的內(nèi)容,謝謝大家。


