共用方式為


Spark 提交任務淘汰通知和遷移指南

警告

Spark Submit工作已被棄用,並等待移除。 不允許將此任務類型用於新的使用案例,並且強烈不建議現有客戶使用。 如需此工作類型的原始文件,請參閱 Spark Submit (舊版)。 請繼續閱讀移轉指示。

為什麼 Spark Submit 被取代?

Spark 提交工作類型因 JARNotebookPython 指令碼工作中不存在的技術限制和功能差距而被取代。 這些工作可讓您更妥善地與 Databricks 功能整合、改善效能,以及更高的可靠性。

淘汰措施

Databricks 正在實作下列與淘汰相關的措施:

  • 限制建立:只有從 2025 年 11 月開始的上個月使用過 Spark Submit 任務的使用者才能建立新的 Spark Submit 任務。 如果您需要例外狀況,請聯絡您的帳戶支援人員。
  • Databricks 執行時版本限制Spark Submit 的使用僅限於現有的 Databricks 執行時版本及維護版本。 現有搭載 Spark Submit 的 Databricks 執行時版本,將持續接收安全與錯誤修正維護版本,直到該功能完全關閉為止。 Databricks 執行環境 17.3+ 與 18.x+ 不支援此任務類型。
  • UI 警告:當 Spark Submit 任務正在使用時,警告會出現在整個 Databricks UI 中,並且會向現有使用者帳戶的工作區系統管理員傳送通訊。

將 JVM 工作量移轉至 JAR 作業

對於 JVM 工作負載,請將 Spark 提交 任務遷移至 JAR 任務。 JAR 工作提供更好的功能支援,以及與 Databricks 的整合。

請依照下列步驟移轉:

  1. 在您的工作裡建立新的 JAR 任務。
  2. Spark 提交 任務參數中,識別前三個引數。 他們通常遵循以下模式: ["--class", "org.apache.spark.mainClassName", "dbfs:/path/to/jar_file.jar"]
  3. 移除參數 --class
  4. 將主要類別名稱 (例如 org.apache.spark.mainClassName) 設為 JAR 作業的主要 類別
  5. 在 JAR 作業配置中提供 JAR 檔案的路徑 (例如, dbfs:/path/to/jar_file.jar)。
  6. Spark 提交 任務中的任何剩餘引數複製到 JAR 任務參數。
  7. 執行 JAR 工作並驗證其是否如預期般運作。

如需配置 JAR 作業的詳細資訊,請參閱 JAR 作業

移轉 R 工作負載

如果您直接從 Spark 提交 工作啟動 R 腳本,則有多個移轉路徑可供使用。

選項 A:使用筆記本工作

將您的 R 腳本移轉至 Databricks 筆記本。 筆記本工作支援一整套功能,包括叢集自動調整,並提供與 Databricks 平台更好的整合。

選項 B:從筆記本工作啟動 R 指令碼

使用 Notebook 任務來初始化 R 腳本。 使用下列程式碼建立筆記本,並參考 R 檔案作為作業參數。 如有需要,修改以新增 R 指令碼所使用的參數:

dbutils.widgets.text("script_path", "", "Path to script")
script_path <- dbutils.widgets.get("script_path")
source(script_path)

尋找使用 Spark Submit 任務的工作

您可以使用下列 Python 指令碼來識別工作區中包含 Spark 提交任務的作業。 需要有效的 個人存取權或其他權杖 ,而且應該使用 您的工作區 URL

選項 A:快速掃描(首先運行,僅限持續性工作)

此指令碼只會掃描持續性工作(透過 /jobs/create 或 Web 介面建立),不包括透過 /runs/submit建立的暫時工作。 這是識別 Spark 提交使用情況的建議第一線方法,因為它速度要快得多。

#!/usr/bin/env python3
"""
Requirements:
    databricks-sdk>=0.20.0

Usage:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
    export DATABRICKS_TOKEN="your-token"
    python3 list_spark_submit_jobs.py

Output:
    CSV format with columns: Job ID, Owner ID/Email, Job Name

Incorrect:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied


def main():
    # Get credentials from environment
    workspace_url = os.environ.get("DATABRICKS_HOST")
    token = os.environ.get("DATABRICKS_TOKEN")

    if not workspace_url or not token:
        print(
            "Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
            file=sys.stderr,
        )
        sys.exit(1)

    # Initialize client
    client = WorkspaceClient(host=workspace_url, token=token)

    # Scan workspace for persistent jobs with Spark Submit tasks
    # Using list() to scan only persistent jobs (faster than list_runs())
    print(
        "Scanning workspace for persistent jobs with Spark Submit tasks...",
        file=sys.stderr,
    )
    jobs_with_spark_submit = []
    total_jobs = 0

    # Iterate through all jobs (pagination is handled automatically by the SDK)
    skipped_jobs = 0
    for job in client.jobs.list(expand_tasks=True, limit=25):
        try:
            total_jobs += 1
            if total_jobs % 1000 == 0:
                print(f"Scanned {total_jobs} jobs total", file=sys.stderr)

            # Check if job has any Spark Submit tasks
            if job.settings and job.settings.tasks:
                has_spark_submit = any(
                    task.spark_submit_task is not None for task in job.settings.tasks
                )

                if has_spark_submit:
                    # Extract job information
                    job_id = job.job_id
                    owner_email = job.creator_user_name or "Unknown"
                    job_name = job.settings.name or f"Job {job_id}"

                    jobs_with_spark_submit.append(
                        {"job_id": job_id, "owner_email": owner_email, "job_name": job_name}
                    )
        except PermissionDenied:
            # Skip jobs that the user doesn't have permission to access
            skipped_jobs += 1
            continue

    # Print summary to stderr
    print(f"Scanned {total_jobs} jobs total", file=sys.stderr)
    if skipped_jobs > 0:
        print(
            f"Skipped {skipped_jobs} jobs due to insufficient permissions",
            file=sys.stderr,
        )
    print(
        f"Found {len(jobs_with_spark_submit)} jobs with Spark Submit tasks",
        file=sys.stderr,
    )
    print("", file=sys.stderr)

    # Output CSV to stdout
    if jobs_with_spark_submit:
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=["job_id", "owner_email", "job_name"],
            quoting=csv.QUOTE_MINIMAL,
        )
        writer.writeheader()
        writer.writerows(jobs_with_spark_submit)
    else:
        print("No jobs with Spark Submit tasks found.", file=sys.stderr)


if __name__ == "__main__":
    main()

選項 B:全面掃描 (較慢,包括過去 30 天的暫時作業)

如果您需要識別透過 /runs/submit建立的暫時性工作,請使用此更詳盡的指令碼。 此指令碼會掃描工作區中過去 30 天內的所有作業執行,包括持續性作業 (透過 /jobs/create建立) 和暫時作業。 此指令碼可能需要數小時才能在大型工作區中執行。

#!/usr/bin/env python3
"""
Requirements:
    databricks-sdk>=0.20.0

Usage:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
    export DATABRICKS_TOKEN="your-token"
    python3 list_spark_submit_runs.py

Output:
    CSV format with columns: Job ID, Run ID, Owner ID/Email, Job/Run Name

Incorrect:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
import time
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied


def main():
    # Get credentials from environment
    workspace_url = os.environ.get("DATABRICKS_HOST")
    token = os.environ.get("DATABRICKS_TOKEN")

    if not workspace_url or not token:
        print(
            "Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
            file=sys.stderr,
        )
        sys.exit(1)

    # Initialize client
    client = WorkspaceClient(host=workspace_url, token=token)

    thirty_days_ago_ms = int((time.time() - 30 * 24 * 60 * 60) * 1000)

    # Scan workspace for runs with Spark Submit tasks
    # Using list_runs() instead of list() to include ephemeral jobs created via /runs/submit
    print(
        "Scanning workspace for runs with Spark Submit tasks from the last 30 days... (this will take more than an hour in large workspaces)",
        file=sys.stderr,
    )
    runs_with_spark_submit = []
    total_runs = 0
    seen_job_ids = set()

    # Iterate through all runs (pagination is handled automatically by the SDK)
    skipped_runs = 0
    for run in client.jobs.list_runs(
        expand_tasks=True,
        limit=25,
        completed_only=True,
        start_time_from=thirty_days_ago_ms,
    ):
        try:
            total_runs += 1
            if total_runs % 1000 == 0:
                print(f"Scanned {total_runs} runs total", file=sys.stderr)

            # Check if run has any Spark Submit tasks
            if run.tasks:
                has_spark_submit = any(
                    task.spark_submit_task is not None for task in run.tasks
                )

                if has_spark_submit:
                    # Extract job information from the run
                    job_id = run.job_id if run.job_id else "N/A"
                    run_id = run.run_id if run.run_id else "N/A"
                    owner_email = run.creator_user_name or "Unknown"
                    # Use run name if available, otherwise try to construct a name
                    run_name = run.run_name or (
                        f"Run {run_id}" if run_id != "N/A" else "Unnamed Run"
                    )

                    # Track unique job IDs to avoid duplicates for persistent jobs
                    # (ephemeral jobs may have the same job_id across multiple runs)
                    key = (job_id, run_id)
                    if key not in seen_job_ids:
                        seen_job_ids.add(key)
                        runs_with_spark_submit.append(
                            {
                                "job_id": job_id,
                                "run_id": run_id,
                                "owner_email": owner_email,
                                "job_name": run_name,
                            }
                        )
        except PermissionDenied:
            # Skip runs that the user doesn't have permission to access
            skipped_runs += 1
            continue

    # Print summary to stderr
    print(f"Scanned {total_runs} runs total", file=sys.stderr)
    if skipped_runs > 0:
        print(
            f"Skipped {skipped_runs} runs due to insufficient permissions",
            file=sys.stderr,
        )
    print(
        f"Found {len(runs_with_spark_submit)} runs with Spark Submit tasks",
        file=sys.stderr,
    )
    print("", file=sys.stderr)

    # Output CSV to stdout
    if runs_with_spark_submit:
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=["job_id", "run_id", "owner_email", "job_name"],
            quoting=csv.QUOTE_MINIMAL,
        )
        writer.writeheader()
        writer.writerows(runs_with_spark_submit)
    else:
        print("No runs with Spark Submit tasks found.", file=sys.stderr)


if __name__ == "__main__":
    main()

需要幫忙?

如果您需要其他協助,請聯絡您的帳戶支援人員。