📝ノート💻テック

Apache Spark — 究極のガイド(メモ)

Tony Duong

Tony Duong

5月 24, 20263

他の言語:🇫🇷🇬🇧
#spark#big-data#data-engineering#video
Apache Spark — 究極のガイド(メモ)

Ansh LambaのApache Sparkマスタークラス(6時間)のメモです。最初から最後まで視聴しました — 前半(ブロードキャストジョインまで)と後半(メモリ管理 → AQE)の両方を以下にまとめています。

なぜ分散コンピューティングなのか

コースは、垂直スケーリング(1台のマシンをアップグレード)と水平スケーリング(マシンを増やす)の対比から始まります。

  • 垂直スケーリングはすぐに天井に達します:ハードウェアの限界、単一障害点、可用性の低さ。
  • 水平スケーリング:マシンを増やせば、計算能力と冗長性が増えます。Sparkはこれを前提に作られています。

メンタルモデル:driver はチームリーダー、executor はワーカーです。何台、どれくらいのサイズで欲しいかを宣言すると、クラスタマネージャ(YARN、standalone、またはDatabricksではプラットフォーム自体)がそれらをプロビジョニングします。

Sparkのアーキテクチャ

  • Driver はアプリケーションコードを保持し、論理プランを構築し、作業をスケジュールします。
  • Executor はデータのパーティションに対してタスクを実行します。
  • driverはクラスタマネージャと通信してリソースを要求し、executorにタスクを送ります。
  • SparkはScala/Javaで書かれています。PySpark はPy4J経由でJava APIの上に重ねられたPythonラッパーで、主にデータコミュニティがPythonに住んでいるからです。

DataFrame vs RDD

  • RDD はオリジナルの抽象化です — 論理パーティションを持つ分散リストで、イミュータブルかつ遅延評価されます。「特徴」はデータが複数のマシンに分割されていることです。
  • DataFrame はより高レベルの構造化APIです。内部的には最終的にRDD操作にコンパイルされますが、その間にCatalystオプティマイザが挟まります。
  • コースではDatabricksでDataFrameを作成し、.show().collect() のような アクション がトリガーされるまでジョブが実行されないことを示しています。

遅延評価、トランスフォーメーション、アクション

  • トランスフォーメーション(filter、select、groupBy、join)はDAGを構築しますが、実行はしません。
  • アクション(show、collect、count、write)が実行をトリガーします。
  • これによりSparkはパイプライン全体を見渡してから実行する前に最適化できます — 述語プッシュダウン、カラムプルーニングなど。

Narrow vs Wide トランスフォーメーション

  • Narrow:各出力パーティションは単一の入力パーティションに依存します(filtermapselect)。ネットワーク越しのデータ移動はありません。
  • Wide:出力パーティションは複数の入力パーティションに依存します(groupByjoindistinct)。シャッフル が必要です — データがネットワーク経由でexecutor間を移動します。Wideトランスフォーメーションは、注意しないとパフォーマンスが死ぬ場所です。

ジョブ、ステージ、タスク

  • 1つの アクション = 1つの ジョブ
  • ジョブは、シャッフルの境界(wideトランスフォーメーション)ごとに ステージ に分割されます。
  • 各ステージは タスク の集合で、パーティションごとに1つです。
  • Spark UIのDAGビューはまさにこの階層を示しています。これをうまく読めるかどうかが、時間がどこに使われているかを推測するか把握するかの違いになります。

ジョイン

  • シャッフルジョイン(sort-merge):両側がジョインキーでシャッフルされ、マッチする行が同じexecutorに集まります。大規模 + 大規模のデフォルトです。
  • ブロードキャストジョイン:小さい側が丸ごと全executorに送られ、シャッフルなしです。はるかに高速ですが、ブロードキャスト側がdriver/executorメモリに収まる場合にのみ安全です(デフォルトのしきい値は約10MB、調整可能)。
  • Sparkオプティマイザは統計情報に基づいて自動的にブロードキャストする場合があります。PySparkでは broadcast(df) で明示的に強制することもできます。
  • スピーカーが繰り返し強調する経験則:シャッフルを受け入れる前に、ジョインがブロードキャストジョインになり得ないか必ず確認すること

driverメモリと .collect()

  • .collect() は全パーティションをdriverに引き戻します。合計サイズがdriverのJVMヒープを超えると → driver OOM、アプリケーションが死にます。
  • 修正は「driverを大きくする」ことではめったにありません。修正は「100GBのDataFrameに対して .collect() を呼ばない」ことです。代わりに .show(N).take(N)、またはストレージへの書き込みを使います。

executorメモリのレイアウト

executorのJVMヒープ(spark.executor.memory)に約10%のオーバーヘッドを加えたものが、メモリを要求した際に実際に得られるものです。コースではoff-heapメモリとPySparkメモリにも触れていますが、どちらもデフォルトはゼロで、ほとんど使われません。ヒープ自体は3つに分割されます:

  • Reserved memory — Sparkエンジン用に固定300MB。交渉の余地なし。
  • Spark memory pool — デフォルトで (heap − 300 MB) の60%(spark.memory.fraction)。ここで実際の作業が行われます。
  • User memory — 残りの40%。UDFの状態やユーザー定義のデータ構造を保持します。

統合メモリプール:storage vs execution

Spark memory poolはデフォルトで50/50に分割されます(spark.memory.storageFraction):

  • Storage memory — 長期的、キャッシュされたDataFrameを保持します。
  • Execution memory — 短期的、ジョイン、集約、トランスフォーメーションを実行します。

この分割は 固定ではなく柔軟 です:

  • Execution > storage. executionがより多くのスペースを必要とし、storageに空きがあれば、自由に借ります。storageがキャッシュブロックでいっぱいなら、executionはLRUを使ってそれらを 追い出す ことができます。executionが優先されます。
  • Storage > execution. storageは空いているexecutionスペースを借りられますが、executionを 追い出すことはできません。後でexecutionがそのスペースを必要とすれば、storageはLRUで押し出されます。一方通行の権限です。

storage自体がオーバーフローした場合は、自分自身の最も使われていないキャッシュブロックを追い出します — executionのものは決して追い出しません。

executor OOMとデータスピル

各wideトランスフォーメーション(例:groupBy)は、同じキーの行を1つのパーティションに送ります。各タスクは一度に1つのパーティションをexecutionメモリ内で処理します。

  • パーティションが収まらない場合、Sparkは中間結果をディスクに スピル してスペースを解放します — パーティションは丸ごとスピルされます。「パーティションの半分」を動かすことはできません。
  • スピル自体は問題ありません。実際のOOMは 1つのパーティション自体がexecutionメモリより大きい ときに発生します — これはskew(偏り)の問題です。Sparkは最終的にそのパーティションを処理するためにメモリ上に実体化する必要があるため、スピルだけでは抜け出せません。
  • spark.executor.memory を増やすと一度はうまくいきますが、skewが大きくなれば再び壊れます。正しい修正は skewを潰すこと であって、RAMを買い増すことではありません。

ソルティング(Salting)

skewの標準的な対処法です。1つのキー(例えば food)が80%の行を占有している場合:

  1. 小さい範囲(例:0..3 → 4バケット)のランダム値を持つ salt カラムを追加します。
  2. key 単独ではなく (key, salt) でグループ化します。
  3. 偏っていたキーは、それぞれメモリに収まるN個の独立したパーティションに分割されます。

ホットキーをどれだけ分割する必要があるかに基づいてsaltのカーディナリティを選びます。ソルティングはきれいなgroup-byを追加の集約ステップと引き換えにしますが、OOMを動作するジョブに変えます。

キャッシング:cache() vs persist()

キャッシングなしでは、すべてのアクションが完全なDAGを再計算します。df1df2df3df4 を構築するために再利用される場合、Sparkは毎回 df1 をソースから再構築します — executionメモリは短期的で、アクション間で再利用されます。

  • df.cache() はDataFrameを storage memory に置き、その後の使用がインメモリのテーブルスキャンになります(.explain() で再計算されたプロジェクションの代わりに InMemoryTableScan として表示されます)。
  • DataFrame APIにおいて cache()persist(StorageLevel.MEMORY_AND_DISK) と同じです。RDD APIでは cache()MEMORY_ONLY になります — よく混乱するポイントです。
  • 他のストレージレベル:MEMORY_ONLY(スピルは再計算、ディスクには書かない)、DISK_ONLY(遅いが安全)、MEMORY_ONLY_2(耐障害性のためにレプリケート)、OFF_HEAP(実験的、明示的な有効化が必要)。
  • (a) 十分に小さく収まる、かつ (b) 複数回再利用される、DataFrameだけをキャッシュしてください。uncache() は存在しません — df.unpersist() を呼びます。

edge node、client mode vs cluster mode

クラスタマネージャと直接話すことはありません。ゲートウェイマシン — edge node — がクラスタの認証情報を持ち、開発者はそこにSSHで入ってジョブをsubmitします。

driverがどこで動くかによって、2つのデプロイモードがあります:

  • Client mode:driverはsubmitしたマシン(あなたのラップトップ / edge node)で動きます。ログはローカルにストリーミングされ、開発には便利です — ただしラップトップを閉じるとdriverが死に、ジョブ全体が死にます。driverとexecutor間のネットワーク越しの通信がレイテンシを増やします。開発専用です。
  • Cluster mode:driverはexecutorと一緒にクラスタノードの1つで動きます。submitterの切断に強く、driver↔executor間のトラフィックが低レイテンシです。プロダクション標準です。

パーティションプルーニング

df.write.partitionBy("department").parquet(path) を実行すると、Sparkは個別の値ごとに1つのフォルダを書き出します:department=HR/department=IT/、…

読み込み時、クエリにパーティションカラムに対するフィルタがあれば(.filter(col("department") == "HR"))、Sparkはマッチするフォルダだけをスキャンします。Spark UIの number of files readnumber of partitions read がそれを証明します:パーティショニング + パーティションカラムへのフィルタがあれば、テーブル全体ではなく数KBだけをスキャンします。TBスケールではこれが最も重要なツマミです。

動的パーティションプルーニング(DPP)

巧妙なケース:パーティション化されたファクトテーブル を、パーティションキーと共有するカラムにフィルタを持つ 小さなディメンションテーブル にジョインしているが、ファクトテーブル自体にはフィルタがない、という状況です。

素朴にはSparkはファクトテーブルのすべてのパーティションをスキャンしてしまいます。DPPはこれを書き換えます:

  1. ディメンションテーブルをスキャンしてフィルタする(小さいので安い)。
  2. 結果のキーセットをファクトテーブルのスキャンに ブロードキャスト する。
  3. ブロードキャストされたキーが動的フィルタとして働き、ファクトテーブルのスキャンはマッチするパーティションのみを読み込む。

2つの条件:ディメンション側のフィルタがファクトテーブルの パーティションカラム でもあるカラム上にあること、そしてそのカラムが ジョインキー の一部であること。

Adaptive Query Execution(AQE)

Spark 3.0以降、デフォルトで有効です(spark.sql.adaptive.enabled = true)。物理プランはもはや最終ではありません — Sparkは各ステージ後に実行時統計を収集し、残りのプランを書き換えます。3つの大きな勝利:

  • 動的なシャッフルパーティションのコアレッシング。 wideトランスフォーメーションはデフォルトで200のシャッフルパーティションになります。AQEオフで6行のデータがあると、199個の空のパーティションと1個の実際のパーティションができます — 無意味なGCプレッシャーとタスクスケジューリングのオーバーヘッドです。AQEはデータが実際に必要とする数(多くの場合1)までコアレッシングします。
  • 動的なジョイン戦略の切り替え。 sort-mergeジョインとしてコンパイルされたプランが、上流のトランスフォーメーションで片側がブロードキャストしきい値以下に縮んだ場合、実行時にブロードキャストジョインに変わることがあります。ヒント不要です。
  • 動的なskew処理。 実行時統計があるパーティションが兄弟よりはるかに大きいことを示せば(Sparkは中央値の約5倍という乗数ルールを使います)、AQEは自動的にサブパーティションに分割します。ソルティングを置き換えるものではありません(ソルティングの方が制御性が高い)が、中程度のskewに対する無料の安全網になります。

重要なポイント

  • 遅延評価がすべての肝。 DAGを構築し、作業が始まる前にCatalystに最適化させましょう。
  • シャッフルがコスト。 すべてのwideトランスフォーメーションはネットワーク操作です。できるところはブロードキャストしましょう。
  • コードではなくDAGを読め。 Spark UIは 実際に 何が走ったかを教えてくれます — チューニングすべきはそこです。
  • 無闇に .collect() するな。 プロダクションにおけるdriver OOMの最も一般的な原因です。
  • メモリチューニングはモデルを理解した後の話。 どんなに spark.driver.memory=64g を盛っても、誤ったDataFrameへの .collect() は救えません。
  • executionメモリはstorageメモリとの戦いに勝つ。 積極的にキャッシュしつつ、追い出されうることを知っておきましょう。
  • スケールアップの前にソルティング。 単一のパーティションがOOMするとき、答えはほぼ常にソルティング(または上流のクリーンアップ)であって、より大きなexecutorではありません。
  • フィルタするカラムでパーティション化して書け。 パーティションプルーニング + DPPはTBスケールのスキャンをKBスケールのスキャンに変えます。
  • AQEを信頼せよ、しかし何をしているかを理解せよ。 自動ですが、なぜ突然sort-mergeの代わりにブロードキャストジョインが見えているのか 理由 を知っているかどうかが、シニアエンジニアと乗客の違いです。

スピーカー

Ansh Lamba — リラックスした会話調の話し方で、「make sense?(わかるかな?)」というチェックインや「buddy」という呼びかけが多いです。長いコースを実際に最後まで見られるものにしてくれています。


🌐 Claudeによる翻訳

Tony Duong

著者: Tony Duong

デジタル日記。思考、経験、そして人生についての考え。