三種系統與 Unix 的智慧
半夜的中央廚房、現點現做的櫃台,還有一條 1964 年就設計好的水管——原來熱門網頁排行榜五分鐘就能做出來
「你問、它答」不是唯一的玩法
本書前面談了很多請求與查詢:你問一句,系統盡快回你一句。資料庫、快取、搜尋索引、網頁伺服器,大多長這樣。但這只是其中一種風格。這本書把資料處理系統分成三類:
等 client 的請求上門,收到就盡快處理、盡快回應。最在意 回應時間 與 availability——連不上,使用者就直接看到錯誤訊息。
吃進一大批 有界輸入,跑一個工作,吐出結果。常跑幾分鐘到好幾天,沒人在旁邊等,所以排程定期執行,最在意 吞吐量。本章主角。
介於前兩者之間。跟批次一樣消費輸入、產生輸出,但改成事件一發生就處理,而不是等一整批資料集齊——所以延遲比批次低。下一章才細講。
服務像現點現做的櫃台:客人站在前面等,慢了就翻臉,在意「上菜速度」。批次像半夜的中央廚房:沒人在等,廚師花一整晚把明天要用的高湯全部熬好,在意的是「天亮前能不能備完所有料」——多花十分鐘沒關係,天亮還沒備完才是災難。串流則像邊有人點、邊補貨的吧台,訂單一進來就動手。
直接讀原文,旁邊就是白話
這一段是本章開場定義三種系統的關鍵句。左邊原文,右邊白話,一句對一句。
服務會等 client 的請求送來。回應時間,通常是衡量一個服務效能的首要指標。
批次處理系統吃進一大批輸入資料,跑一個工作去處理它,然後產生一些輸出資料。
這種工作往往要跑好一陣子,所以通常沒有使用者在等它跑完。批次工作的首要效能指標,通常是吞吐量。
串流工作是在事件剛發生後不久就處理它,而批次工作處理的是一組固定的輸入資料。
因為串流處理是建立在批次處理的觀念之上,我們會在第 11 章討論它。
同樣是「處理資料」,優化目標完全不同:服務要「快回應」,批次要「衝總量」。搞錯類別去優化,會白費力氣——例如硬把批次工作逼成毫秒級回應,根本沒有意義。
經典任務:五分鐘找出網站最熱門的五個頁面
網站每處理一次請求,就往日誌檔追加一行。想找出「最熱門的五個頁面」,用 Unix 工具接一條管線就搞定,完全不用寫程式。按「下一步」,看資料怎麼一節一節被水管往下沖。
每個工具只做一件小事,用 管線(pipe) 把前一個的輸出餵給下一個的輸入,像水管一節節接起來。Unix 管線的發明人 McIlroy 早在 1964 年就說:接程式應該要像接花園水管一樣,需要換種方式處理資料,再接一節就好。
同一件事,也能自己寫程式做——但差別藏在「量」裡
上面那條管線,你也可以改用 Ruby 寫一支程式,維護一個 hash table 逐行計數。兩者結果一樣,但執行方式差很多,差別在處理大檔案時才會浮現。
像手上拿一本記事本:每看到一個網址,翻到那一頁把計數加一。只要記事本(記憶體)裝得下所有「不同」網址,就很快。
像把一大疊撲克牌先按花色排好,排完後相同的牌自然疊在一起,數一疊有幾張就好。桌面(記憶體)放不下,就分堆放回盒子(硬碟)再合併。
真正決定負擔大小的,不是日誌「總共幾行」,而是 working set(工作集) ——也就是「有幾種不同的網址」。不同網址少,記事本綽綽有餘;多到記事本裝不下,排序法就更划算,因為 mergesort 溢出硬碟 是順序讀寫,對硬碟很友善。
GNU Coreutils 的 sort 資料太大時會自動溢出到硬碟,還會自動用多核 CPU 平行排序。所以那條簡單管線其實能輕鬆擴展到大型資料集,瓶頸往往只剩「硬碟讀檔的速度」。想換統計目標也很簡單:把 awk 的 '{print $7}' 改成 '{print $1}',就從統計網頁換成統計來源 IP,其他工具完全不用動。
1978 年的哲學,為什麼今天還這麼準
能夠這樣輕鬆串接工具分析日誌,不是巧合——這是 Unix 設計哲學的核心,1978 年被寫成四條原則:
要做新工作就另寫一個,而不是把舊程式塞滿新功能。
別在輸出塞無關資訊,別堅持互動式輸入,讓下一個未知的程式接得上。
軟體要能在幾週內就試跑,笨拙的部分別捨不得丟。
就算得繞路先把工具做出來也值得,用完丟掉也沒關係。
而讓這些工具能夠任意插接的關鍵,是一個 統一介面(uniform interface) ——在 Unix 裡,介面就是「檔案」:一串有序的位元組。實體檔案、process 之間的管道、device、TCP 連線,全都用同一種介面。多數工具還約定把這串位元組當成用換行符分隔的 ASCII 文字。程式只管讀 stdin、寫 stdout,不在乎輸入從哪來、輸出去哪,這就是 邏輯與接線分離 ——使用者用 shell 自由接線。而輸入檔被當成不可變,你可以一試再試而不弄壞原始資料,還能在管線任一點接 less 偷看中間結果來除錯。
每塊積木只做好一件事——一塊就是一塊,不會同時想當輪子又當屋頂。統一的凸點與凹槽就是「統一介面」:因為所有積木的接點規格一致,不同盒子、不同年代、不同設計師做的積木都能互相拼接。拼錯了拆掉重拼也不傷積木,因為輸入不可變。如果每塊積木接點都不一樣,你就只能買同一盒、玩不出花樣——這正是資料被「巴爾幹化」、難以整合的窘境。
小試身手
來兩題,檢查一下你有沒有抓到這一節的骨架。
sort(在 uniq -c 之前)的作用是什麼?Unix 工具處理的是單機上的資料,天花板也在這裡——只能在單一機器上執行。接下來要看同樣的哲學,如何被搬到成千上萬台機器組成的叢集上。
MapReduce 與分散式檔案系統
把幾萬台普通機器的硬碟拼成一個巨大倉庫,再讓運算自己走到資料旁邊——這是 Google 和 Hadoop 教我們的省錢哲學
先解決一個問題:資料大到單機塞不下,怎麼辦?
上一站我們看到 Unix 管線很優雅,但它假設檔案能放在一台機器上。真實世界的資料常常大到幾百 TB、幾 PB——這時候就需要 分散式檔案系統。 Hadoop 用的是 HDFS, 它就是 Google File System(GFS)的開源重新實作。
HDFS 的地基是一個很有個性的選擇—— shared-nothing 原則:不蓋一棟昂貴的中央儲存設備(那是 NAS/SAN 的 shared-disk 做法,通常要用到 Fibre Channel 這種特殊硬體),而是讓每台一般機器自己掛幾顆硬碟,靠普通網路串起來就好。
對外提供存取「本機檔案」的網路服務,前提是資料中心裡每台一般機器本來就掛著幾顆硬碟。
一台中央伺服器記著「哪個檔案區塊放在哪台機器」,於是所有機器的硬碟就邏輯上拼成一個大檔案系統。
為了容忍機器與硬碟故障,每個檔案區塊會複製到多台機器;也可以用 erasure coding(抹除碼) 以更省空間的方式達成同等容錯。
每戶車庫都有空位(每台機器的硬碟),社區不另蓋一棟昂貴中央倉庫,而是直接把大家的車庫拼起來用——這就是 shared-nothing。NameNode 就像倉庫管理員:他不親自保管東西,但手上有本清單,記著「3 號箱在王家車庫、5 號箱在李家車庫」。怕某戶失火(機器壞掉),同一個箱子會在好幾戶各放一份,這就是區塊複製。
最大的 HDFS 部署跑在數萬台機器上,合計數百 PB 的容量。因為用的是便宜的普通硬體與開源軟體,成本遠低於同等容量的專用儲存設備。
直接讀原文:shared-nothing 到底在對抗什麼
這段原文把 HDFS 的設計哲學講得很乾脆——它是刻意跟「共享磁碟」的傳統架構對著幹。
HDFS 建立在「無共享」原則之上,跟 NAS、SAN 這類「共享磁碟」架構正好相反。
共享磁碟儲存靠的是一台中央儲存設備,通常要用到客製硬體和 Fibre Channel 這種特殊網路。
相對地,無共享架構完全不需要特殊硬體,只要一堆用普通資料中心網路連起來的電腦就行。
一台叫 NameNode 的中央伺服器,記錄著哪個檔案區塊存在哪台機器上。
為了容忍機器和硬碟故障,檔案區塊會被複製到多台機器上。
不需要特殊硬體、不需要中央專用設備——這兩句話合起來就是「便宜又能橫向擴充」。這也解釋了為什麼 HDFS 能撐到數萬台機器的規模,而傳統的專用儲存設備做不到這種量級。
MapReduce 的四個步驟:其實就是那條 Unix 管線的分散式版
還記得上一個模組那條找熱門網頁的 Unix 管線嗎?MapReduce 的處理模式幾乎一模一樣,只是搬到了成千上萬台機器上跑:
讀一堆輸入檔,切成一筆筆記錄(例如日誌裡一行就是一筆),由 input format parser 處理。
對每一筆記錄呼叫一次 mapper,抽出一個 key 與 value。對應到 awk '{print $7}':把網址當 key。
把所有 key-value 依 key 排序。這一步是 MapReduce 隱含自動做的,你完全不用寫。
對排序後的 key-value 呼叫 reducer。同 key 的值已相鄰排好,框架用 iterator 交給 reducer,對應 uniq -c 數次數。
你只要寫兩個函式——mapper 和 reducer——框架就把「資料怎麼跨機器搬」「機器壞了怎麼辦」全扛下來。如果需要第二次排序(例如把網址依次數排名),就再寫一個 job,把前一個的輸出當成它的輸入,串成 workflow。
Mapper 像第一批助教:每人拿一疊作業,逐份貼上「科目(key)+分數(value)」的便利貼,彼此不互通有無。Sort/Shuffle 像分流大廳:便利貼集中起來按科目分堆——這步框架自動幫你做。Reducer 像第二批助教:每人負責一個科目,拿到那一整疊分數,一口氣算出平均。key 就像收件地址,同地址的信最後都送到同一戶。
跑一次完整的 MapReduce job
下面五個角色,演給你看一份輸入資料怎麼變成最終結果。留意第二步——「把運算搬到資料旁邊」是全書反覆強調的重點:搬程式碼比搬資料便宜太多了。
這條路上,框架默默替你做了三件事:平行化(每個輸入檔區塊就是一個 partition,可各自平行處理)、決定誰收哪個 key(用 key 的雜湊送到固定的 reducer)、容錯(任務失敗就重試,因為輸入不可變,失敗任務的輸出會被直接丟棄)。
再點一次:HDFS 裡的兩個角色
整個流程的儲存地基,靠的就是 NameNode 和 DataNode 分工——點點看下面兩個節點。
兩份資料要合併,該怎麼 join?
批次處理裡談的 join,指的是「把整個資料集裡所有的關聯一次解掉」。舉例:左邊是使用者活動事件(只含 user ID),右邊是使用者資料庫(含生日)。要分析「哪些頁面受哪個年齡層歡迎」,就得把兩邊以 user ID join 起來。
最直覺的做法——對每筆活動事件,拿 user ID 去遠端資料庫查一次——吞吐量很差,受限於來回網路延遲,還可能把資料庫打垮,而且遠端資料會變,讓結果變得不確定(nondeterministic)。
正確做法是 sort-merge join——把使用者資料庫複製一份到同一個 HDFS,再用 MapReduce 把相關記錄帶到一起:一組 mapper 掃活動事件,輸出「user ID → 活動事件」;另一組 mapper 掃使用者資料庫,輸出「user ID → 生日」。框架依 user ID 分區、排序後,同一個 user ID 的生日與所有活動事件就相鄰地落進同一個 reducer。
還能用 secondary sort 讓 reducer「先看到生日,再看到依時間排序的活動事件」,reducer 就能把生日存進區域變數,再逐筆吐出「網址+觀看者年齡」。整個過程不用發任何網路請求,每次只需在記憶體保留一筆使用者記錄。
你不會讓每位賓客自己跑去問主辦方坐哪(逐筆遠端查詢,又慢又會把主辦方問爆)。而是先發給每張邀請函一個桌號(key),把所有邀請函按桌號排好,同桌的自然排成一落——這就是 sort/shuffle。每位招待(reducer)負責一桌,拿到那一落同桌賓客,輕鬆安排入座。secondary sort 就像規定「新娘的親屬卡先放最上面」,招待一翻開就先看到主桌資訊。
Join 與 GROUP BY,其實是同一招
「把相關資料聚到同一處」這招,除了拿來 join,也用在分組(GROUP BY):讓 mapper 用想分組的欄位當 key,框架就把同 key 的記錄聚到同一 reducer,再做計數、加總、取 top k 等彙整。常見應用如 sessionization ——用 session cookie 或 user ID 當 key,把散落在各台伺服器日誌裡、屬於同一使用者的活動事件聚起來,還原他的操作順序。
但如果某個 key 的資料量遠大於其他 key,會發生 hot key / skew(傾斜) ——像社群網站裡少數名人擁有數百萬追蹤者,這一桌的招待忙到天荒地老,別桌都排完了還在等他。
若某桌是大明星,幾百萬粉絲都想擠進同一桌(hot key),這桌招待就忙到天荒地老,別桌都排完了還在等他——這就是傾斜,整個 job 得等最慢的那個 reducer。
常見對策:skewed join(Pig 先跑抽樣 job 找出 hot key,再隨機分散)、sharded join(Crunch 需明確指定 hot key)、Hive 則在 metadata 標明 hot key 並對其用 map-side join。另一個有效招數是分兩階段聚合:第一階段把 hot key 隨機送到不同 reducer 各算部分結果,第二階段再把各部分合併成單一結果。
先跑一個抽樣 job 找出哪些是 hot key,再把它們的記錄隨機分散到多個 reducer。
需要工程師明確指定哪些 key 是 hot key,框架才知道要對它特別處理。
第一階段把 hot key 隨機送到多個 reducer 各算部分結果,第二階段再合併成最終單一結果。
小試身手
把這一站的兩個核心觀念——HDFS 的省錢哲學、MapReduce 怎麼分工——連起來檢查一下:
知道 MapReduce 怎麼跑之後,下一個問題是——當兩份資料要合併時,有沒有比「整批 shuffle」更快的辦法?
更快的 Join 與批次的產出
拿掉 reducer 之後,join 能有多快?做完之後,這堆資料又該去哪裡安家?
Reducer 很萬能,但很貴
上一站我們看過 reduce-side sort-merge join ——它對輸入毫無假設,什麼資料丟給它都能處理,堪稱萬能。但代價也很實在:資料要排序、要複製到 reducer、要合併,過程中可能被反覆寫進寫出硬碟。
如果你願意對輸入資料「先做一些假設」,其實有更省力的捷徑——一種砍掉 reducer、也不排序的精簡版 job,叫做 map-side join。 每個 mapper 只讀一個輸入區塊、寫一個輸出檔,就這樣結束了。
砍掉的正是 reduce-side join 最貴的那部分——排序與跨網路的 shuffle。少了這兩步,job 能快上一大截,但你得先滿足它開的條件。
原文對照:broadcast hash join 是怎麼一回事
這段是全書解釋最乾淨的一段——把「廣播」這個詞的由來講得很清楚。左邊原文、右邊白話,一句對一句讀。
最簡單的 map-side join,用在「大表 join 小表」的情境。
重點是:小表要小到能整份塞進每一個 mapper 的記憶體裡。
這個又簡單又有效的演算法叫 broadcast hash join:「broadcast(廣播)」是因為處理大表某一分區的每個 mapper,都會讀進整份小表。
如果 map-side join 的兩邊輸入用「一樣的方式」分區,那 hash join 就能對每個分區獨立套用。
只要分區分得正確,你就能保證所有該 join 在一起的記錄,都落在同一個編號的分區裡。
它字面上就是在講:小表被「廣播」到大表的每一個分區——每個 mapper 手上都有一整份小表的副本。名字直接對應行為,這是這本書很多術語的共同風格。
生活對照:核對來賓簽到表的三種方式
想像你要把一大疊「來賓簽到表」對上「VIP 名單」,找出誰是 VIP。三種 map-side join,剛好對應三種核對方式:
VIP 名單很短,乾脆印成小冊子,發給每一位核對人員(broadcast)。看到簽到表上的名字,翻冊子一查就知道是不是 VIP——這就是 broadcast hash join。
如果簽到表和 VIP 名單事先都按姓氏筆畫分成同樣的 10 個櫃台,3 號櫃台只要拿 3 號那段的 VIP 名單來對就好,名單小很多——這是 partitioned hash join。
兩份名單不只分好櫃台,還都按同樣順序排好,核對人員兩手各拿一疊,像拉鍊一樣邊比邊往下走,完全不必先記在腦中——這是 map-side merge join。
動手配配看:情境對上正確的 join 策略
三種情境,拖到它最該搭配的 join 方式。配完按「對答案」。
map-side join 對輸入的大小、分區、排序有更多「假設」——而這些條件通常不是憑空存在的,是前面的 MapReduce job 先整理好的。想選對捷徑,光知道檔案格式與目錄名不夠,還得知道分區數與排序用的 key;在 Hadoop 生態裡,這類中介資料常存在 HCatalog 與 Hive metastore。
Join 做完了,資料要去哪裡?
資料庫查詢常分兩種:OLTP 依 key 查少量記錄給人看;分析型查詢掃大量記錄、彙整出報表給分析師。批次處理比較像分析——也掃大量資料——但它的產物常常不是報表,而是別種結構。書裡舉了兩個典型例子:
Google 最早就是用 MapReduce 建搜尋引擎的索引(一個由 5~10 個 job 組成的 workflow)。mapper 把文件分區,每個 reducer 為自己那區建索引,索引檔寫進分散式檔案系統。查詢索引是唯讀操作,索引檔一旦寫出就不可變。文件變了,可以整批重跑取代,或像 Lucene 那樣增量合併段檔。
另一個常見用途,是拿批次處理來建機器學習系統——分類器(垃圾信過濾、異常偵測)或推薦系統(像「你可能認識的人」)。這類 job 的輸出常是一個可依 user ID 或 product ID 查詢的資料庫。
千萬別在 job 裡直接寫外部資料庫
最直覺的做法,是讓 mapper/reducer 直接連上正式資料庫,逐筆寫進去。這是個壞主意,原因有三:逐筆網路請求比批次正常吞吐量慢好幾個數量級;大量任務同時併發寫入會壓垮那個資料庫;更糟的是,它破壞了 MapReduce「全有或全無」的保證——失敗或 推測執行 留下的部分結果,會外洩成外部可見的副作用。
逐筆直寫正式資料庫,像叫工人衝進書店、在正在販售的書上一頁一頁手寫修改——客人(線上查詢)被打擾,改到一半的半成品被人看到,書店也被搞得大亂。正解是先在印刷廠把整本新版書印好、裝訂完成(不可變的輸出檔案),運到書店後,店員把舊版整批換成新版——原子切換(atomic switch)。新書如果印錯了,舊版還在倉庫,馬上換回去就好。
正確做法:在 job 裡建一個全新的資料庫檔案,寫進輸出目錄——就像建索引一樣。這些檔案不可變,再整批載入唯讀查詢伺服器。Voldemort 等 key-value store 就支援這種做法:載入時舊檔繼續服務,複製完成後原子地切換到新檔,出問題還能立刻切回舊檔。
這套「印書哲學」帶來什麼超能力?
把輸入當不可變、輸出整批替換,這正體現了 human fault tolerance(人為容錯) ——能從「人自己寫錯程式」中復原。
輸出不可變、舊輸出還留著:程式碼回滾重跑,或直接切回舊目錄就好——讀寫交易資料庫可沒有這種好事,資料被改爛了,回滾程式碼也救不回來。
輸入不可變,失敗任務的輸出又會被丟棄,重跑同一份輸入不會留下重複或污染的副作用,暫時性故障自動重試就能容忍。
錯誤不致造成無法挽回的損害,開發能更大膽快速;一隊專心把某個 job 做好,其他隊決定何時何地跑它,方便重用。
Unix 工具多半假設無型別文字檔,得靠 {print $7} 這種硬抽欄位的方式解析。Hadoop 上則常改用 Avro、Parquet 等結構化格式,提供高效的 schema 編碼,還能隨時間演化 schema——省去這道低價值的解析工。
小試身手
把 join 捷徑跟「批次輸出哲學」都搞懂了,來驗收兩題:
Join 快了之後,我們把鏡頭拉遠,看看 Hadoop 生態系與更新的技術怎麼演化。
Hadoop vs 資料庫,與 MapReduce 之後
一邊是「先收貨再解讀」的超大倉庫,一邊是「進廠先建模」的訂製工廠——最後兩邊都在往彼此靠攏
MapReduce 其實不是新發明
當年 MapReduce 論文發表時,其實一點都不新奇——前幾節談的那些平行 join 演算法,早在 MPP 資料庫 裡實作超過十年了。真正的差別在於:MPP 資料庫專注在叢集上平行跑分析型 SQL 查詢;而 Hadoop(MapReduce +分散式檔案系統)提供的更像一套通用作業系統,能跑任意程式。
第一個關鍵差異,是「怎麼收資料」。資料庫要你照特定模型(關聯、文件)把資料結構化好才能進去;HDFS 的檔案卻只是位元組序列,什麼格式都行——文字、影像、感測器讀數、基因序列,通通先收下再說。
Hadoop 讓你能隨手把原始資料倒進來,之後再想怎麼處理。把「詮釋資料」的負擔從生產者手上,轉移到消費者手上——這就是 schema-on-read,常用於「資料湖」。MPP 資料庫則相反,要求匯入前就得先仔細建好模——這叫 schema-on-write。
第二個差異,是「處理模型」。不是所有運算都能塞進一句 SQL——機器學習、推薦系統、影像分析都得寫程式。MapReduce 讓工程師能輕鬆對大資料跑自己的程式碼,你甚至能在 HDFS + MapReduce 上蓋一個 SQL 引擎(Hive 就是這樣做的)。Hadoop 生態的開放性,讓多種處理模型能共用同一叢集、存取同一份檔案,不必為了不同用途把資料搬來搬去。
直接讀原文:兩種世界觀的分岔點
這一段原文,把 Hadoop 與 MPP 資料庫的性格差異寫得很直白。左邊原文、右邊白話,一句對一句。
說白了,Hadoop 打開了一種可能:先不分青紅皂白把資料倒進 HDFS,之後再想辦法處理。
相反地,MPP 資料庫通常要求你在把資料匯入它專屬的儲存格式之前,就先仔細規劃好資料與查詢模式。
實務上看起來,讓資料快速可用——就算格式古怪、難用、很「生」——往往比一開始就想好完美資料模型更有價值。
如果某個節點在查詢執行中崩潰,大多數 MPP 資料庫會直接中止整個查詢,讓使用者重新送出,或系統自動重跑一次。
反觀 MapReduce,能容忍單一 map 或 reduce 任務失敗、而不影響整個 job——只要以「單一任務」為粒度重試就好。
MPP 像短跑選手:一場只跑幾秒到幾分鐘,摔了重新起跑成本不高,所以敢把資料留在記憶體裡衝速度。MapReduce 像超級馬拉松:一跑好幾小時,途中很可能被打斷,所以它隨時把進度寫到硬碟——被打斷只需從最近的檢查點補跑,不必整場重來。
收貨倉庫 vs 訂製工廠:一次看懂性格差異
把 Hadoop 和 MPP 資料庫想成兩種收料哲學——一個先收貨再解讀,一個進廠前先畫好設計圖。
貨先全部倒進來,連奇形怪狀、半成品都收(schema-on-read)。匯集速度極快,且同一批原始資料能被不同人切出不同料理——異質資料、開放生態是它的強項。
東西進廠前必須先精準畫好設計圖、分門別類擺進專用櫃(schema-on-write)。查得快、整合度高,但前置作業慢,通常也比較封閉,假設資料同質。
查詢通常只跑幾秒到幾分鐘。一節點崩潰就中止整個查詢重跑,成本不高;偏好把資料留在記憶體(如 hash join)搶速度。
一跑好幾小時,途中常被 搶占(preemption) 拉去讓路。以單一任務為粒度重試,很愛把資料寫到硬碟,被打斷只補跑最近的檢查點。
Google 的資料中心把線上服務跟離線批次混著跑,批次以低優先權執行,撿高優先權吃剩的資源——「撿桌底下的碎屑」。一個跑一小時的 MapReduce 任務約有 5% 機率被搶占終止,比硬體故障率高一個數量級以上。這種「隨時可能被終止」的自由,換來的是更好的整體資源利用率,而不是因為機器特別容易壞。
Dataflow 引擎怎麼「不等全部到齊」就開工
MapReduce 把中介結果寫成檔案、完全 物化(materialization) 到 HDFS——這代表下一個 job 得等前一個 job「所有」任務都做完才能開始,一個 拖後腿任務(straggler) 就拖累全局。Spark、Tez、Flink 這些 dataflow 引擎 把整個 workflow 當成一個 job,讓下一站不必等全部到齊就能開工。按「下一步」看資料怎麼在生產線上流動。
MapReduce 像分站作業:第一站做完「所有」半成品、整批堆進倉庫,第二站才開工,一個手腳慢的工人就拖累全線。Dataflow 引擎(Flink 尤其如此)像連續輸送帶:一站做好一件就馬上送給下一站,半成品放手邊小推車(記憶體/本機硬碟),不必每件都搬進大倉庫還抄三份。排序是例外——它得先收完全部輸入才能吐出第一個結果,所以仍得暫時累積狀態。
少寫硬碟之後,怎麼容錯?
物化到 HDFS 的好處是中介狀態耐久,容錯很單純:任務失敗就重啟、重讀同一輸入。但 dataflow 引擎刻意少寫 HDFS,只好換一種容錯思路——重算。
Spark 用 RDD 的血緣(lineage)追蹤一份資料是由哪些輸入分區、套了哪些 operator 算出來的;Flink 則用 checkpoint 保存 operator 狀態。
某台機器壞了、中介狀態遺失,就從仍可用的上一階段或原始輸入重新算出來,而不是去讀早已不存在的 HDFS 副本。
同樣輸入必須永遠算出同樣輸出,否則重算的資料會跟已經送給下游的舊資料矛盾,只好連下游一起殺掉重跑——這叫連鎖故障(cascading faults)。
若中介資料遠小於原始資料、或運算極耗 CPU,先把中介資料物化存起來,反而比故障時整個重算更划算。
hash table 的迭代順序、亂數、系統時鐘、外部資料源——這些都可能讓同樣的程式碼在不同時間跑出不同結果。要靠重算容錯,就得用固定種子等方式,把這些不確定性一一移除。
圖運算:為什麼 MapReduce 天生不擅長?
像估算網頁人氣的 PageRank 這類圖演算法,要「沿邊一步步傳播資訊、反覆迭代直到收斂」。但純 MapReduce 只做單趟掃描,硬套上去很沒效率——每一輪都得讀整份輸入、產生全新輸出,就算只有一小塊圖變了也照樣全部重來。
針對這個痛點,圖批次處理流行一種 Pregel(BSP)模型:一個頂點可以沿邊「送訊息」給另一個頂點,每輪迭代對每個頂點呼叫一次函式,傳入它收到的所有訊息——很像一次 reducer 呼叫。但關鍵差異是:頂點在迭代之間把狀態留在記憶體裡,函式只需要處理新進來的訊息,沒有訊息往來的區域就完全不必做事。
每一輪迭代,都會對每個頂點呼叫一次函式,傳入所有送給它的訊息——很像一次 reducer 呼叫。
跟 MapReduce 不同的地方在於:Pregel 模型裡的頂點會把狀態記在記憶體中、跨迭代保留,所以函式只需要處理新收到的訊息。
如果圖的某個區域完全沒有訊息在傳遞,那個地方就什麼事都不用做。
頂點的狀態、以及頂點之間傳的訊息,都是容錯且耐久的,而且通訊是以固定回合進行的。
把頂點想成一座座村莊:每回合聽完鄰村捎來的所有口信,更新自己知道的事,再沿路把新消息送給鄰村。村莊不會每回合都失憶——只處理「新」收到的口信,沒人傳話的區域就安靜休息。大家講好「這一輪送出的話,下一輪一定都會收到」(固定回合制);每隔幾回合,全村把所知抄一份存進保險箱(checkpoint),萬一某村失火失憶,就回到最近一次存檔重來。
值得一提:別把「dataflow 引擎把 job 排成 DAG」和「圖運算」搞混——前者是資料流長得像圖(流動的通常是關聯式 tuple),後者是資料本身就是圖。是兩件不同的事。
殊途同歸:批次框架越來越像資料庫
直接寫 MapReduce 很費工,連 join 都要自己刻演算法。Hive、Pig、Spark/Flink 的 DataFrame API 提供 宣告式 API(declarative API) ——你只要用 join、group、filter 這些關聯式積木說出「要什麼」,框架的最佳化器就會分析輸入、自動挑最合適的 join 演算法,甚至調整 join 順序讓中介狀態最小,你不必自己背完所有演算法。
更進一步,簡單的 filter 或選欄位若也宣告式表達,最佳化器就能讓欄式儲存只讀需要的欄,並用 向量化執行(vectorized execution) 在緊湊內迴圈裡成批處理,省去逐筆呼叫函式的開銷。但批次框架仍保留寫程式的自由——能呼叫任意程式碼、運用 parsing、NLP、影像等龐大函式庫生態,這是 MPP 資料庫的 UDF 難以企及的。
批次框架越來越宣告式、效能逼近 MPP;MPP 資料庫也越來越可程式化、越來越靈活。到頭來,它們其實都只是「儲存與處理資料的系統」,只是歷史上從不同的起點出發。
批次處理再快,終究要等資料整批到齊才能跑;如果我們不想等呢?下一章進入即時世界。