Partager via


Notification de dépréciation de la tâche d’envoi Spark & Guide de migration

Avertissement

La tâche Spark Submit est déconseillée et en attente de suppression. L’utilisation de ce type de tâche n’est pas autorisée pour les nouveaux cas d’usage et fortement déconseillée pour les clients existants. Consultez Spark Submit (hérité) pour obtenir la documentation d’origine de ce type de tâche. Continuez à lire les instructions de migration.

Pourquoi Spark Submit est-il déconseillé ?

Le type de tâche Spark Submit est déconseillé en raison des limitations techniques et des lacunes de fonctionnalités qui ne se trouvent pas dans les tâches de script JAR, Notebook ou Python. Ces tâches offrent une meilleure intégration avec les fonctionnalités Databricks, des performances améliorées et une plus grande fiabilité.

Mesures de dépréciation

Databricks implémente les mesures suivantes en lien avec la dépréciation :

  • Création restreinte : seuls les utilisateurs qui ont utilisé des tâches Spark Submit au cours du mois précédent, à compter de novembre 2025, peuvent créer de nouvelles tâches Spark Submit . Si vous avez besoin d’une exception, contactez le support technique de votre compte.
  • Restrictions de version de Databricks Runtime : l’utilisation de Spark Submit est limitée aux versions existantes de Databricks Runtime et aux versions de maintenance. Les versions existantes de Databricks Runtime avec Spark Submit continueront de recevoir des versions de maintenance de sécurité et de correction de bogues jusqu’à ce que la fonctionnalité soit arrêtée complètement. Databricks Runtime 17.3+ et 18.x+ ne prend pas en charge ce type de tâche.
  • Avertissements de l’interface utilisateur : les avertissements s’affichent dans l’interface utilisateur Databricks où les tâches Spark Submit sont en cours d’utilisation et les communications sont envoyées aux administrateurs d’espace de travail dans les comptes des utilisateurs existants.

Migrer des charges de travail JVM vers des tâches JAR

Pour les charges de travail JVM, migrez vos tâches Spark Submit vers des tâches JAR. Les tâches JAR offrent une meilleure prise en charge des fonctionnalités et une intégration avec Databricks.

Procédez comme suit pour migrer :

  1. Créez une tâche JAR dans votre travail.
  2. À partir de vos paramètres de tâche Spark Submit , identifiez les trois premiers arguments. Ils suivent généralement ce modèle : ["--class", "org.apache.spark.mainClassName", "dbfs:/path/to/jar_file.jar"]
  3. Supprimez le --class paramètre.
  4. Définissez le nom de la classe principale (par exemple) org.apache.spark.mainClassNameen tant que classe Main pour votre tâche JAR.
  5. Indiquez le chemin d’accès à votre fichier JAR (par exemple) dbfs:/path/to/jar_file.jardans la configuration de la tâche JAR.
  6. Copiez les arguments restants de votre tâche Spark Submit vers les paramètres de tâche JAR.
  7. Exécutez la tâche JAR et vérifiez qu’elle fonctionne comme prévu.

Pour plus d’informations sur la configuration des tâches JAR, consultez la tâche JAR.

Migrer des charges de travail R

Si vous lancez un script R directement à partir d’une tâche Spark Submit , plusieurs chemins de migration sont disponibles.

Option A : Utiliser des tâches carnet

Migrez votre script R vers un notebook Databricks. Les tâches de notebook supportent un ensemble de fonctionnalités complet, y compris la mise à l’échelle automatique du cluster, et fournissent une intégration améliorée avec la plateforme Databricks.

Option B : Initialiser des scripts R à partir d’une tâche Notebook

Utilisez une tâche notebook pour démarrer vos scripts R. Créez un bloc-notes avec le code suivant et référencez votre fichier R en tant que paramètre de travail. Modifiez pour ajouter des paramètres utilisés par votre script R, si nécessaire :

dbutils.widgets.text("script_path", "", "Path to script")
script_path <- dbutils.widgets.get("script_path")
source(script_path)

Rechercher des travaux qui utilisent des tâches Spark Submit

Vous pouvez utiliser les scripts Python suivants pour identifier les travaux de votre espace de travail qui contiennent des tâches Spark Submit. Un accès personnel valide ou un autre jeton sera nécessaire et l’URL de votre espace de travail doit être utilisée.

Option A : Analyse rapide (exécutez ceci en premier, pour les tâches persistantes uniquement)

Ce script analyse uniquement les travaux persistants (créés via /jobs/create ou l’interface web) et n’inclut pas les travaux éphémères créés via /runs/submit. Il s’agit de la méthode de première ligne recommandée pour identifier l’utilisation de Spark Submit, car elle est beaucoup plus rapide.

#!/usr/bin/env python3
"""
Requirements:
    databricks-sdk>=0.20.0

Usage:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
    export DATABRICKS_TOKEN="your-token"
    python3 list_spark_submit_jobs.py

Output:
    CSV format with columns: Job ID, Owner ID/Email, Job Name

Incorrect:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied


def main():
    # Get credentials from environment
    workspace_url = os.environ.get("DATABRICKS_HOST")
    token = os.environ.get("DATABRICKS_TOKEN")

    if not workspace_url or not token:
        print(
            "Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
            file=sys.stderr,
        )
        sys.exit(1)

    # Initialize client
    client = WorkspaceClient(host=workspace_url, token=token)

    # Scan workspace for persistent jobs with Spark Submit tasks
    # Using list() to scan only persistent jobs (faster than list_runs())
    print(
        "Scanning workspace for persistent jobs with Spark Submit tasks...",
        file=sys.stderr,
    )
    jobs_with_spark_submit = []
    total_jobs = 0

    # Iterate through all jobs (pagination is handled automatically by the SDK)
    skipped_jobs = 0
    for job in client.jobs.list(expand_tasks=True, limit=25):
        try:
            total_jobs += 1
            if total_jobs % 1000 == 0:
                print(f"Scanned {total_jobs} jobs total", file=sys.stderr)

            # Check if job has any Spark Submit tasks
            if job.settings and job.settings.tasks:
                has_spark_submit = any(
                    task.spark_submit_task is not None for task in job.settings.tasks
                )

                if has_spark_submit:
                    # Extract job information
                    job_id = job.job_id
                    owner_email = job.creator_user_name or "Unknown"
                    job_name = job.settings.name or f"Job {job_id}"

                    jobs_with_spark_submit.append(
                        {"job_id": job_id, "owner_email": owner_email, "job_name": job_name}
                    )
        except PermissionDenied:
            # Skip jobs that the user doesn't have permission to access
            skipped_jobs += 1
            continue

    # Print summary to stderr
    print(f"Scanned {total_jobs} jobs total", file=sys.stderr)
    if skipped_jobs > 0:
        print(
            f"Skipped {skipped_jobs} jobs due to insufficient permissions",
            file=sys.stderr,
        )
    print(
        f"Found {len(jobs_with_spark_submit)} jobs with Spark Submit tasks",
        file=sys.stderr,
    )
    print("", file=sys.stderr)

    # Output CSV to stdout
    if jobs_with_spark_submit:
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=["job_id", "owner_email", "job_name"],
            quoting=csv.QUOTE_MINIMAL,
        )
        writer.writeheader()
        writer.writerows(jobs_with_spark_submit)
    else:
        print("No jobs with Spark Submit tasks found.", file=sys.stderr)


if __name__ == "__main__":
    main()

Option B : Analyse complète (plus lente, inclut les travaux éphémères des 30 derniers jours)

Si vous devez identifier les travaux éphémères créés via /runs/submit, utilisez ce script plus exhaustif. Ce script analyse toutes les exécutions de travaux des 30 derniers jours dans votre espace de travail, y compris les travaux persistants (créés via /jobs/create) et les travaux éphémères. Ce script peut prendre des heures pour s’exécuter dans des espaces de travail volumineux.

#!/usr/bin/env python3
"""
Requirements:
    databricks-sdk>=0.20.0

Usage:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com"
    export DATABRICKS_TOKEN="your-token"
    python3 list_spark_submit_runs.py

Output:
    CSV format with columns: Job ID, Run ID, Owner ID/Email, Job/Run Name

Incorrect:
    export DATABRICKS_HOST="https://your-workspace.cloud.databricks.com/?o=12345678910"
"""

import csv
import os
import sys
import time
from databricks.sdk import WorkspaceClient
from databricks.sdk.errors import PermissionDenied


def main():
    # Get credentials from environment
    workspace_url = os.environ.get("DATABRICKS_HOST")
    token = os.environ.get("DATABRICKS_TOKEN")

    if not workspace_url or not token:
        print(
            "Error: Set DATABRICKS_HOST and DATABRICKS_TOKEN environment variables",
            file=sys.stderr,
        )
        sys.exit(1)

    # Initialize client
    client = WorkspaceClient(host=workspace_url, token=token)

    thirty_days_ago_ms = int((time.time() - 30 * 24 * 60 * 60) * 1000)

    # Scan workspace for runs with Spark Submit tasks
    # Using list_runs() instead of list() to include ephemeral jobs created via /runs/submit
    print(
        "Scanning workspace for runs with Spark Submit tasks from the last 30 days... (this will take more than an hour in large workspaces)",
        file=sys.stderr,
    )
    runs_with_spark_submit = []
    total_runs = 0
    seen_job_ids = set()

    # Iterate through all runs (pagination is handled automatically by the SDK)
    skipped_runs = 0
    for run in client.jobs.list_runs(
        expand_tasks=True,
        limit=25,
        completed_only=True,
        start_time_from=thirty_days_ago_ms,
    ):
        try:
            total_runs += 1
            if total_runs % 1000 == 0:
                print(f"Scanned {total_runs} runs total", file=sys.stderr)

            # Check if run has any Spark Submit tasks
            if run.tasks:
                has_spark_submit = any(
                    task.spark_submit_task is not None for task in run.tasks
                )

                if has_spark_submit:
                    # Extract job information from the run
                    job_id = run.job_id if run.job_id else "N/A"
                    run_id = run.run_id if run.run_id else "N/A"
                    owner_email = run.creator_user_name or "Unknown"
                    # Use run name if available, otherwise try to construct a name
                    run_name = run.run_name or (
                        f"Run {run_id}" if run_id != "N/A" else "Unnamed Run"
                    )

                    # Track unique job IDs to avoid duplicates for persistent jobs
                    # (ephemeral jobs may have the same job_id across multiple runs)
                    key = (job_id, run_id)
                    if key not in seen_job_ids:
                        seen_job_ids.add(key)
                        runs_with_spark_submit.append(
                            {
                                "job_id": job_id,
                                "run_id": run_id,
                                "owner_email": owner_email,
                                "job_name": run_name,
                            }
                        )
        except PermissionDenied:
            # Skip runs that the user doesn't have permission to access
            skipped_runs += 1
            continue

    # Print summary to stderr
    print(f"Scanned {total_runs} runs total", file=sys.stderr)
    if skipped_runs > 0:
        print(
            f"Skipped {skipped_runs} runs due to insufficient permissions",
            file=sys.stderr,
        )
    print(
        f"Found {len(runs_with_spark_submit)} runs with Spark Submit tasks",
        file=sys.stderr,
    )
    print("", file=sys.stderr)

    # Output CSV to stdout
    if runs_with_spark_submit:
        writer = csv.DictWriter(
            sys.stdout,
            fieldnames=["job_id", "run_id", "owner_email", "job_name"],
            quoting=csv.QUOTE_MINIMAL,
        )
        writer.writeheader()
        writer.writerows(runs_with_spark_submit)
    else:
        print("No runs with Spark Submit tasks found.", file=sys.stderr)


if __name__ == "__main__":
    main()

Vous avez besoin d’aide ?

Si vous avez besoin d’aide supplémentaire, contactez le support technique de votre compte.