01

三種系統與 Unix 的智慧

半夜的中央廚房、現點現做的櫃台,還有一條 1964 年就設計好的水管——原來熱門網頁排行榜五分鐘就能做出來

「你問、它答」不是唯一的玩法

本書前面談了很多請求與查詢:你問一句,系統盡快回你一句。資料庫、快取、搜尋索引、網頁伺服器,大多長這樣。但這只是其中一種風格。這本書把資料處理系統分成三類:

🛎️
服務 Service(online)

等 client 的請求上門,收到就盡快處理、盡快回應。最在意 回應時間 與 availability——連不上,使用者就直接看到錯誤訊息。

🌙
批次處理 Batch(offline)

吃進一大批 有界輸入,跑一個工作,吐出結果。常跑幾分鐘到好幾天,沒人在旁邊等,所以排程定期執行,最在意 吞吐量。本章主角。

🌊
串流 Stream(近即時)

介於前兩者之間。跟批次一樣消費輸入、產生輸出,但改成事件一發生就處理,而不是等一整批資料集齊——所以延遲比批次低。下一章才細講。

🍜
想成三種餐廳出餐模式

服務像現點現做的櫃台:客人站在前面等,慢了就翻臉,在意「上菜速度」。批次像半夜的中央廚房:沒人在等,廚師花一整晚把明天要用的高湯全部熬好,在意的是「天亮前能不能備完所有料」——多花十分鐘沒關係,天亮還沒備完才是災難。串流則像邊有人點、邊補貨的吧台,訂單一進來就動手。

直接讀原文,旁邊就是白話

這一段是本章開場定義三種系統的關鍵句。左邊原文,右邊白話,一句對一句。

原文 · DDIA Ch.10 A service waits for a request or instruction from a client to arrive. Response time is usually the primary measure of performance of a service. A batch processing system takes a large amount of input data, runs a job to process it, and produces some output data. Jobs often take a while, so there normally isn't a user waiting for the job to finish. The primary performance measure of a batch job is usually throughput. A stream job operates on events shortly after they happen, whereas a batch job operates on a fixed set of input data. As stream processing builds upon batch processing, we discuss it in Chapter 11.
白話翻譯

服務會等 client 的請求送來。回應時間,通常是衡量一個服務效能的首要指標。

批次處理系統吃進一大批輸入資料,跑一個工作去處理它,然後產生一些輸出資料。

這種工作往往要跑好一陣子,所以通常沒有使用者在等它跑完。批次工作的首要效能指標,通常是吞吐量。

串流工作是在事件剛發生後不久就處理它,而批次工作處理的是一組固定的輸入資料。

因為串流處理是建立在批次處理的觀念之上,我們會在第 11 章討論它。

💡
先分類,才知道該優化什麼

同樣是「處理資料」,優化目標完全不同:服務要「快回應」,批次要「衝總量」。搞錯類別去優化,會白費力氣——例如硬把批次工作逼成毫秒級回應,根本沒有意義。

經典任務:五分鐘找出網站最熱門的五個頁面

網站每處理一次請求,就往日誌檔追加一行。想找出「最熱門的五個頁面」,用 Unix 工具接一條管線就搞定,完全不用寫程式。按「下一步」,看資料怎麼一節一節被水管往下沖。

📜
cat 讀日誌
✂️
awk 取欄位
🔤
sort 排序
🔢
uniq -c 計數
📊
sort -rn 依次數排
🏆
head -n 5 取前五
按「下一步」開始這趟水管之旅
🚿
每個工具只做好一件事

每個工具只做一件小事,用 管線(pipe) 把前一個的輸出餵給下一個的輸入,像水管一節節接起來。Unix 管線的發明人 McIlroy 早在 1964 年就說:接程式應該要像接花園水管一樣,需要換種方式處理資料,再接一節就好。

同一件事,也能自己寫程式做——但差別藏在「量」裡

上面那條管線,你也可以改用 Ruby 寫一支程式,維護一個 hash table 逐行計數。兩者結果一樣,但執行方式差很多,差別在處理大檔案時才會浮現。

📓
in-memory 聚合(hash table)

像手上拿一本記事本:每看到一個網址,翻到那一頁把計數加一。只要記事本(記憶體)裝得下所有「不同」網址,就很快。

🃏
排序法(sort)

像把一大疊撲克牌先按花色排好,排完後相同的牌自然疊在一起,數一疊有幾張就好。桌面(記憶體)放不下,就分堆放回盒子(硬碟)再合併。

真正決定負擔大小的,不是日誌「總共幾行」,而是 working set(工作集) ——也就是「有幾種不同的網址」。不同網址少,記事本綽綽有餘;多到記事本裝不下,排序法就更划算,因為 mergesort 溢出硬碟 是順序讀寫,對硬碟很友善。

⚙️
sort 比你想的聰明

GNU Coreutils 的 sort 資料太大時會自動溢出到硬碟,還會自動用多核 CPU 平行排序。所以那條簡單管線其實能輕鬆擴展到大型資料集,瓶頸往往只剩「硬碟讀檔的速度」。想換統計目標也很簡單:把 awk 的 '{print $7}' 改成 '{print $1}',就從統計網頁換成統計來源 IP,其他工具完全不用動。

1978 年的哲學,為什麼今天還這麼準

能夠這樣輕鬆串接工具分析日誌,不是巧合——這是 Unix 設計哲學的核心,1978 年被寫成四條原則:

1
每個程式只做好一件事

要做新工作就另寫一個,而不是把舊程式塞滿新功能。

2
預期輸出會變成別人的輸入

別在輸出塞無關資訊,別堅持互動式輸入,讓下一個未知的程式接得上。

3
儘早試跑,勇於丟掉重寫

軟體要能在幾週內就試跑,笨拙的部分別捨不得丟。

4
善用工具而非人力

就算得繞路先把工具做出來也值得,用完丟掉也沒關係。

而讓這些工具能夠任意插接的關鍵,是一個 統一介面(uniform interface) ——在 Unix 裡,介面就是「檔案」:一串有序的位元組。實體檔案、process 之間的管道、device、TCP 連線,全都用同一種介面。多數工具還約定把這串位元組當成用換行符分隔的 ASCII 文字。程式只管讀 stdin、寫 stdout,不在乎輸入從哪來、輸出去哪,這就是 邏輯與接線分離 ——使用者用 shell 自由接線。而輸入檔被當成不可變,你可以一試再試而不弄壞原始資料,還能在管線任一點接 less 偷看中間結果來除錯。

🧱
Unix 工具就像樂高積木

每塊積木只做好一件事——一塊就是一塊,不會同時想當輪子又當屋頂。統一的凸點與凹槽就是「統一介面」:因為所有積木的接點規格一致,不同盒子、不同年代、不同設計師做的積木都能互相拼接。拼錯了拆掉重拼也不傷積木,因為輸入不可變。如果每塊積木接點都不一樣,你就只能買同一盒、玩不出花樣——這正是資料被「巴爾幹化」、難以整合的窘境。

小試身手

來兩題,檢查一下你有沒有抓到這一節的骨架。

某公司每天凌晨 2 點自動跑一支程式,把前一天所有交易紀錄整理成財報。這最符合哪一種系統?
在找熱門頁面的那條管線裡,第一個 sort(在 uniq -c 之前)的作用是什麼?
🐘
下一站:MapReduce 與分散式檔案系統

Unix 工具處理的是單機上的資料,天花板也在這裡——只能在單一機器上執行。接下來要看同樣的哲學,如何被搬到成千上萬台機器組成的叢集上。

02

MapReduce 與分散式檔案系統

把幾萬台普通機器的硬碟拼成一個巨大倉庫,再讓運算自己走到資料旁邊——這是 Google 和 Hadoop 教我們的省錢哲學

先解決一個問題:資料大到單機塞不下,怎麼辦?

上一站我們看到 Unix 管線很優雅,但它假設檔案能放在一台機器上。真實世界的資料常常大到幾百 TB、幾 PB——這時候就需要 分散式檔案系統。 Hadoop 用的是 HDFS, 它就是 Google File System(GFS)的開源重新實作。

HDFS 的地基是一個很有個性的選擇—— shared-nothing 原則:不蓋一棟昂貴的中央儲存設備(那是 NAS/SAN 的 shared-disk 做法,通常要用到 Fibre Channel 這種特殊硬體),而是讓每台一般機器自己掛幾顆硬碟,靠普通網路串起來就好。

1
每台機器跑一個 daemon

對外提供存取「本機檔案」的網路服務,前提是資料中心裡每台一般機器本來就掛著幾顆硬碟。

2
NameNode 記清單

一台中央伺服器記著「哪個檔案區塊放在哪台機器」,於是所有機器的硬碟就邏輯上拼成一個大檔案系統。

3
區塊複製防故障

為了容忍機器與硬碟故障,每個檔案區塊會複製到多台機器;也可以用 erasure coding(抹除碼) 以更省空間的方式達成同等容錯。

🏘️
想成社區共享倉庫

每戶車庫都有空位(每台機器的硬碟),社區不另蓋一棟昂貴中央倉庫,而是直接把大家的車庫拼起來用——這就是 shared-nothing。NameNode 就像倉庫管理員:他不親自保管東西,但手上有本清單,記著「3 號箱在王家車庫、5 號箱在李家車庫」。怕某戶失火(機器壞掉),同一個箱子會在好幾戶各放一份,這就是區塊複製。

📊
規模有多驚人

最大的 HDFS 部署跑在數萬台機器上,合計數百 PB 的容量。因為用的是便宜的普通硬體與開源軟體,成本遠低於同等容量的專用儲存設備。

直接讀原文:shared-nothing 到底在對抗什麼

這段原文把 HDFS 的設計哲學講得很乾脆——它是刻意跟「共享磁碟」的傳統架構對著幹。

原文 · DDIA Ch.10 HDFS is based on the shared-nothing principle, in contrast to the shared-disk approach of Network Attached Storage (NAS) and Storage Area Network (SAN) architectures. Shared-disk storage is implemented by a centralized storage appliance, often using custom hardware and special network infrastructure such as Fibre Channel. On the other hand, the shared-nothing approach requires no special hardware, only computers connected by a conventional datacenter network. A central server called the NameNode keeps track of which file blocks are stored on which machine. In order to tolerate machine and disk failures, file blocks are replicated on multiple machines.
白話翻譯

HDFS 建立在「無共享」原則之上,跟 NAS、SAN 這類「共享磁碟」架構正好相反。

共享磁碟儲存靠的是一台中央儲存設備,通常要用到客製硬體和 Fibre Channel 這種特殊網路。

相對地,無共享架構完全不需要特殊硬體,只要一堆用普通資料中心網路連起來的電腦就行。

一台叫 NameNode 的中央伺服器,記錄著哪個檔案區塊存在哪台機器上。

為了容忍機器和硬碟故障,檔案區塊會被複製到多台機器上。

💰
省錢不是意外,是設計目標

不需要特殊硬體、不需要中央專用設備——這兩句話合起來就是「便宜又能橫向擴充」。這也解釋了為什麼 HDFS 能撐到數萬台機器的規模,而傳統的專用儲存設備做不到這種量級。

MapReduce 的四個步驟:其實就是那條 Unix 管線的分散式版

還記得上一個模組那條找熱門網頁的 Unix 管線嗎?MapReduce 的處理模式幾乎一模一樣,只是搬到了成千上萬台機器上跑:

1
讀入並切成記錄

讀一堆輸入檔,切成一筆筆記錄(例如日誌裡一行就是一筆),由 input format parser 處理。

2
Map

對每一筆記錄呼叫一次 mapper,抽出一個 key 與 value。對應到 awk '{print $7}':把網址當 key。

3
Sort / Shuffle

把所有 key-value 依 key 排序。這一步是 MapReduce 隱含自動做的,你完全不用寫。

4
Reduce

對排序後的 key-value 呼叫 reducer。同 key 的值已相鄰排好,框架用 iterator 交給 reducer,對應 uniq -c 數次數。

你只要寫兩個函式——mapper 和 reducer——框架就把「資料怎麼跨機器搬」「機器壞了怎麼辦」全扛下來。如果需要第二次排序(例如把網址依次數排名),就再寫一個 job,把前一個的輸出當成它的輸入,串成 workflow

🎓
想成全校作業分類計分

Mapper 像第一批助教:每人拿一疊作業,逐份貼上「科目(key)+分數(value)」的便利貼,彼此不互通有無。Sort/Shuffle 像分流大廳:便利貼集中起來按科目分堆——這步框架自動幫你做。Reducer 像第二批助教:每人負責一個科目,拿到那一整疊分數,一口氣算出平均。key 就像收件地址,同地址的信最後都送到同一戶。

跑一次完整的 MapReduce job

下面五個角色,演給你看一份輸入資料怎麼變成最終結果。留意第二步——「把運算搬到資料旁邊」是全書反覆強調的重點:搬程式碼比搬資料便宜太多了。

🗄️
HDFS(三副本)
⚙️
Mapper
🔀
Shuffle(排序+搬運)
🧮
Reducer
按「下一步」看一份輸入怎麼變成結果

這條路上,框架默默替你做了三件事:平行化(每個輸入檔區塊就是一個 partition,可各自平行處理)、決定誰收哪個 key(用 key 的雜湊送到固定的 reducer)、容錯(任務失敗就重試,因為輸入不可變,失敗任務的輸出會被直接丟棄)。

再點一次:HDFS 裡的兩個角色

整個流程的儲存地基,靠的就是 NameNode 和 DataNode 分工——點點看下面兩個節點。

HDFS 內部分工
📋 NameNode
💾 DataNode
NameNode:中央伺服器,記錄『哪個檔案區塊存在哪台機器上』,本身不儲存實際資料內容,只管位置清單。少了它,沒人知道資料散落在哪。

兩份資料要合併,該怎麼 join?

批次處理裡談的 join,指的是「把整個資料集裡所有的關聯一次解掉」。舉例:左邊是使用者活動事件(只含 user ID),右邊是使用者資料庫(含生日)。要分析「哪些頁面受哪個年齡層歡迎」,就得把兩邊以 user ID join 起來。

🚫
別做的事:逐筆查遠端資料庫

最直覺的做法——對每筆活動事件,拿 user ID 去遠端資料庫查一次——吞吐量很差,受限於來回網路延遲,還可能把資料庫打垮,而且遠端資料會變,讓結果變得不確定(nondeterministic)

正確做法是 sort-merge join——把使用者資料庫複製一份到同一個 HDFS,再用 MapReduce 把相關記錄帶到一起:一組 mapper 掃活動事件,輸出「user ID → 活動事件」;另一組 mapper 掃使用者資料庫,輸出「user ID → 生日」。框架依 user ID 分區、排序後,同一個 user ID 的生日與所有活動事件就相鄰地落進同一個 reducer。

還能用 secondary sort 讓 reducer「先看到生日,再看到依時間排序的活動事件」,reducer 就能把生日存進區域變數,再逐筆吐出「網址+觀看者年齡」。整個過程不用發任何網路請求,每次只需在記憶體保留一筆使用者記錄。

💒
想成婚禮排座位

你不會讓每位賓客自己跑去問主辦方坐哪(逐筆遠端查詢,又慢又會把主辦方問爆)。而是先發給每張邀請函一個桌號(key),把所有邀請函按桌號排好,同桌的自然排成一落——這就是 sort/shuffle。每位招待(reducer)負責一桌,拿到那一落同桌賓客,輕鬆安排入座。secondary sort 就像規定「新娘的親屬卡先放最上面」,招待一翻開就先看到主桌資訊。

Join 與 GROUP BY,其實是同一招

「把相關資料聚到同一處」這招,除了拿來 join,也用在分組(GROUP BY):讓 mapper 用想分組的欄位當 key,框架就把同 key 的記錄聚到同一 reducer,再做計數、加總、取 top k 等彙整。常見應用如 sessionization ——用 session cookie 或 user ID 當 key,把散落在各台伺服器日誌裡、屬於同一使用者的活動事件聚起來,還原他的操作順序。

但如果某個 key 的資料量遠大於其他 key,會發生 hot key / skew(傾斜) ——像社群網站裡少數名人擁有數百萬追蹤者,這一桌的招待忙到天荒地老,別桌都排完了還在等他。

名人那一桌

若某桌是大明星,幾百萬粉絲都想擠進同一桌(hot key),這桌招待就忙到天荒地老,別桌都排完了還在等他——這就是傾斜,整個 job 得等最慢的那個 reducer。

常見對策:skewed join(Pig 先跑抽樣 job 找出 hot key,再隨機分散)、sharded join(Crunch 需明確指定 hot key)、Hive 則在 metadata 標明 hot key 並對其用 map-side join。另一個有效招數是分兩階段聚合:第一階段把 hot key 隨機送到不同 reducer 各算部分結果,第二階段再把各部分合併成單一結果。

🎲
skewed join(Pig)

先跑一個抽樣 job 找出哪些是 hot key,再把它們的記錄隨機分散到多個 reducer。

🏷️
sharded join(Crunch)

需要工程師明確指定哪些 key 是 hot key,框架才知道要對它特別處理。

🪜
兩階段聚合

第一階段把 hot key 隨機送到多個 reducer 各算部分結果,第二階段再合併成最終單一結果。

小試身手

把這一站的兩個核心觀念——HDFS 的省錢哲學、MapReduce 怎麼分工——連起來檢查一下:

MapReduce 排程器傾向把 map 任務排到「存有該輸入檔副本的機器」上執行,這麼做的主要好處是?
社群網站中,極少數名人擁有數百萬追蹤者。把與某位名人相關的所有記錄都送到單一 reducer 會造成什麼後果?
🔗
下一站:更快的 Join

知道 MapReduce 怎麼跑之後,下一個問題是——當兩份資料要合併時,有沒有比「整批 shuffle」更快的辦法?

03

更快的 Join 與批次的產出

拿掉 reducer 之後,join 能有多快?做完之後,這堆資料又該去哪裡安家?

Reducer 很萬能,但很貴

上一站我們看過 reduce-side sort-merge join ——它對輸入毫無假設,什麼資料丟給它都能處理,堪稱萬能。但代價也很實在:資料要排序、要複製到 reducer、要合併,過程中可能被反覆寫進寫出硬碟。

如果你願意對輸入資料「先做一些假設」,其實有更省力的捷徑——一種砍掉 reducer、也不排序的精簡版 job,叫做 map-side join。 每個 mapper 只讀一個輸入區塊、寫一個輸出檔,就這樣結束了。

✂️
砍掉的是什麼?

砍掉的正是 reduce-side join 最貴的那部分——排序與跨網路的 shuffle。少了這兩步,job 能快上一大截,但你得先滿足它開的條件。

原文對照:broadcast hash join 是怎麼一回事

這段是全書解釋最乾淨的一段——把「廣播」這個詞的由來講得很清楚。左邊原文、右邊白話,一句對一句讀。

原文 · DDIA Ch.10 The simplest way of performing a map-side join applies in the case where a large dataset is joined with a small dataset. In particular, the small dataset needs to be small enough that it can be loaded entirely into memory in each of the mappers. This simple but effective algorithm is called a broadcast hash join: the word broadcast reflects the fact that each mapper for a partition of the large input reads the entirety of the small input. If the inputs to the map-side join are partitioned in the same way, then the hash join approach can be applied to each partition independently. If the partitioning is done correctly, you can be sure that all the records you might want to join are located in the same numbered partition.
白話翻譯

最簡單的 map-side join,用在「大表 join 小表」的情境。

重點是:小表要小到能整份塞進每一個 mapper 的記憶體裡。

這個又簡單又有效的演算法叫 broadcast hash join:「broadcast(廣播)」是因為處理大表某一分區的每個 mapper,都會讀進整份小表。

如果 map-side join 的兩邊輸入用「一樣的方式」分區,那 hash join 就能對每個分區獨立套用。

只要分區分得正確,你就能保證所有該 join 在一起的記錄,都落在同一個編號的分區裡。

💡
「broadcast」這個字沒有亂用

它字面上就是在講:小表被「廣播」到大表的每一個分區——每個 mapper 手上都有一整份小表的副本。名字直接對應行為,這是這本書很多術語的共同風格。

生活對照:核對來賓簽到表的三種方式

想像你要把一大疊「來賓簽到表」對上「VIP 名單」,找出誰是 VIP。三種 map-side join,剛好對應三種核對方式:

1
人手一本小冊子

VIP 名單很短,乾脆印成小冊子,發給每一位核對人員(broadcast)。看到簽到表上的名字,翻冊子一查就知道是不是 VIP——這就是 broadcast hash join

2
按樓層/櫃台分流

如果簽到表和 VIP 名單事先都按姓氏筆畫分成同樣的 10 個櫃台,3 號櫃台只要拿 3 號那段的 VIP 名單來對就好,名單小很多——這是 partitioned hash join

3
兩疊都已排好序

兩份名單不只分好櫃台,還都按同樣順序排好,核對人員兩手各拿一疊,像拉鍊一樣邊比邊往下走,完全不必先記在腦中——這是 map-side merge join

動手配配看:情境對上正確的 join 策略

三種情境,拖到它最該搭配的 join 方式。配完按「對答案」。

Broadcast hash join
Partitioned hash join
Map-side merge join
小表只有幾 MB,能整份塞進每個 mapper 的記憶體
拖到這裡
兩邊都很大,但已用相同 key、相同雜湊函數分成同樣數量的區
拖到這裡
兩邊都很大,且不只同分區、還都已依相同 key 排序好
拖到這裡
⚠️
天下沒有白吃的午餐

map-side join 對輸入的大小、分區、排序有更多「假設」——而這些條件通常不是憑空存在的,是前面的 MapReduce job 先整理好的。想選對捷徑,光知道檔案格式與目錄名不夠,還得知道分區數與排序用的 key;在 Hadoop 生態裡,這類中介資料常存在 HCatalog 與 Hive metastore。

Join 做完了,資料要去哪裡?

資料庫查詢常分兩種:OLTP 依 key 查少量記錄給人看;分析型查詢掃大量記錄、彙整出報表給分析師。批次處理比較像分析——也掃大量資料——但它的產物常常不是報表,而是別種結構。書裡舉了兩個典型例子:

🔍
建搜尋索引

Google 最早就是用 MapReduce 建搜尋引擎的索引(一個由 5~10 個 job 組成的 workflow)。mapper 把文件分區,每個 reducer 為自己那區建索引,索引檔寫進分散式檔案系統。查詢索引是唯讀操作,索引檔一旦寫出就不可變。文件變了,可以整批重跑取代,或像 Lucene 那樣增量合併段檔。

🤖
Key-Value 資料庫

另一個常見用途,是拿批次處理來建機器學習系統——分類器(垃圾信過濾、異常偵測)或推薦系統(像「你可能認識的人」)。這類 job 的輸出常是一個可依 user ID 或 product ID 查詢的資料庫。

千萬別在 job 裡直接寫外部資料庫

最直覺的做法,是讓 mapper/reducer 直接連上正式資料庫,逐筆寫進去。這是個壞主意,原因有三:逐筆網路請求比批次正常吞吐量慢好幾個數量級;大量任務同時併發寫入會壓垮那個資料庫;更糟的是,它破壞了 MapReduce「全有或全無」的保證——失敗或 推測執行 留下的部分結果,會外洩成外部可見的副作用。

📖
批次輸出像出版一本書

逐筆直寫正式資料庫,像叫工人衝進書店、在正在販售的書上一頁一頁手寫修改——客人(線上查詢)被打擾,改到一半的半成品被人看到,書店也被搞得大亂。正解是先在印刷廠把整本新版書印好、裝訂完成(不可變的輸出檔案),運到書店後,店員把舊版整批換成新版——原子切換(atomic switch)。新書如果印錯了,舊版還在倉庫,馬上換回去就好。

正確做法:在 job 裡建一個全新的資料庫檔案,寫進輸出目錄——就像建索引一樣。這些檔案不可變,再整批載入唯讀查詢伺服器。Voldemort 等 key-value store 就支援這種做法:載入時舊檔繼續服務,複製完成後原子地切換到新檔,出問題還能立刻切回舊檔。

這套「印書哲學」帶來什麼超能力?

把輸入當不可變、輸出整批替換,這正體現了 human fault tolerance(人為容錯) ——能從「人自己寫錯程式」中復原。

程式出 bug 能回滾

輸出不可變、舊輸出還留著:程式碼回滾重跑,或直接切回舊目錄就好——讀寫交易資料庫可沒有這種好事,資料被改爛了,回滾程式碼也救不回來。

失敗任務自動重試很安全

輸入不可變,失敗任務的輸出又會被丟棄,重跑同一份輸入不會留下重複或污染的副作用,暫時性故障自動重試就能容忍。

邏輯與接線分離

錯誤不致造成無法挽回的損害,開發能更大膽快速;一隊專心把某個 job 做好,其他隊決定何時何地跑它,方便重用。

🔤
跟 Unix 的一點小差異

Unix 工具多半假設無型別文字檔,得靠 {print $7} 這種硬抽欄位的方式解析。Hadoop 上則常改用 Avro、Parquet 等結構化格式,提供高效的 schema 編碼,還能隨時間演化 schema——省去這道低價值的解析工。

小試身手

把 join 捷徑跟「批次輸出哲學」都搞懂了,來驗收兩題:

你要把一份龐大的活動事件(大表)與一份只有幾 MB 的使用者資料(小表)join,且小表能輕鬆塞進記憶體。最適合哪種 join?
你的批次 job 算出每位使用者的推薦清單,想讓 web app 能查詢。為什麼「在 reducer 裡逐筆連線寫進正式資料庫」是不好的做法?
🐘
下一站:Hadoop 與 MapReduce 之後

Join 快了之後,我們把鏡頭拉遠,看看 Hadoop 生態系與更新的技術怎麼演化。

04

Hadoop vs 資料庫,與 MapReduce 之後

一邊是「先收貨再解讀」的超大倉庫,一邊是「進廠先建模」的訂製工廠——最後兩邊都在往彼此靠攏

MapReduce 其實不是新發明

當年 MapReduce 論文發表時,其實一點都不新奇——前幾節談的那些平行 join 演算法,早在 MPP 資料庫 裡實作超過十年了。真正的差別在於:MPP 資料庫專注在叢集上平行跑分析型 SQL 查詢;而 Hadoop(MapReduce +分散式檔案系統)提供的更像一套通用作業系統,能跑任意程式。

第一個關鍵差異,是「怎麼收資料」。資料庫要你照特定模型(關聯、文件)把資料結構化好才能進去;HDFS 的檔案卻只是位元組序列,什麼格式都行——文字、影像、感測器讀數、基因序列,通通先收下再說。

🍣
Sushi 原則:生的資料更好

Hadoop 讓你能隨手把原始資料倒進來,之後再想怎麼處理。把「詮釋資料」的負擔從生產者手上,轉移到消費者手上——這就是 schema-on-read,常用於「資料湖」。MPP 資料庫則相反,要求匯入前就得先仔細建好模——這叫 schema-on-write。

第二個差異,是「處理模型」。不是所有運算都能塞進一句 SQL——機器學習、推薦系統、影像分析都得寫程式。MapReduce 讓工程師能輕鬆對大資料跑自己的程式碼,你甚至能在 HDFS + MapReduce 上蓋一個 SQL 引擎(Hive 就是這樣做的)。Hadoop 生態的開放性,讓多種處理模型能共用同一叢集、存取同一份檔案,不必為了不同用途把資料搬來搬去。

直接讀原文:兩種世界觀的分岔點

這一段原文,把 Hadoop 與 MPP 資料庫的性格差異寫得很直白。左邊原文、右邊白話,一句對一句。

原文 · DDIA Ch.10 To put it bluntly, Hadoop opened up the possibility of indiscriminately dumping data into HDFS, and only later figuring out how to process it further. By contrast, MPP databases typically require careful up-front modeling of the data and query patterns before importing the data into the database's proprietary storage format. In practice, it appears that simply making data available quickly — even if it is in a quirky, difficult-to-use, raw format — is often more valuable than trying to decide on the ideal data model up front. If a node crashes while a query is executing, most MPP databases abort the entire query, and either let the user resubmit the query or automatically run it again. On the other hand, MapReduce can tolerate the failure of a map or reduce task without it affecting the job as a whole by retrying work at the granularity of an individual task.
白話翻譯

說白了,Hadoop 打開了一種可能:先不分青紅皂白把資料倒進 HDFS,之後再想辦法處理。

相反地,MPP 資料庫通常要求你在把資料匯入它專屬的儲存格式之前,就先仔細規劃好資料與查詢模式。

實務上看起來,讓資料快速可用——就算格式古怪、難用、很「生」——往往比一開始就想好完美資料模型更有價值。

如果某個節點在查詢執行中崩潰,大多數 MPP 資料庫會直接中止整個查詢,讓使用者重新送出,或系統自動重跑一次。

反觀 MapReduce,能容忍單一 map 或 reduce 任務失敗、而不影響整個 job——只要以「單一任務」為粒度重試就好。

💡
兩種容錯哲學,其實是兩種賭注

MPP 像短跑選手:一場只跑幾秒到幾分鐘,摔了重新起跑成本不高,所以敢把資料留在記憶體裡衝速度。MapReduce 像超級馬拉松:一跑好幾小時,途中很可能被打斷,所以它隨時把進度寫到硬碟——被打斷只需從最近的檢查點補跑,不必整場重來。

收貨倉庫 vs 訂製工廠:一次看懂性格差異

把 Hadoop 和 MPP 資料庫想成兩種收料哲學——一個先收貨再解讀,一個進廠前先畫好設計圖。

📦
Hadoop:超大型收貨倉庫

貨先全部倒進來,連奇形怪狀、半成品都收(schema-on-read)。匯集速度極快,且同一批原始資料能被不同人切出不同料理——異質資料、開放生態是它的強項。

🏭
MPP 資料庫:高級訂製工廠

東西進廠前必須先精準畫好設計圖、分門別類擺進專用櫃(schema-on-write)。查得快、整合度高,但前置作業慢,通常也比較封閉,假設資料同質。

🏃
MPP:短跑選手式容錯

查詢通常只跑幾秒到幾分鐘。一節點崩潰就中止整個查詢重跑,成本不高;偏好把資料留在記憶體(如 hash join)搶速度。

🏔️
MapReduce:馬拉松式容錯

一跑好幾小時,途中常被 搶占(preemption) 拉去讓路。以單一任務為粒度重試,很愛把資料寫到硬碟,被打斷只補跑最近的檢查點。

⚙️
真正的原因不是硬體不可靠

Google 的資料中心把線上服務跟離線批次混著跑,批次以低優先權執行,撿高優先權吃剩的資源——「撿桌底下的碎屑」。一個跑一小時的 MapReduce 任務約有 5% 機率被搶占終止,比硬體故障率高一個數量級以上。這種「隨時可能被終止」的自由,換來的是更好的整體資源利用率,而不是因為機器特別容易壞。

Dataflow 引擎怎麼「不等全部到齊」就開工

MapReduce 把中介結果寫成檔案、完全 物化(materialization) 到 HDFS——這代表下一個 job 得等前一個 job「所有」任務都做完才能開始,一個 拖後腿任務(straggler) 就拖累全局。Spark、Tez、Flink 這些 dataflow 引擎 把整個 workflow 當成一個 job,讓下一站不必等全部到齊就能開工。按「下一步」看資料怎麼在生產線上流動。

🗄️
Job 1(Reducer)
📁
HDFS(物化+複製)
🔁
Job 2(多餘的 Mapper)
⚙️
Operator A
⚙️
Operator B(輸送帶接手)
按「下一步」開始
🏭
分站作業 vs 連續輸送帶

MapReduce 像分站作業:第一站做完「所有」半成品、整批堆進倉庫,第二站才開工,一個手腳慢的工人就拖累全線。Dataflow 引擎(Flink 尤其如此)像連續輸送帶:一站做好一件就馬上送給下一站,半成品放手邊小推車(記憶體/本機硬碟),不必每件都搬進大倉庫還抄三份。排序是例外——它得先收完全部輸入才能吐出第一個結果,所以仍得暫時累積狀態。

少寫硬碟之後,怎麼容錯?

物化到 HDFS 的好處是中介狀態耐久,容錯很單純:任務失敗就重啟、重讀同一輸入。但 dataflow 引擎刻意少寫 HDFS,只好換一種容錯思路——重算。

1
記血緣,不記全部

Spark 用 RDD 的血緣(lineage)追蹤一份資料是由哪些輸入分區、套了哪些 operator 算出來的;Flink 則用 checkpoint 保存 operator 狀態。

2
機器壞了就重算

某台機器壞了、中介狀態遺失,就從仍可用的上一階段或原始輸入重新算出來,而不是去讀早已不存在的 HDFS 副本。

3
決定性是硬道理

同樣輸入必須永遠算出同樣輸出,否則重算的資料會跟已經送給下游的舊資料矛盾,只好連下游一起殺掉重跑——這叫連鎖故障(cascading faults)。

4
重算不是萬靈丹

若中介資料遠小於原始資料、或運算極耗 CPU,先把中介資料物化存起來,反而比故障時整個重算更划算。

🎲
不確定性最愛偷偷溜進來

hash table 的迭代順序、亂數、系統時鐘、外部資料源——這些都可能讓同樣的程式碼在不同時間跑出不同結果。要靠重算容錯,就得用固定種子等方式,把這些不確定性一一移除。

圖運算:為什麼 MapReduce 天生不擅長?

像估算網頁人氣的 PageRank 這類圖演算法,要「沿邊一步步傳播資訊、反覆迭代直到收斂」。但純 MapReduce 只做單趟掃描,硬套上去很沒效率——每一輪都得讀整份輸入、產生全新輸出,就算只有一小塊圖變了也照樣全部重來。

針對這個痛點,圖批次處理流行一種 Pregel(BSP)模型:一個頂點可以沿邊「送訊息」給另一個頂點,每輪迭代對每個頂點呼叫一次函式,傳入它收到的所有訊息——很像一次 reducer 呼叫。但關鍵差異是:頂點在迭代之間把狀態留在記憶體裡,函式只需要處理新進來的訊息,沒有訊息往來的區域就完全不必做事。

原文 · DDIA Ch.10 In each iteration, a function is called for each vertex, passing it all the messages that were sent to it — much like a call to the reducer. The difference from MapReduce is that in the Pregel model, a vertex remembers its state in memory from one iteration to the next, so the function only needs to process new incoming messages. If no messages are being sent in some part of the graph, no work needs to be done. Vertex state and messages between vertices are fault-tolerant and durable, and communication proceeds in fixed rounds.
白話翻譯

每一輪迭代,都會對每個頂點呼叫一次函式,傳入所有送給它的訊息——很像一次 reducer 呼叫。

跟 MapReduce 不同的地方在於:Pregel 模型裡的頂點會把狀態記在記憶體中、跨迭代保留,所以函式只需要處理新收到的訊息。

如果圖的某個區域完全沒有訊息在傳遞,那個地方就什麼事都不用做。

頂點的狀態、以及頂點之間傳的訊息,都是容錯且耐久的,而且通訊是以固定回合進行的。

📯
村莊間的傳話遊戲

把頂點想成一座座村莊:每回合聽完鄰村捎來的所有口信,更新自己知道的事,再沿路把新消息送給鄰村。村莊不會每回合都失憶——只處理「新」收到的口信,沒人傳話的區域就安靜休息。大家講好「這一輪送出的話,下一輪一定都會收到」(固定回合制);每隔幾回合,全村把所知抄一份存進保險箱(checkpoint),萬一某村失火失憶,就回到最近一次存檔重來。

值得一提:別把「dataflow 引擎把 job 排成 DAG」和「圖運算」搞混——前者是資料流長得像圖(流動的通常是關聯式 tuple),後者是資料本身就是圖。是兩件不同的事。

殊途同歸:批次框架越來越像資料庫

直接寫 MapReduce 很費工,連 join 都要自己刻演算法。Hive、Pig、Spark/Flink 的 DataFrame API 提供 宣告式 API(declarative API) ——你只要用 join、group、filter 這些關聯式積木說出「要什麼」,框架的最佳化器就會分析輸入、自動挑最合適的 join 演算法,甚至調整 join 順序讓中介狀態最小,你不必自己背完所有演算法。

更進一步,簡單的 filter 或選欄位若也宣告式表達,最佳化器就能讓欄式儲存只讀需要的欄,並用 向量化執行(vectorized execution) 在緊湊內迴圈裡成批處理,省去逐筆呼叫函式的開銷。但批次框架仍保留寫程式的自由——能呼叫任意程式碼、運用 parsing、NLP、影像等龐大函式庫生態,這是 MPP 資料庫的 UDF 難以企及的。

🔀
兩邊正在往彼此靠攏

批次框架越來越宣告式、效能逼近 MPP;MPP 資料庫也越來越可程式化、越來越靈活。到頭來,它們其實都只是「儲存與處理資料的系統」,只是歷史上從不同的起點出發。

書中指出 MapReduce 容忍「頻繁任務終止」的真正原因是什麼?
Pregel 模型與「用 MapReduce 硬做圖迭代」相比,最關鍵的效率改進是什麼?
🌊
下一站:串流處理

批次處理再快,終究要等資料整批到齊才能跑;如果我們不想等呢?下一章進入即時世界。