Job insight は、Microsoft Fabric で完成した Spark アプリケーションを対話形式で分析できるように設計された Java ベースの診断ライブラリです。 ジョブの分析情報を使用すると、Scala を使用して Fabric Spark ノートブック内のクエリ、ジョブ、ステージ、タスク、Executor などの構造化された実行データを取得することで、Spark ジョブに関するより深い洞察を得ることができます。
パフォーマンスの問題をトラブルシューティングする場合でも、カスタム診断を実行する場合でも、Job insight ライブラリを使用すると、Spark テレメトリをネイティブの Spark データセットとして操作できるため、パフォーマンスの問題のトラブルシューティングや実行の分析情報の調査が容易になります。
注
PySpark を使用した Job insight ライブラリへのアクセスはまだサポートされていません。
[前提条件]
Scala のみがサポートされています。
ファブリック ランタイム 1.3 以降 (Spark 3.5 以降) が必要です。
PySpark では、Job Insight ライブラリへのアクセスはサポートされていません。
主な機能
対話型 Spark ジョブ分析: ジョブ、ステージ、Executor の詳細を含む Spark 実行メトリックにアクセスします。
実行メトリックを保持する: Spark ジョブ実行メトリックを lakehouse テーブルに保存して、レポートと統合を行います。
Spark イベント ログのコピー: OneLake または Azure Data Storage にイベント ログをエクスポートします。
サンプル ノートブック
提供されているサンプル ノートブック (サンプル ipynb ファイル) を使用して開始できます。 ノートブックには次のものが含まれます。
- サンプル
analyze()とloadJobInsight()コード - コマンドの表示 (例:
queries.show()) - イベント ログのコピーの例。
作業の開始
1. 完了した Spark ジョブを分析する
analyze API を使用して、完了した Spark ジョブから構造化された実行データを抽出します。
import com.microsoft.jobinsight.diagnostic.SparkDiagnostic
val jobInsight = SparkDiagnostic.analyze(
$workspaceId,
$artifactId,
$livyId,
$jobType,
$stateStorePath,
$attemptId
)
val queries = jobInsight.queries
val jobs = jobInsight.jobs
val stages = jobInsight.stages
val tasks = jobInsight.tasks
val executors = jobInsight.executors
2. メトリックとログを lakehouse に保存する
レポートまたは統合のために、分析出力を lakehouse テーブルに保存します。
val df = jobInsight.queries
df.write
.format("delta")
.mode("overwrite")
.saveAsTable("sparkdiagnostic_lh.Queries")
ジョブ、ステージ、Executor などの他のコンポーネントに同じロジックを適用します。
3. 前の分析を再読み込みする
既に分析を実行して出力を保存している場合は、プロセスを繰り返さずに再読み込みします。
import com.microsoft.jobinsight.diagnostic.SparkDiagnostic
val jobInsight = SparkDiagnostic.loadJobInsight(
$stateStorePath
)
val queries = jobInsight.queries
val jobs = jobInsight.jobs
val stages = jobInsight.stages
val tasks = jobInsight.tasks
val executors = jobInsight.executors
4. Spark イベント ログをコピーする
次の API を使用して、Spark イベント ログを ABFSS の場所 (OneLake や Azure Data Lake Storage (ADLS) Gen2 など) にコピーします。
import com.microsoft.jobinsight.diagnostic.LogUtils
val contentLength = LogUtils.copyEventLog(
$workspaceId,
$artifactId,
$livyId,
$jobType,
$targetDirectory,
$asyncMode,
$attemptId
)
ベスト プラクティス
すべての ABFSS パスに対して正しい読み取り/書き込みアクセス許可があることを確認します。
analyze()出力を再利用のために永続的な場所に保存します。大きなジョブのログをコピーして待機時間を短縮する場合は、
asyncMode = trueを使用します。逆シリアル化の問題を回避するためのイベント ログのサイズと構造の監視。
トラブルシューティング
| 問題点 | 解決策 |
|---|---|
| 書き込みアクセスが拒否されました | ターゲット ABFSS ディレクトリの書き込みアクセス許可を確認します。 |
| stateStorePath は既に存在します | analyze() の呼び出しごとに、まだ存在しない新しいパスを使用します。 |