在直播、電商等業務場景中存在著大量實時數據,這些數據對業務發展至關重要。而在處理實時數據時,我們也遇到了諸多挑戰,比如實時數據開發門檻高、運維成本高以及資源浪費等。
此外,實時數據處理比離線數據更復雜,需要應對多流 JOIN、維度表變化等技術難題,并確保系統的穩定性和數據的準確性。本文將分享基于 Apache Doris 的實時數倉架構在不同業務場景的實踐經驗,以及該架構帶來的收益。
存儲實時數倉架構背景
首先介紹存儲實時數倉架構的背景。
01 實時數據數倉鏈路

目前實時數據主要使用 Flink 作為中轉工具,Kafka 作為 Flink 的邏輯表,實現數據在不同數據分層之間的流轉。Kafka 本身沒有邏輯表,無法像 Hive 那樣清晰地進行開發過程。
實時數據和離線數據的內容生產量級會有比較大落差,主要原因在于實時數據開發成本、運維成本以及資源成本,尤其是前兩者相較離線開發更高,因此盡管有一部分實時數據的需求,我們經常會想辦法將其降級。
02 Flink 數倉問題與挑戰
-
開發門檻高:Flink 是有狀態的一套數據流引擎,具有狀態的增量特性,需要更清晰的底層認知,特別是在多流 JOIN 等場景下。增量的狀態,導致無法像 Hive 那樣把全量的數據狀態存到內存里,進一步進行簡單的數據操作。
實時數據涉及的數據存儲量較大,需要使用多種計算引擎,如 OLTP 引擎(MySQL、PostgreSQL)、OLAP 引擎(ClickHouse、Doris)、KV 存儲(Abase、Tier、Redis)等,以適應不同的計算需求,這也增加了開發難度。另外,由于其增量狀態,也讓測試變得困難。
-
開發運維成本高:復雜的多流 JOIN 操作經常需要存儲大量狀態數據,這可能會導致穩定性問題,尤其是在處理連續直播等情況下。
在多個業務線的平臺中,一些發展中的業務線由于需要不斷進行業務創新,業務口徑隨之變化,而 Flink 作為增量狀態存儲的系統,遇到狀態不可恢復的問題是不可避免的。當數據口徑變更時,直接上線可能會由于狀態結構改變而無法進行數據恢復。
-
資源浪費:在實時場景中,資源浪費是很常見的情況,雖然資源浪費不是核心問題,但是目前各個公司都有治理的需求。
對于一個任務來說,比如在大促活動剛開始的時候,會有大的潮汐洪峰,但過了幾分鐘之后,流量會迅速地遞減退變,為了保證穩定性,我們需要保持高資源位,來穩定地進行 24x7 的運行,這就會導致資源浪費。
03 目標與愿景
我們希望找到一個架構,能在三個方面做出提升:
- 降低開發門檻,為終極目標。 通過降低門檻,提升效率,希望能夠達到類似于離線開發一樣的效率。同時,解決實時領域復雜的方案設計問題,比如多表 JOIN 和維度表實時變更。維表實施變更之后,相應的值如何迅速進行更正,這也是一大業務痛點。從而更好地應對創新業務口徑經常變更的情況。
- 提高開發效率,只需開發 SQL,無需關心底層運維設置,實現單一職責化。 由于 Flink 狀態中間數據不可查,如何進行更快速更高效的數據測試也非常關鍵,畢竟不是把數據開發好就夠了,還要保障數據的準確性。實時數據的錯誤,可能會造成主播、電商或平臺三方的資損。
- 資源成本節省。 Flink 任務是常駐任務會有大量的資源消耗,我們希望通過架構優化降低資源成本。
存儲實時數倉架構體系
接下來介紹實時數倉的運轉方式。
01 存儲實時數倉架構

上圖中簡明地展示了目前運行架構。
左側是我們所采用的一套已較為成熟的架構,主要用于一些成熟業務。數據存儲方面使用了 Kafka 的邏輯表形式。雖然這種邏輯表缺少字段和約束,并且數據的可查性也不是很好,但卻負責了一半以上的實時數據開發。
右側的架構則更為簡單,類似于離線 Hive,采用了 Doris 存儲架構。通過 OLAP 引擎和秒級調度,實現了數據分層,可以復用離線開發的內容,使實時數據開發變得更加清晰簡潔。整體架構的核心是調度引擎(秒級調度)加上 OLAP 引擎。
02 存儲實時數倉架構生態

這個架構看似簡單,但實際上有著復雜的生態系統在支撐。
這套架構已經運行多年,但仍需要相應的生態系統配合,比如數據質量檢查平臺和數據質量保障措施。另外,數據治理也是必不可少的,特別是在處理大量數據表、數據模型和數據任務時。
應用數據開發方面,可以通過 Doris 引擎進行數據生產,但如何對外提供數據則需要考慮不同的透出形式。我們通過數倉表直接透出,也可以通過 ETL 數據集成將數據導入到 KV 存儲,以滿足一些高 QPS 的場景需求。
此外,從數倉模型、數據開發、開發規范到指標體系的建設也是必要的。
這套架構在宏觀上與離線系統有類似之處。
03 一站式研發平臺

我們提供了一站式的數據開發服務。首先是注冊數據源,然后通過簡單的 SQL 語句即可輕松地進行任務開發。
開發完成后,通過一些配置,實現版本管理、上線、Review、數據回溯、告警、大盤等一系列操作。
04 調度引擎挑戰
實時生態系統非常復雜,實踐中會遇到一些困難。
實時場景核心有兩套引擎:調度引擎和 OLAP 引擎。
調度引擎面臨的挑戰主要有以下三方面:
4-1: T+0 調度支持
原本我們計劃直接復用離線調度引擎,但實際落地時發現了一些問題。比如,離線調度通常是 T+1 的,業務時間的替換可能是不符合準實時開發要求的,準實時或實時開發需要 T+0 的日期參數,一些重跑和依賴調度能力等都需要重新構建。
T+1 離線調度對延時的容忍度較高,稍微延遲幾分鐘是可以接受的,并且離線調度引擎會采用打散任務的策略來處理這種情況。比如,在 0 點的時候,系統會將一些任務進行打散,部分任務稍晚執行,這在離線環境中非常常見。
但是,在實時場景下,這種延遲是不允許的。另外,實時場景和離線場景的數據量差異很大,實例存儲的數據量可能有兩、三個數量級以上的差距。
比如天級任務每天只有一個實例,小時級任務有幾十個,而分鐘級任務則有上千個實例,相差了兩個數量級以上了,而秒級任務相差的數量級會更大。這種數據量的差異對存儲和調度造成挑戰。
4-2: 實時數據容易晚到
因為要處理當天或小時內的數據,而數據的到達可能會有延遲。在這里,類似 Flink 中的 watermark 概念變得非常重要,調度引擎需要支持類似的機制來容忍數據的晚到,并保證數據的完整性。
4-3: 調度間隔
這是一個非常嚴格的要求,比如 15 秒間隔的任務可能因數據量的關系需要 16 秒完成,這也是需要解決的難題之一。
針對 T+0 調度中的三個難題,我們采取了相應的解決方案:
- 首先,支持了 T+0 參數替換功能,提供了高級的運算法則,可以進行秒級或分鐘級的時間偏移。
- 其次,對調度引擎進行了深度改造,實現了水平擴展,支持多個 scheduler,使得調度引擎可以橫向無限擴展。
- 調度間隔。這是一個非常嚴格的要求,比如 15 秒間隔的任務可能因數據量的關系需要 16 秒完成,這也是需要解決的難題之一。
- 另外,針對數據容易晚到的問題,我們采取了數據補償機制,即定時進行數據補償操作來確保數據的完整性。例如,對于一個分鐘級時效的任務,每分鐘執行一次后,我們會在數據可能晚到的情況下進行定時補償,以覆蓋完整數據。
針對任務跑的時間長于調度間隔的問題,我們提出了 MisFire 處理策略,這個策略源自于 Quartz 的一些思想。 針對不同的情況,有多種處理方式。最簡單的是任務并行,這也是離線開發的默認方式。
另外一種方式是任務串行,特別適用于實時數據場景,避免數據亂序導致數據不準確。
還有一種方式是數據跳過,如果出現任務積壓的情況,系統會自動跳過一些任務實例,以確保任務能夠相對健康地運行。比如說,當任務積壓了幾百個實例時,下一次運行時會將相應的實例 Kill 掉,然后繼續運行最新的實例。具體的處理方式需要根據業務場景來確定。
05 Doris 引擎挑戰
前面介紹了調度引擎面臨的挑戰和解決方案,接下來看一下 OLAP 引擎。OLAP 引擎主要面臨以下三方面挑戰:
-
跨機房容災能力:準實時領域跟服務端的一些情況有些類似,即在穩定性方面有著高的要求。一旦出現主播跟播時在線人數突然跳零,就會導致主播的一些話術無法及時組織和應變,進而產生嚴重的資損。
因此,我們需要跨機房容災的能力,來應對單機房故障帶來的整體服務不可用,以及實時數據無法對外提供的問題。
-
讀寫隔離能力:這涉及到 Doris 平臺上的操作。我們同時進行數據的生產和消費,但在數據最初階段,缺乏有效的隔離措施,而這對數據的穩定性是至關重要的。
-
跨集群 ETL 能力:我們對不同業務場景有著嚴格的重要等級要求,會將數據分散到多個集群中,比如 A 業務集群、B 業務集群和 C 業務集群等。
B 或 C 都是交易類的依賴訂單流的數據,會有公共數倉的建設,這些公共數倉的建設如果無法實現從 B 集群同步到 C 集群,就會導致不同業務線或集群之間的重復建設,無論從人力還是資源方面都會給我們帶來負擔。
特別是對于涉及交易類數據的集群,這種同步工作顯得尤為重要。因此,跨集群 ETL 是我們數倉建設中非常核心的一個能力。

針對上述問題,一一進行解決。
- 首先,關于多機房容災能力的問題,在三個機房中每個機房都有一張表的情況下,每張表有三個副本,其中一個副本分攤在一個機房,從生產端的 MQ 數據寫入到 Doris 后,經過中間加工端再到消費端,最終形成了數據服務的全鏈路高可用性。在單個機房掛掉時,無論是生產還是消費,都會有同機房優先和跨機房降級策略來保障高效性和穩定性。
- 讀寫隔離機制較為簡單,將讀寫流量分流到不同的集群組上。
- 跨集群讀寫采用兩種機制:一種是借助 Spark 將數據源格式讀到 Yarn 集群,再同步到不同集群;另一種是在 Doris 內部使用 Doris 原生能力將集群數據同步到另一個集群。兩種方式各有優勢,Spark on Doris 相對更加穩定且不消耗 Doris 計算資源,而第二種方案效率更高,根據業務場景和時效性訴求選擇不同的跨集群讀寫方式。
存儲實時數倉架構實踐
接下來簡要介紹一些實際的應用場景。
01 Flink 鏈路

Flink 鏈路如上圖所示,第一條鏈路看起來比較復雜,需要執行多條流的 JOIN 操作。
使用基于存儲的實時數倉架構后,整體結構變得更加簡潔,雖然數據來源仍為多條流,但實際上在一張表里進行了 JOIN 操作。整體涉及了四五個甚至更多流式 JOIN,流式 JOIN 復雜度大家都比較了解。不過,實際負責的 JOIN 可能僅有三個。開發成本和后期維護成本都大幅降低。
02 實時榜單解決方案
另一個是實時榜單解決方案。

針對這種場景,我們進行了解決方案的抽象,并在存儲數倉中實施了一個方案。
最初的方案是基于 Flink 的,出現了一些問題,于是后期遷移到了基于 Doris 的存儲數倉方案。這套方案的特點是元數據定義比較清晰。
元數據由實時表從 MQ 中的字段解析而來,解析后對其進行了一些元數據定義,即對榜單場景業務邏輯進行抽象,比如會定義周期、原子指標以及如何加工這些原子指標。
另外,還定義了榜單如何進行分區,分區可以根據實體類型來確定,例如對商家、視頻或直播進行排名。通過簡單的配置,能夠快速創建出相應的 Flink 任務。
在業務實際運營中,有許多類似的榜單場景,這樣的榜單場景過多導致出現了兩個問題。
首先,榜單場景過多導致任務量激增,這會給資源治理帶來較多困難。特別是對于實時流處理,需要 24 小時全天候運行,任務量增加會讓資源治理問題變得更加嚴峻。
其次,報警運維也是一個挑戰,實時任務報警頻率高,甚至一個任務可能隨時都會產生警報。而隨著任務數量的增加,報警更加頻繁。此外,由于大量任務消費同一個消息隊列,會放大流量,給 HDFS 帶來額外負擔。
另外,電商領域的大型促銷活動常常伴隨著長周期狀態,這種長周期計算會對 Flink 的大狀態穩定性產生影響,同時也使回溯變得困難。為應對這些問題,運維人員經常需要在零點進行操作,只有在這個時間點才重新計算,相對來說狀態比較小,回溯壓力也比較小。

基于上述痛點,我們將 Flink 架構遷移到了存儲數倉架構,使得運維工作變得更加高效。相比 Flink,在榜單場景下資源量和報警量都有下降。并且解決了長周期計算的難題。由于狀態保存在 Doris 的表中,長周期計算變得更加靈活。
存儲實時數倉架構規劃
最后分享我們在未來要做的一些工作。
首先是對解決方案的封裝。我們已經封裝了一個榜單業務場景,還有許多其他場景,比如 DMP、標簽和中間層數據等,這些場景都可以被打包成解決方案。除了模式和方法論的封裝之外,還有存儲架構的封裝。
在存儲架構方面,不斷演進自研的數據湖產品,擴展更多的存儲架構。
另外是智能化運維整合,實時數據的穩定性對開發和運維人員壓力是非常大的,我們希望整合一些規則和算法,實現自動化處理部分場景,剩下的做推薦化預案,從而提升 MTTR,提升故障恢復的時效性并降低成本。
以上就是本次分享的內容,謝謝大家。


