導讀:360商業化為助力業務團隊更好推進商業化增長,實時數倉共經歷了三種模式的演進,分別是 Storm + Druid + MySQL 模式、Flink + Druid + TIDB 的模式 以及 Flink + Doris 的模式,基于 Apache Doris 的新一代架構的成功落地使得 360商業化團隊完成了實時數倉在 OLAP 引擎上的統一,成功實現廣泛實時場景下的秒級查詢響應。本文將為大家進行詳細介紹演進過程以及新一代實時數倉在廣告業務場景中的具體落地實踐。
360 公司致力于成為互聯網和安全服務提供商,是互聯網免費安全的倡導者,先后推出 360安全衛士、360手機衛士、360安全瀏覽器等安全產品以及 360導航、360搜索等用戶產品。
360商業化依托 360產品龐大的用戶覆蓋能力和超強的用戶粘性,通過專業數據處理和算法實現廣告精準投放,助力數十萬中小企業和 KA 企業實現價值增長。360商業化數據團隊主要是對整個廣告投放鏈路中所產生的數據進行計算處理,為產品運營團隊提供策略調整的分析數據,為算法團隊提供模型訓練的優化數據,為廣告主提供廣告投放的效果數據。
業務場景
在正式介紹 Apache Doris 在 360 商業化的應用之前,我們先對廣告業務中的典型使用場景進行簡要介紹:
-
實時大盤: 實時大盤場景是我們對外呈現數據的關鍵載體,需要從多個維度監控商業化大盤的指標情況,包括流量指標、消費指標、轉化指標和變現指標,因此其對數據的準確性要求非常高(保證數據不丟不重),同時對數據的時效性和穩定性要求也很高,要求實現秒級延遲、分鐘級數據恢復。
-
廣告賬戶的實時消費數據場景: 通過監控賬戶粒度下的多維度指標數據,及時發現賬戶的消費變化,便于產品團隊根據實時消費情況推動運營團隊對賬戶預算進行調整。在該場景下數據一旦出現問題,就可能導致賬戶預算的錯誤調整,從而影響廣告的投放,這對公司和廣告主將造成不可估量的損失,因此在該場景中,同樣對數據準確性有很高的要求。目前在該場景下遇到的困難是如何在數據量比較大、查詢交叉的粒度比較細的前提下,實現秒級別查詢響應。
-
AB 實驗平臺: 在廣告業務中,算法和策略同學會針對不同的場景進行實驗,在該場景下,具有報表維度不固定、多種維度靈活組合、數據分析比較復雜、數據量較大等特點,這就需要可以在百萬級 QPS 下保證數據寫入存儲引擎的性能,因此我們需要針對業務場景進行特定的模型設計和處理上的優化,提高實時數據處理的性能以及數據查詢分析的效率,只有這樣才能滿足算法和策略同學對實驗報表的查詢分析需求。
實時數倉演進
為提升各場景下數據服務的效率,助力相關業務團隊更好推進商業化增長,截至目前實時數倉共經歷了三種模式的演進,分別是 Storm + Druid + MySQL 模式、Flink + Druid + TIDB 的模式 以及 Flink + Doris 的模式,本文將為大家進行詳細介紹實時數倉演進過程以及新一代實時數倉在廣告業務場景中的具體落地。
第一代架構
該階段的實時數倉是基于 Storm + Druid + MySQL 來構建的,Storm 為實時處理引擎,數據經 Storm 處理后,將數據寫入 Druid ,利用 Druid 的預聚合能力對寫入數據進行聚合。

架構痛點:
最初我們試圖依靠該架構解決業務上所有的實時問題,經由 Druid 統一對外提供數據查詢服務,但是在實際的落地過程中我們發現 Druid 是無法滿足某些分頁查詢和 Join 場景的,為解決該問題,我們只能利用 MySQL 定時任務的方式將數據定時從 Druid 寫入 MySQL 中(類似于將 MySQL 作為 Druid 的物化視圖),再通過 Druid + MySQL 的模式對外提供服務。通過這種方式暫時可以滿足某些場景需求,但隨著業務規模的逐步擴大,當面對更大規模數據下的查詢分析需求時,該架構已難以為繼,架構的缺陷也越發明顯:
- 面對數據量的持續增長,數據倉庫壓力空前劇增,已無法滿足實時數據的時效性要求。
- MySQL 的分庫分表維護難度高、投入成本大,且 MySQL 表之間的數據一致性無法保障。
第二代架構

基于第一套架構存在的問題,我們進行了首次升級,這次升級的主要變化是將 Storm 替換成新的實時數據處理引擎 Flink ,Flink 相較于 Storm 不僅在許多語義和功能上進行了擴展,還對數據的一致性做了保證,這些特性使得報表的時效性大幅提升;其次我們使用 TiDB 替換了 MySQL ,利用 TIDB 分布式的特性,一定程度上解決了 MySQL 分庫分表難以維護的問題(TiDB 在一定程度上比 MySQL 能夠承載更大數據量,可以拆分更少表)。在升級完成后,我們按照不同業務場景的需求,將 Flink 處理完的數據分別寫入 Druid 和 TiDB ,由 Druid 和 TIDB 對外提供數據查詢服務。
架構痛點:
雖然該階段的實時數倉架構有效提升了數據的時效性、降低了 MySQL 分庫分表維護的難度,但在一段時間的使用之后又暴露出了新的問題,也迫使我們進行了第二次升級:
- Flink + TIDB 無法實現端到端的一致性,原因是當其面對大規模的數據時,開啟事務將對 TiDB 寫入性能造成很大的影響,該場景下 TiDB 的事務形同虛設,心有余而力不足。
- Druid 不支持標準 SQL ,使用有一定的門檻,相關團隊使用數據時十分不便,這也直接導致了工作效率的下降。
- 維護成本較高,需要維護兩套引擎和兩套查詢邏輯,極大增加了維護和開發成本的投入。
新一代實時數倉架構
第二次升級我們引入 Apache Doris 結合 Flink 構建了新一代實時數倉架構,借鑒離線數倉分層理念對實時數倉進行分層構建,并統一 Apache Doris 作為數倉 OLAP 引擎,由 Doris 統一對外提供服務。

我們的數據主要源自于維表物料數據和業務打點日志。維表物料數據會定時全量同步到 Redis 或者 Aerospike (類似于 Redis 的 KV 存儲)中,通過 Binlog 變更進行增量同步。業務數據由各個團隊將日志收集到 Kafka,內部稱為 ODS 原始數據(ODS 原始數據不做任何處理),我們對 ODS 層的數據進行歸一化處理,包括字段命名、字段類型等,并對一些無效字段進行刪減,并根據業務場景拆分生成 DWD 層數據,DWD 層的數據通過業務邏輯加工以及關聯 Redis 中維表數據或者多流 Join,最后生成面向具體業務的大寬表(即 DWT 層數據),我們將 DWT 層數據經過聚合、經由 Stream Load 寫入 Doris 中,由 Doris 對外提供數據查詢服務。在離線數倉部分,同樣也有一些場景需要每日將加工完的 DWS 數據經由 Broker Load 寫入到 Doris 集群中,并利用 Doris 進行查詢加速,以提升我們對外提供服務的效率。
選擇 Doris 的原因
基于 Apache Doris 高性能、極簡易用、實時統一等諸多特性,助力 360商業化成功構建了新一代實時數倉架構,本次升級不僅提升了實時數據的復用性、實現了 OLAP 引擎的統一,而且滿足了各大業務場景嚴苛的數據查詢分析需求,使得整體實時數據流程架構變得簡單,大大降低了其維護和使用的成本。我們選擇 Doris 作為統一 OLAP 引擎的重要原因大致可歸結為以下幾點:
- 物化視圖: Doris 的物化視圖與廣告業務場景的特點契合度非常高,比如廣告業務中大部分報表的查詢維度相對比較固定,利用物化視圖的特性可以提升查詢的效率,同時 Doris 可以保證物化視圖和底層數據的一致性,該特性可幫助我們降低維護成本的投入。
- 數據一致性: Doris 提供了 Stream Load Label 機制,我們可通過事務的方式與 Flink 二階段提交進行結合,以保證冪等寫入數據,另外我們通過自研 Flink Sink Doris 組件,實現了數據的端到端的一致性,保證了數據的準確性。
- SQL 協議兼容:Doris 兼容 MySQL 協議,支持標準 SQL,這無論是對于開發同學,還是數據分析、產品同學,都可以實現無成本銜接,相關同學直接使用 SQL 就可以進行查詢,使用門檻很低,為公司節省了大量培訓和使用成本,同時也提升了工作效率。
- 優秀的查詢性能: Apache Doris 已全面實現向量化查詢引擎,使 Doris 的 OLAP 性能表現更加強悍,在多種查詢場景下都有非常明顯的性能提升,可極大優化了報表的詢速度。同時依托列式存儲引擎、現代的 MPP 架構、預聚合物化視圖、數據索引的實現,在低延遲和高吞吐查詢上,都達到了極速性能
- 運維難度低: Doris 對于集群和和數據副本管理上做了很多自動化工作,這些投入使得集群運維起來非常的簡單,近乎于實現零門檻運維。
在 AB 實驗平臺的具體落地
Apache Doris 目前廣泛應用于 360商業化內部的多個業務場景。比如在實時大盤場景中,我們利用 Doris 的 Aggregate 模型對請求、曝光、點擊、轉化等多個實時流進行事實表的 Join ;依靠 Doris 事務特性保證數據的一致性;通過多個物化視圖,提前根據報表維度聚合數據、提升查詢速度,由于物化視圖和 Base 表的一致關系由 Doris 來維護保證,這也極大的降低了使用復雜度。比如在賬戶實時消費場景中,我們主要借助 Doris 優秀的查詢優化器,通過 Join 來計算同環比......
接下來僅以 AB 實驗平臺這一典型業務場景為例,詳盡的為大家介紹 Doris 在該場景下的落地實踐,在上述所舉場景中的應用將不再贅述。
AB 實驗在廣告場景中的應用非常廣泛,是衡量設計、算法、模型、策略對產品指標提升的重要工具,也是精細化運營的重要手段,我們可以通過 AB實驗平臺對迭代方案進行測試,并結合數據進行分析和驗證,從而優化產品方案、提升廣告效果。
在文章開頭也有簡單介紹,AB 實驗場景所承載的業務相對比較復雜,這里再詳細說明一下:
- 各維度之間組合靈活度很高,例如需要對從 DSP 到流量類型再到廣告位置等十幾個維度進行分析,完成從請求、競價、曝光、點擊、轉化等幾十個指標的完整流量漏斗。
- 數據量巨大,日均流量可以達到百億級別,峰值可達百萬OPS(Operations Per Second),一條流量可能包含幾十個實驗標簽 ID。
基于以上特點,我們在 AB實驗場景中一方面需要保證數據算的快、數據延遲低、用戶查詢數據快,另一方面也要保證數據的準確性,保障數據不丟不重。

數據落地
當面對一條流量可能包含幾十個實驗標簽 ID 的情況時,從分析角度出發,只需要選中一個實驗標簽和一個對照實驗標簽進行分析;而如果通過like的方式在幾十個實驗標簽中去匹配選中的實驗標簽,實現效率就會非常低。
最初我們期望從數據入口處將實驗標簽打散,將一條包含 20 個實驗標簽的流量拆分為 20 條只包含一個實驗標簽的流量,再導入 Doris 的聚合模型中進行數據分析。而在這個過程中我們遇到一個明顯的問題,當數據被打散之后會膨脹數十倍,百億級數據將膨脹為千億級數據,即便 Doris 聚合模型會對數據再次壓縮,但這個過程會對集群造成極大的壓力。因此我們放棄該實現方式,開始嘗試將壓力分攤一部分到計算引擎,這里需要注意的是,如果將數據直接在 Flink 中打散,當 Job 全局 Hash 的窗口來 Merge 數據時,膨脹數十倍的數據也會帶來幾十倍的網絡和 CPU 消耗。
接著我們開始第三次嘗試,這次嘗試我們考慮在 Flink 端將數據拆分后立刻進行 Local Merge,在同一個算子的內存中開一個窗口,先將拆分的數據進行一層聚合,再通過 Job 全局 Hash 窗口進行第二層聚合,因為 Chain 在一起的兩個算子在同一個線程內,因此可以大幅降低膨脹后數據在不同算子之間傳輸的網絡消耗。該方式通過兩層窗口的聚合,再結合 Doris 的聚合模型,有效降低了數據的膨脹程度,其次我們也同步推動實業務方定期清理已下線的實驗,減少計算資源的浪費。

考慮到 AB實驗分析場景的特點,我們將實驗 ID 作為 Doris 的第一個排序字段,利用前綴索引可以很快定位到目標查詢的數據。另外根據常用的維度組合建立物化視圖,進一步縮小查詢的數據量,Doris 物化視圖基本能夠覆蓋 80% 的查詢場景,我們會定期分析查詢 SQL 來調整物化視圖。最終我們通過模型的設計、前綴索引的應用,結合物化視圖能力,使大部分實驗查詢結果能夠實現秒級返回。
數據一致性保障
數據的準確性是 AB實驗平臺的基礎,當算法團隊嘔心瀝血優化的模型使廣告效果提升了幾個百分點,卻因數據丟失看不出實驗效果,這樣的結果確實無法令人接受,同時這也是我們內部不允許出現的問題。那么我們該如何避免數據丟失、保障數據的一致性呢?
自研 Flink Sink Doris 組件
我們內部已有一套 Flink Stream API 腳手架,因此借助 Doris 的冪等寫特性和 Flink 的二階段提交特性,自研了 Sink To Doris 組件,保證了數據端到端的一致性,并在此基礎上新增了異常情況的數據保障機制。

在 Doris 0.14 版本中(初期使用的版本),我們一般通過“同一個 Label ID 只會被寫入一次”的機制來保證數據的一致性;在 Doris 1.0 版本之后,通過 “Doris 的事務結合 Flink 二階段提交”的機制來保證數據的一致性。這里詳細分享使用 Doris 1.0 版本之后,通過 “Doris 的事務結合 Flink 二階段提交”機制保證數據的一致性的原理與實現。
在 Flink 中做到數據端到端的一致性有兩種方式,一種為通過至少一次結合冪等寫,一種為通過恰好一次的二階段事務。
如右圖所示,我們首先在數據寫入階段先將數據寫入本地文件,一階段過程中將數據預提交到 Doris,并保存事務 ID 到狀態,如果 Checkpoint 失敗,則手動放棄 Doris 事務;如果 Checkpoint 成功,則在二階段進行事務提交。對于二階段提交重試多次仍然失敗的數據,將提供數據以及事務 ID 保存到 HDFS 的選項,通過 Broker Load 進行手動恢復。為了避免單次提交數據量過大,而導致 Stream Load 時長超過 Flink Checkpoint 時間的情況,我們提供了將單次 Checkpoint 拆分為多個事務的選項。最終成功通過二階段提交的機制實現了對數據一致性的保障。
應用展示
下圖為 Sink To Doris 的具體應用,整體工具屏蔽了 API 調用以及拓撲流的組裝,只需要通過簡單的配置即可完成 Stream Load 到 Doris 的數據寫入 。

集群監控
在集群監控層面,我們采用了社區提供的監控模板,從集群指標監控、主機指標監控、數據處理監控三個方面出發來搭建 Doris 監控體系。其中集群指標監控和主機指標監控主要根據社區監控說明文檔進行監控,以便我們查看集群整體運行的情況。除社區提供的模板之外,我們還新增了有關 Stream Load 的監控指標,比如對當前 Stream Load 數量以及寫入數據量的監控,如下圖所示:

除此之外,我們對數據寫入 Doris 的時長以及寫入的速度也比較關注,根據自身業務的需求,我們對任務寫入數據速度、處理數據耗時等數據處理相關指標進行監控,幫助我們及時發現數據寫入和讀取的異常情況,借助公司內部的報警平臺進行監控告警,報警方式支持電話、短信、推推、郵件等

總結與規劃
目前 Apache Doris 主要應用于廣告業務場景,已有數十臺集群機器,覆蓋近 70% 的實時數據分析場景,實現了全量離線實驗平臺以及部分離線 DWS 層數據查詢加速。當前日均新增數據規模可以達到百億級別,在大部分實時場景中,其查詢延遲在 1s 內。同時,Apache Doris 的成功落地使得我們完成了實時數倉在 OLAP 引擎上的統一。Doris 優異的分析性能及簡單易用的特點,也使得數倉架構更加簡潔。
未來我們將對 Doris 集群進行擴展,根據業務優先級進行資源隔離,完善資源管理機制,并計劃將 Doris 應用到 360商業化內部更廣泛的業務場景中,充分發揮 Doris 在 OLAP 場景的優勢。最后我們將更加深入的參與到 Doris 社區中來,積極回饋社區,與 Doris 并肩同行,共同進步!
