傳輸事件流:把訊息送到該去的地方
從「每天跑報攤問有沒有新報紙」到「訂一份會自己送到家的日誌」——事件、生產者、消費者,以及訊息該怎麼流動
上一章的資料「讀得完」,這一章的資料「讀不完」
上一章的批次處理有個大前提:輸入是 有界(bounded) 的——大小已知、有限,程式才知道「讀完了」。
但真實世界的資料大多是無界(unbounded)的:你的使用者昨天產生資料,今天也在產生,明天還會繼續。只要公司沒倒,這個過程永遠不會結束,資料集永遠不算「完整」。
為了用批次處理硬扛無界資料,我們只好人為地把它切成固定時間塊:每天結算一次、每小時結算一次。問題是——結果會慢一整天才反映出來,對沒耐心的使用者太慢了。
不要每天處理一天份,改成每秒處理一秒份,延遲先降下來。
乾脆不要固定時間塊了,每個 事件 一發生就立刻處理——這就是串流處理(stream processing)的精神。
Unix 的 stdin/stdout、TCP 連線、串流影音,本質上都是「隨時間逐步出現的資料」。
批次處理像在河上築一座壩,每天放一桶水下來量一次;串流處理則是站在河邊,水流過來就立刻舀起來處理,不必等水蓄滿再算。
直接讀原文,旁邊就是白話
這一段是本章最重要的破題:批次處理為什麼撐不住無界資料,串流處理又是怎麼冒出來的。左邊原文、右邊白話,一句對一句。
事實上,大部分資料是「無界」的,因為它會隨時間陸續到來:使用者昨天產生資料、今天也在產生,明天還會繼續。
除非公司倒閉,這個過程永遠不會結束,資料集永遠不算真正「完整」。
每天跑一次批次處理的問題是:輸入的變化要等到隔天才會反映在輸出上——這對沒耐心的使用者來說太慢了。
廣義來說,「串流」指的就是隨時間逐步出現的資料。
用串流的術語講:一個事件由一個生產者產生一次,之後可能被多個消費者各自處理。
批次世界裡「一個檔案寫一次、可以被多個任務各自讀」;串流世界裡「一個事件產生一次、可以被多個消費者各自處理」。骨架一模一樣,只是把「一天一份」換成「隨時發生隨時處理」。
要嘛你一直去問,要嘛讓它自己通知你
有了事件之後,下一個問題是:消費者怎麼知道「有新事件」?書裡用了一個你我都熟的畫面。
自己一直跑去報攤問「有新報紙嗎?」「沒有。」五分鐘後再去問,還是沒有。問得越勤快,空手而回的比例就越高,白跑的成本越大。
訂報,有新報紙就自動送到家。你什麼都不用做,新事件一出爐就送上門——這才是串流系統真正想要的模式。
傳統資料庫天生不擅長「主動通知」。它有 觸發器(trigger) ,但功能很受限,一直被當成事後補丁。所以業界才專門發明了送通知的工具——訊息系統。
兩種訊息系統,一種讀完就刪、一種讀完還在
這是本章最關鍵的對比。按「下一步」,看同一份訊息,在傳統訊息代理和以日誌為基礎的代理裡,命運完全不同。
傳統代理像「便利貼任務板」:你撕一張下來做,做完就丟掉,沒人能回去看被撕掉的紙條。以日誌為基礎的代理則像「一直在錄的監視器」:所有畫面都按時間順序錄在磁帶上,讀取不會抹掉它,你可以隨時倒帶重看。
offset:日誌代理怎麼記得「讀到哪了」
日誌可以分割(partition),突破單一磁碟的吞吐上限。每個分割內,代理會給每則訊息一個單調遞增的序號——這就是 offset(偏移量) 。點點看下面每個節點,搞懂它們怎麼分工。
代理不用為每一則訊息追蹤「確認」,只要週期性記下每個消費者的 offset 就好,大幅減少記帳負擔,還能批次處理、管線化來提升吞吐量。這也是為什麼重播(replay)在日誌代理上這麼自然——把 offset 倒回昨天,用修好的程式重新跑一遍就好。
多個消費者搶同一個 topic,會發生什麼事
當多個消費者一起看著同一個 topic,傳統代理提供兩種玩法;而確認(acknowledgment)機制則負責防止訊息在半路搞丟。
每道菜只給一個服務生端走,大家分工把菜端完。訊息很難處理(很重)時,加人手就能加速——但同一分割的訊息永遠只會送到同一個節點,所以能分工的節點數,最多就是分割數。
每道菜都複製給所有服務生,像同一場廣播大家一起收聽,彼此互不影響。日誌代理天生就很適合扇出——因為讀取不會刪除日誌,多個消費者可以各自獨立讀完整份。
「負載平衡 + 重送」一起發生時,順序可能亂掉:生產者依序送出 m3、m4、m5,消費者2 處理 m3 時當機,未確認的 m3 被重送給正在處理 m4 的消費者1,結果變成 m4、m3、m5 —— 送達順序跟生產順序不一致了。如果訊息之間有因果關係(例如「先建立帳號」再「更新帳號」),就該避開負載平衡,改用每個消費者獨立佇列。
小試身手
把「無界資料 → 串流 → 訊息系統 → 日誌代理」這條線串起來,你就抓住本章開場的骨架了。來兩題:
訊息系統送的是事件,那資料庫裡的「寫入」,能不能也看成是一種事件?
資料庫與串流:把寫入也看成事件
同一份資料活在資料庫、快取、搜尋索引裡——誰說了算?一場「雙寫」翻車現場,換來一條「日誌」救全場
沒有一個系統,能把所有事情都做好
整本書反覆出現一句話:沒有任何單一系統能滿足所有的儲存、查詢與處理需求。所以實務上的應用程式,幾乎都要組合好幾種技術:
服務使用者的即時請求,系統的「正本」。
加速熱門請求,不用每次都打正本。
處理搜尋查詢,靠關鍵字極速命中。
做大規模分析,累積歷史一次算個夠。
每一個都有自己的一份資料副本,用各自最佳化的格式存著。問題來了:同一份(或相關的)資料出現在好幾個地方,它們就必須彼此保持同步——資料庫裡的某項一更新,快取、搜尋索引、資料倉儲也都得跟著更新。
傳統上資料倉儲靠 ETL 批次同步:整份複製、轉換、批量載入,像每晚打烊後盤一次全店庫存。但如果定期整份匯出太慢,跟不上即時的需求呢?
一個誘人但危險的捷徑:雙寫
書裡直接點名了這個常見的替代方案,也直接點名它的問題。原文對照白話,一句一句看:
一種常見的替代方案是雙寫——資料一變動,應用程式自己動手把它分別寫進每一個系統。
但雙寫藏著嚴重問題,其中一個就是競態條件。
因為時機不巧交錯了:資料庫先收到客戶端 1 把值設成 A 的寫入,再收到客戶端 2 設成 B 的寫入,所以資料庫最終的值是 B。
搜尋索引卻先收到客戶端 2 的寫入,再收到客戶端 1 的,所以搜尋索引最終的值是 A。
兩個系統從此永久不一致——而且,過程中完全沒有任何錯誤跳出來。
除非你有額外的並行偵測機制,否則你根本不會發現曾經有並行寫入——一個值就這樣靜悄悄地蓋掉另一個值。雙寫還有第二個陷阱:一個寫成功、另一個寫失敗,這是容錯問題,要保證「兩個都成功或都失敗」就是昂貴的原子提交問題。
動畫演給你看:雙寫怎麼翻車,CDC 怎麼救
按「下一步」,先看應用程式怎麼親手把同一筆資料寫壞,再看資料庫變更擷取(CDC)怎麼讓大家排隊照順序來。
雙寫翻車的根因是沒有單一領導者:資料庫有自己的節奏,搜尋索引也有自己的節奏,誰也不跟誰。CDC 的做法是讓資料庫當唯一的leader,其他系統全部當它的 follower——就像一個樂團只能有一位指揮,大家才能踩對拍子。
CDC 上線前後:新分店怎麼補歷史、老日誌怎麼瘦身
光讓新系統「從今天開始訂閱」還不夠——它沒看過以前的異動。而且日誌若無限累積,磁碟遲早爆倉。書裡給了兩個對應的解法:
新開的分店,先給它一份「某個時間點的完整庫存盤點」,這份快照對應到日誌上一個已知的 offset。分店從那個 offset 之後接續套用異動單,就能無縫銜接——不用再回頭問總店一次。
儲存引擎定期掃過日誌,同一個 key 只留最新那一筆,其餘丟掉;刪除則用墓碑(tombstone)標記。這樣磁碟用量只跟「資料庫現在的內容」有關,跟「歷來寫過幾次」無關。
壓縮後日誌所需的磁碟空間,只取決於資料庫「目前的內容」,而不是資料庫「歷來寫過幾次」。
只要 CDC 設定成每個變更都有主鍵、每次更新都會取代該 key 的舊值,那麼只保留每個 key 最新的那筆寫入就夠了。
啟動一個新消費者,從壓縮後 topic 的 offset 0 開始循序掃過所有訊息——因為日誌保證含有每個 key 的最新值,你就能拿到資料庫內容的完整副本,不必再回頭對來源資料庫做一次快照。這正是 Apache Kafka 支援的日誌壓縮功能,讓訊息代理能當「耐久儲存」,不只是短暫傳訊。
再往上一層:事件溯源,把「發生過的事」當唯一真相
事件溯源(Event Sourcing)跟 CDC 一樣把變更存成日誌,但抽象層級不同:CDC 是從資料庫底層解析出「狀態變了什麼」,事件溯源則是應用層明確寫下「使用者做了什麼」。
每筆交易只能追加到分類帳上,記錯了不塗改,而是再記一筆沖銷交易。原始那筆錯帳永遠留著,因為審計時可能用得到。當前的損益、資產負債,都是把分類帳「加總」出來的——這就是「狀態是事件流的積分」。
一個生活例子:顧客把商品加入購物車、後來又移除。若資料庫是可變的,移除時就直接刪掉那一列——這件事永遠消失了。但事件溯源會保留兩個不可變事件:加入與移除。出貨角度它們互相抵銷,但分析角度你多知道一件事——顧客曾經考慮過這個商品,但後來放棄了。
事件溯源還很嚴格地區分命令與事件:
使用者請求剛到,此時還可能失敗——例如訂位要先檢查座位有沒有被搶。
應用程式必須在「變成事件之前」同步完成所有驗證。
木已成舟:耐久、不可變。鐵則——消費者不能拒絕已經生成的事件。
高更新/刪除比例的小資料集,不可變歷史可能膨脹到難以負荷;隱私法規要求「使用者關帳後要真的刪個資」,光追加一筆「請視為已刪除」不夠,你得真的改寫歷史——但副本散落在儲存引擎、檔案系統、SSD、備份各處,真刪遠比想像中困難。
小試身手
從雙寫的陷阱到 CDC、事件溯源的解法,來檢查一下有沒有抓到重點:
訊息與資料庫的變更都變成了事件流,接下來要問:怎麼「處理」這些流——尤其是「時間」這個麻煩的東西?
處理串流:即時運算、時間與容錯
事件源源不絕地流過來,你要在「還沒收齊」的當下就給出答案——這一站,我們把「拿到事件後能做什麼」「該相信誰的時鐘」「壞掉了怎麼辦」一次講清楚
拿到事件之後,能去哪三個地方?
上一站我們搞定了「事件怎麼傳過來」。這一站要問更關鍵的問題:拿到事件後,能拿它做什麼?廣義上只有三條出路:
把事件寫進資料庫、快取、搜尋索引,供其他客戶端查詢——這是讓資料庫與系統其他部分保持同步的好方法,對應批次處理的「輸出」。
寄 email 警示、推播通知,或串到即時儀表板——這時候最終的「消費者」是人類,不是另一個程式。
把一或多個輸入串流處理成一或多個輸出串流,可以串成好幾個處理階段的管線,最後才落到出路 1 或 2。本節就聚焦在這一種。
處理串流的一段程式,叫做 operator(運算子)或 job(任務) ——它以唯讀方式消費輸入串流,以追加方式寫輸出。
這一點影響深遠——排序對無界資料沒意義,所以不能用 sort-merge join;容錯也不一樣:批次任務失敗可以從頭重跑,但一個已經跑了好幾年的串流任務,崩潰後從頭重跑往往不可行。這條線會貫穿整個模組。
串流處理最早的老本行是監控告警:詐騙偵測、交易系統、製造監控、軍事情報。要抓的是「特定的事件模式」,靠的正是這一節最有畫面感的概念——複雜事件處理(CEP)。我們直接讀原文段落,感受一下這個概念怎麼被介紹出來的。
複雜事件處理(CEP)是一種發展於 1990 年代、用來分析事件流的取向,特別適合那種需要搜尋「特定事件模式」的應用。
CEP 系統通常用類似 SQL 的高階宣告式查詢語言,或圖形化介面,來描述該被偵測出來的事件模式。
在這類系統裡,查詢和資料的關係跟一般資料庫剛好相反。一般資料庫是把資料長久存起來,查詢反而是「用完即忘」的。
CEP 引擎把這兩個角色對調:查詢被長期存著,輸入串流裡的事件不斷流過這些查詢,找有沒有哪個查詢命中了它要找的模式。
一般資料庫像圖書館:資料(書)長存,你(查詢)來了才幫你找書,找完就送客。CEP 像把通緝令(查詢)貼在牆上長期掛著,讓人流(事件)一個個走過它,一比中就響鈴。搜尋串流(像 Elasticsearch percolator)也是同樣的道理:查詢長存、文件流過查詢。
四種用途,挑對工具
CEP 之外,「產生衍生串流」還有另外三種常見用途。它們彼此界線有點模糊,但問自己「我到底想要什麼」,通常就能挑對:
找「特定的多事件序列模式」,像對事件流寫一條正規表示式。命中就吐出一個複雜事件。
重統計而非找特定序列:量測發生率、算滾動平均、跟上週同時段比較抓異常。通常算在一個時間窗(如最近 5 分鐘)上。
讓快取、搜尋索引、資料倉儲持續反映來源資料庫。不同於分析只看短視窗,這通常需要「回溯到時間原點」的所有歷史事件。
把搜尋引擎反過來用:查詢事先寫好長存,新聞或新物件不斷流過查詢,命中就通知——媒體監測、房地產到價通知都是這招。
串流分析有時用 Bloom filter、HyperLogLog 這類機率演算法換取大幅省記憶體,但這只是一種「最佳化」——串流處理本身並非天生不精確,要精確也辦得到。
訊息傳遞與 RPC(如 actor 模型)雖然也基於訊息,但通常不算串流處理:actor 著重並行與分散執行、通訊常是短暫且一對一;串流處理著重資料管理,日誌耐久且支援多訂閱者,管線通常無環。
誰的時鐘才算數?看事件怎麼「亂序」抵達
接下來是這一章最容易踩坑的地方:處理串流常要算「最近 N 分鐘」的統計,但這個「最近」該用哪個時鐘量?請跟著下面的動畫,看三筆事件怎麼因為網路延遲,用不同順序抵達處理器。
書中的比喻:星際大戰上映順序是 IV(1977)、V(1980)、VI(1983),然後才是 I、II、III(1999、2002、2005),再到 VII(2015)。集數編號=事件時間(劇情實際順序),你看的日期=處理時間。人類能應付這種跳躍,但串流演算法必須被特地寫成能容忍這種時序與亂序問題。
兩種時間、四種視窗、三個時間戳
先把兩種時間分清楚: event time(事件時間) 是事件實際發生的那一刻; processing time(處理時間) 是處理機器的本地時鐘、事件被處理的當下。批次處理一定看事件時間——一個批次幾分鐘掃完一年的歷史,你關心的是那一年,不是這幾分鐘;用事件時間還能讓處理確定性(deterministic):同樣輸入重跑得同樣結果。
一個量測「每秒請求數」的串流處理器被重新部署,停機一分鐘後重啟,把累積的 backlog 一口氣處理掉。若用處理時間算速率,圖表會出現一個根本不存在的暴增尖峰,但真實請求率其實一直平穩。這就是把事件時間和處理時間搞混會導致壞資料的經典例子。
就算選定用事件時間分組,還要選對「視窗」的形狀:
固定長度,每事件恰屬一個視窗,不重疊——一格接一格,互不重疊。
固定長度但允許重疊,提供平滑——視窗會互相搭一點邊。
包含彼此在某間隔內的所有事件——看「兩兩相距多近」。
無固定長度,同使用者活躍時聚在一起,沉默夠久就結束——看使用者「一陣忙完就收工」。
手機 app 離線時把事件存在本機,連上網才送出(可能晚數小時甚至數天)。到伺服器眼中,這些事件會像極了 落隊事件 straggler。 裝置時鐘常不可信,伺服器時鐘較準卻不代表互動時刻。解法:記三個時間戳——① 事件發生時依裝置時鐘、② 事件送往伺服器時依裝置時鐘、③ 事件被伺服器收到時依伺服器時鐘。用 ③−② 估出裝置與伺服器的時鐘偏移,再校正事件時間戳,逼近事件真正發生的時刻。
動手配配看:三種 join,各配哪個情境?
串流也需要 join,但因為新事件隨時會冒出來,比批次棘手得多。書中分三種:stream-stream(兩邊都是活動事件流,靠一個時間窗配對,例如把搜尋事件和點擊事件湊成點閱率);stream-table(活動事件流去查一份由 CDC 持續更新的本地資料庫副本,幫事件「補資料」,叫 豐富化 enrichment); table-table(兩邊都是資料庫變更日誌,互相 join 出一份物化視圖,例如 Twitter 時間軸快取)。把三種情境拖到正確的 join 類型上:
你賣東西要套對稅率,而稅率會隨時間調整。把銷售和稅率表 join 時,你通常想用銷售當下的稅率,而不是現在的稅率——尤其在重新處理歷史資料時,現在的稅率根本是錯的。資料倉儲的解法叫緩慢變化維度(Slowly Changing Dimension, SCD):給被 join 記錄的每個版本一個唯一 ID,讓 join 變確定性,代價是不能再做日誌壓縮。
壞掉了怎麼辦:從「重跑」到「冪等」
批次容錯很簡單:任務失敗就在別台重跑、丟棄失敗輸出,結果像沒出過錯——這叫 exactly-once 語意(其實叫 effectively-once 更貼切)。但串流無限,你永遠不能等任務完成才公開輸出。書中給了四招:
把串流切成約一秒的小塊當迷你批次處理(Spark Streaming),或週期性把狀態快照寫到耐久儲存(Flink),崩潰後從最近檢查點重啟。但這只在框架內部有效——一旦輸出離開處理器(寫資料庫、寄 email),重啟會讓外部副作用發生兩次。
讓一個事件的所有輸出與副作用——下游訊息、資料庫寫入、狀態變更、推進消費者 offset——要嘛全發生、要嘛全不發生。在受限環境中可高效實作(Google Cloud Dataflow、VoltDB)。
冪等操作 做幾次效果都跟做一次相同,像設定固定值;計數器 += 1 就不是冪等。即使天生非冪等,也能附上觸發這次寫入的 Kafka offset,下次判斷「這個更新是否已套用過」而跳過重複。
視窗聚合、join 用的表索引等狀態,故障後必須能復原:可以留遠端並複製、留本地並週期性複製,有時甚至能直接從輸入串流重播重建。
按電梯按鈕一次和連按十次,效果都是叫同一台電梯來——這就是冪等。而 exactly-once 更像自動門感應卡刷一次或不小心刷三次,門就是開一次,你只進去一次:實際可能重試多次,但可見效果如同只處理一次。冪等的前提是重跑要重播相同訊息相同順序、處理要確定性,故障切換時可能還需要 fencing(隔離) 防止假死節點搗亂。
小試身手
這一站資訊量不小,來兩題檢查一下有沒有抓對關鍵:
串流讓我們幾乎即時反應這個世界,最後一章要把批次與串流、正確性與責任,一起收束成對「資料系統」整體的反思。