作业见解是一个基于 Java 的诊断库,旨在帮助你在 Microsoft Fabric 中以交互方式分析已完成的 Spark 应用程序。 使用 Scala 检索结构化执行数据(例如查询、作业、阶段、任务和执行程序),通过作业见解,可让你更深入地了解 Spark 作业。
无论是排查性能问题还是执行自定义诊断,Job Insight 库都允许将 Spark 遥测用作本机 Spark 数据集,以便更轻松地排查性能问题并浏览执行见解。
注释
尚不支持使用 PySpark 访问作业见解库。
先决条件
仅支持 Scala。
需要 Fabric 运行时 1.3 或更高版本(使用 Spark 3.5+)。
PySpark 不支持访问 Job Insight 库。
关键功能
交互式 Spark 作业分析:访问 Spark 执行指标,包括作业、阶段和执行程序详细信息。
保留执行指标:将 Spark 作业执行指标保存到 lakehouse 表,以便进行报告和集成。
Spark 事件日志复制:将事件日志导出到 OneLake 或 Azure 数据存储。
已知限制
目前,库不支持处理大型事件日志,例如超过 20 MB 或深度嵌套结构的字符串。
示例笔记本
可以使用提供的示例笔记本 (示例 ipynb 文件) 开始。 笔记本包括:
- 示例
analyze()和loadJobInsight()代码 - 显示命令(例如)
queries.show() - 事件日志复制示例。
入门指南
1.分析已完成的 Spark 作业
使用 analyze API 从已完成的 Spark 作业中提取结构化执行数据:
import com.microsoft.jobinsight.diagnostic.SparkDiagnostic
val jobInsight = SparkDiagnostic.analyze(
$workspaceId,
$artifactId,
$livyId,
$jobType,
$stateStorePath,
$attemptId
)
val queries = jobInsight.queries
val jobs = jobInsight.jobs
val stages = jobInsight.stages
val tasks = jobInsight.tasks
val executors = jobInsight.executors
2. 将指标和日志保存到 Lakehouse
将分析输出保存到 lakehouse 表以报告或集成:
val df = jobInsight.queries
df.write
.format("delta")
.mode("overwrite")
.saveAsTable("sparkdiagnostic_lh.Queries")
将相同的逻辑应用于其他组件,例如作业、阶段或执行程序。
3.重新加载以前的分析
如果已运行分析并保存了输出,请在不重复此过程的情况下重新加载它:
import com.microsoft.jobInsight.diagnostic.SparkDiagnostic
val jobInsight = SparkDiagnostic.loadJobInsight(
$stateStorePath
)
val queries = jobInsight.queries
val jobs = jobInsight.jobs
val stages = jobInsight.stages
val tasks = jobInsight.tasks
val executors = jobInsight.executors
4.复制 spark 事件日志
使用此 API 将 Spark 事件日志复制到 ABFSS 位置(如 OneLake 或 Azure Data Lake Storage (ADLS) Gen2:
import com.microsoft.jobInsight.diagnostic.LogUtils
val contentLength = LogUtils.copyEventLog(
$workspaceId,
$artifactId,
$livyId,
$jobType,
$targetDirectory,
$asyncMode,
$attemptId
)
最佳做法
确保所有 ABFSS 路径都具有正确的读/写权限。
将输出保存到
analyze()持久位置以供重复使用。复制大型作业的日志以减少延迟时使用
asyncMode = true。监视事件日志大小和结构,以避免反序列化问题。
Troubleshooting
| 問题 | 决议 |
|---|---|
| 写入访问被拒绝 | 检查目标 ABFSS 目录的写入权限。 |
| stateStorePath 已存在 | 对要分析的每个调用使用尚不存在的新路径()。 |