Freigeben über


Jobs aktualisieren, wenn Sie veraltete Arbeitsbereiche auf Unity Catalog aktualisieren

Wenn Sie legacy-Arbeitsbereiche auf den Unity-Katalog aktualisieren, müssen Sie möglicherweise vorhandene Aufträge aktualisieren, um auf aktualisierte Tabellen und Dateipfade zu verweisen. In der folgenden Tabelle sind typische Szenarien und Vorschläge zum Aktualisieren Ihrer Aufträge aufgeführt.

Eine Demo zum Aktualisieren von Aufträgen in Unity-Katalog finden Sie unter Aktualisieren eines Auftrags auf Unity-Katalog.

Scenario Solution
Job verwendet ein Notizbuch mit Verweisen auf benutzerdefinierte Bibliotheken, entweder über ein Init-Skript oder über clusterdefinierte Bibliotheken.
Eine benutzerdefinierte Bibliothek wäre als ein nicht öffentlich verfügbares pip Paket oder Jar definiert, das Apache Spark- oder SQL-Lese- oder Schreibvorgänge ausführt, die in den Code eingebettet sind.
Ändern Sie die benutzerdefinierte Bibliothek, um folgendes sicherzustellen:
  • Datenbanknamen sind dreistufige Namespaces.
  • Bereitstellungspunkte werden im Code nicht verwendet.
Job verwendet ein Notebook, das aus einer Hive-Metastore-Tabelle liest oder in sie schreibt.
  • Evaluieren Sie das Festlegen des Standardkatalogs in der Spark-Konfiguration des Job-Clusters: spark.databricks.sql.initial.catalog.name my_catalog
  • Bewerten Sie, ob der Standardkatalog des Arbeitsbereichs auf andere als hive_metastore festgelegt werden kann, damit der Auftragscode nicht geändert werden muss.
  • Ändern Sie andernfalls den Code des Jobs, um zweistufige Namespaces in dreistufige Namespaces der entsprechenden Tabelle umzubenennen.
  • Wenn der Auftrag reines SQL verwendet, sollten Sie eine USE CATALOG Anweisung hinzufügen.
Der Job verwendet ein Notebook, das aus Pfaden liest oder in Pfade schreibt, die Unterordner von Tabellen sind. Dies ist im Unity-Katalog nicht möglich.
  • Ändern Sie den Code, um aus der Tabelle mit einem Prädikat für die Partitionierungsspalte zu lesen.
  • Ändern Sie den Code, um mit overwriteByPartition oder einer anderen geeigneten Option in die Tabelle zu schreiben.
Der Job verwendet ein Notebook, das aus Pfaden liest oder in Pfade schreibt, die in Unity Catalog registrierte Tabellen sind
  • Ändern Sie Code, um auf die richtige dreistufige Namespacetabelle zu verweisen.
  • Wenn die Tabelle nicht registriert ist oder sein wird, muss der Code trotzdem so geändert werden, dass er in einen Volume-Pfad statt in einen Mount-Pfad schreibt.
Der Job verwendet ein Notebook, das Dateien, nicht Tabellen, über Mount-Pfade liest oder schreibt. Ändern Sie den Code, um stattdessen in einen Volumespeicherort zu schreiben.
Der Job ist ein Streaming-Job, der applyInPandasWithState verwendet. Wird derzeit nicht unterstützt. Erwägen Sie eine Neufassung, wenn möglich, oder versuchen Sie nicht, diese Aufgabe umzugestalten, bis der Support bereitgestellt wird.
Der Job ist ein Streaming-Job, der den Modus der kontinuierlichen Verarbeitung verwendet. Der fortlaufende Verarbeitungsmodus ist in Spark experimentell und wird im Unity-Katalog nicht unterstützt. Umgestalten Sie den Auftrag so, dass strukturiertes Streaming verwendet wird. Wenn dies nicht möglich ist, sollten Sie den Job gegen den Hive-Metaspeicher ausführen lassen.
Der Job ist ein Streaming-Job, der Checkpoint-Verzeichnisse verwendet.
  • Verschieben Sie Checkpoint-Verzeichnisse auf Volumes.
  • Ändern Sie den Code im Notebook so, dass ein Volume-Pfad verwendet wird.
  • Der Besitzer des Jobs sollte Lese- und Schreibrechte für diesen Pfad haben.
  • Auftrag beenden.
  • Verschieben Sie den Checkpoint an den neuen Speicherort des Volumes.
  • Auftrag neu starten.
Der Job hat eine Cluster-Definition unter Databricks Runtime 11.3.
  • Ändern Sie die Auftragsclusterdefinition in Databricks Runtime 11.3 oder höher.
  • Ändern Sie die Auftragsclusterdefinition so, dass ein festgelegter oder standardmäßiger Zugriffsmodus verwendet wird.
Job verfügt über Notizbücher, die mit Speicher oder Tabellen interagieren. Das Dienstprinzipal, als das der Job ausgeführt wurde, muss über Lese- und Schreibzugriff auf die erforderlichen Ressourcen im Unity Catalog verfügen, wie z. B. Volumes, Tabellen, externe Speicherorte usw.
Auftrag ist ein Lakeflow Spark Declarative Pipelines.
  • Ändern Sie den Job-Cluster auf Databricks Runtime 13.1 oder höher.
  • Beenden Sie die Aufgabe "Lakeflow Spark Declarative Pipelines".
  • Verschieben Sie die Daten in eine von Unity Catalog verwaltete Tabelle.
  • Ändern Sie die Auftragsdefinition von Lakeflow Spark Declarative Pipelines so, dass die neue verwaltete Tabelle des Unity-Katalogs verwendet wird.
  • Starten Sie den Lakeflow Spark Declarative Pipelines-Auftrag neu.
Der Job umfasst Notebooks, die Cloud-Dienste ohne Storage verwenden, wie z. B. AWSKinesis, und die für die Verbindung verwendete Konfiguration verwendet ein Instanz-Profil.
  • Ändern Sie den Code so, dass Unity Catalog-Dienstanmeldeinformationen verwendet werden, die die Anmeldeinformationen steuern, die mit nicht speicherfreien Clouddiensten interagieren können, indem temporäre Anmeldeinformationen generiert werden, die von den SDKs verwendet werden können.
Job verwendet Scala
  • Wenn Sie Databricks Runtime 13.3 unterschreiten, führen Sie den Auftrag mit einem dedizierten Compute-System aus.
  • Standardcluster werden für Databricks Runtime 13.3 und höher unterstützt.
Job verfügt über Notizbücher, die Scala-UDFs verwenden
  • Wenn Sie Databricks Runtime 13.3 unterschreiten, führen Sie den Auftrag mit einem dedizierten Compute-System aus.
  • Standardcluster werden für Databricks Runtime 14.2 unterstützt.
Job hat Aufgaben, die MLR verwenden Wird auf dedizierten Rechenressourcen ausgeführt.
Der Job verfügt über eine Clusterkonfiguration, die auf globalen Init-Skripts basiert.
  • Verwenden Sie Databricks Runtime 13.3 oder höher für den vollständigen Support.
  • Ändern Sie dies so, dass clusterweite Initskripts verwendet werden oder verwenden Sie Clusterrichtlinien. Skripts, Dateien und Pakete müssen auf Unity-Katalogvolumes installiert werden, um sie auszuführen.
Ein Job verfügt über eine Clusterkonfiguration oder Notebooks, die Jars/Maven, Spark-Erweiterungen oder benutzerdefinierte Datenquellen (aus Spark) verwenden.
  • Verwenden Sie Databricks Runtime 13.3 oder höher.
  • Verwenden Sie Clusterrichtlinien zum Installieren von Bibliotheken.
Job verfügt über Notebooks, die PySpark UDFs verwenden. Verwenden Sie Databricks Runtime 13.2 oder höher.
Job verfügt über Notizbücher mit Python-Code, der Netzwerkaufrufe durchführt. Verwenden Sie Databricks Runtime 12.2 oder höher.
Der Job hat Notebooks, die Pandas UDFs (skalar) verwenden. Verwenden Sie Databricks Runtime 13.2 oder höher.
Der Job verwendet Unity Catalog Volumes. Verwenden Sie Databricks Runtime 13.3 oder höher.
Job verfügt über Notizbücher, die spark.catalog.X (tableExists, listTables, setDefaultCatalog) verwenden und auf einem freigegebenen Cluster ausgeführt werden.
  • Verwenden Sie Databricks Runtime 14.2 oder höher.
  • Wenn das Databricks-Runtime-Upgrade nicht möglich ist, führen Sie die folgenden Schritte aus: Verwenden Sie anstelle des tableExistsfolgenden Codes den folgenden Code: # SQL workaround def tableExistsSql(tablename): try: spark.sql(f"DESCRIBE TABLE {tablename};") except Exception as e: return False return True tableExistsSql("jakob.jakob.my_table") Anstelle von listTables, verwenden SHOW TABLES (ermöglicht auch das Einschränken des Datenbank- oder Musterabgleichs): spark.sql("SHOW TABLES") Zum Ausführen von setDefaultCatalog spark.sql("USE CATALOG ")
Der Job hat Notebooks, die die interne DButils-API verwenden: Command Context und führt einen gemeinsamen Cluster aus.
Workloads, die versuchen, auf den Befehlszugriff zuzugreifen, z.B. um eine Job-ID abzurufen, verwenden
dbutils.notebook.entry_point.getDbutils().notebook().getContext().toJson()
Verwenden Sie statt .toJson() jetzt .safeToJson(). Dadurch erhalten Sie eine Teilmenge aller Befehlskontextinformationen, die sicher auf einem gemeinsamen Cluster geteilt werden können.
Erfordert Databricks Runtime 13.3 LTS+
Job hat Notebooks, die PySpark verwenden: spark.udf.registerJavaFunction und wird über einen gemeinsamen Cluster ausgeführt
  • Verwenden von Databricks Runtime 14.3 LTS oder höher
  • Für Notebooks und Jobs verwenden Sie eine %scala-Zelle, um die Scala UDF mit spark.udf.register zu registrieren. Da Python und Scala den Ausführungskontext gemeinsam nutzen, steht auch die Scala UDF aus Python zur Verfügung.
  • Für Kunden, die IDEs verwenden (mit Databricks Connect v2), besteht die einzige Option darin, die UDF als Unity Catalog Python UDF umzuschreiben. In Zukunft planen wir die Erweiterung der Unterstützung für Unity Catalog UDFs zur Unterstützung von Scala.
Job hat Notebooks, die RDDs verwenden: sc.parallelize & spark.read.json() zur Konvertierung eines JSON-Objekts in eine DF und führt einen gemeinsamen Cluster aus.
  • Verwenden Sie stattdessen json.loads.

Beispiel-
Before:
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
df = spark.read.json(sc.parallelize(json_list))
display(df)
After:
from pyspark.sql import Row
import json
# Sample JSON data as a list of dictionaries (similar to JSON objects)
json_data_str = response.text
json_data = [json.loads(json_data_str)]
# Convert dictionaries to Row objects
rows = [Row(**json_dict) for json_dict in json_data]
# Create DataFrame from list of Row objects
df = spark.createDataFrame(rows)
df.display()
Der Job verfügt über Notebooks, die RDDs verwenden: Empty Dataframe über sc.emptyRDD() und wird mit einem gemeinsamen Cluster ausgeführt Beispiel-
Before:
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(sc.emptyRDD[Row], schema)
After:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType( StructField("k", StringType, true) :: StructField("v", IntegerType, false) :: Nil)
spark.createDataFrame(new java.util.ArrayList[Row](), schema)
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("k", StringType(), True)])
spark.createDataFrame([], schema)
Job hat Notebooks, die RDDs verwenden: mapPartitions (teure Initialisierungslogik + billigere Operationen pro Zeile) und wird in einem gemeinsamen Cluster ausgeführt
  • Grund – Unity Catalog verwendet Spark Connect für die Kommunikation zwischen Python-/Scala-Programmen und dem Spark-Server, wodurch RDDs nicht mehr zugänglich sind. Vor: Ein typischer Anwendungsfall für RDDs besteht darin, teure Initialisierungslogik nur einmal auszuführen und dann günstigere Vorgänge pro Zeile auszuführen. Ein solcher Anwendungsfall kann das Aufrufen eines externen Dienstes oder das Initialisieren von Verschlüsselungslogik sein. Nach: Schreiben Sie RDD-Vorgänge mithilfe der Dataframe-API und verwenden Sie pySpark native Arrow UDFs.
Job hat Notebooks, die SparkContext (sc) & sqlContext verwenden und mit einem gemeinsamen Cluster ausgeführt werden
  • Reason - Spark Context (sc) & sqlContext sind aufgrund der gemeinsamen Clusterarchitektur des Unity-Katalogs und SparkConnect nicht entwurfsbedingt verfügbar. So lösen Sie Folgendes: Verwenden Sie Spark-Variable, um mit den Einschränkungen der SparkSession-Instanz zu interagieren: Der Zugriff auf spark JVM kann nicht direkt über python/ Scala REPL erfolgen, nur über Spark-Befehle. Dies bedeutet, dass sc._jvm Befehle standardmäßig fehlschlagen. Die folgenden sc-Befehle werden nicht unterstützt: emptyRDD, range, init_batched_serializer, parallelize, pickleFile, textFile, wholeTextFiles, binaryFiles, binaryRecords, sequenceFile, newAPIHadoopFile, newAPIHadoopRDD, hadoopFile, hadoopRDD, union, runJob, setSystemProperty, uiWebUrl, stop, setJobGroup, setLocalProperty, getConf
Job verfügt über Notebooks, die die Spark-Konfiguration `sparkContext.getConf` nutzen und auf einem gemeinsam genutzten Cluster laufen.
  • Reason – sparkContext, df.sparkContext, sc.sparkContext und ähnliche APIs sind nicht entwurfsweise verfügbar. So lösen Sie Folgendes: Verwenden Sie stattdessen spark.conf.
Job hat Notizbücher, die SparkContext - SetJobDescription() verwenden und auf einem freigegebenen Cluster ausgeführt werden.
  • Reason - sc.setJobDescription("String") ist aufgrund der Architektur des gemeinsamen Clusters im Unity Catalog und von SparkConnect absichtlich nicht verfügbar. So lösen Sie Folgendes: Verwenden Sie stattdessen Tags, wenn möglich [PySpark docs] spark.addTag() kann ein Tag anfügen, und getTags() und interruptTag(tag) können verwendet werden, um auf das Vorhandensein/Fehlen eines Tags zu reagieren, erfordert Databricks Runtime 14.1+
Job verfügt über Notebooks, die den Spark-Log-Level mit Befehlen wie sc.setLogLevel("INFO") einstellen und auf einem freigegebenen Cluster laufen.
  • Grund: In Single-User- und ohne Isolationscluster ist es möglich, auf den Spark-Kontext zuzugreifen, um das Logging-Level über Treiber und Executors hinweg dynamisch festzulegen. Auf freigegebenen Clustern war diese Methode nicht über den Spark-Kontext zugänglich, und in Databricks Runtime 14+ ist der Spark-Kontext nicht mehr verfügbar. So lösen Sie Folgendes: Um die Protokollebene zu steuern, ohne ein log4j.conf bereitzustellen, ist es jetzt möglich, einen Spark-Konfigurationswert in den Clustereinstellungen zu verwenden. Verwenden Sie Spark Log Levels, indem Sie spark.log.level auf DEBUG, WARN, INFO, ERROR als Spark-Konfigurationswert in den Clustereinstellungen festlegen.
Job hat Notebooks, die tief verschachtelte Ausdrücke / Abfragen verwenden und über einen gemeinsamen Cluster ausgeführt werden
  • Reason - RecursionError / Protobuf maximale Schachtelungsebene überschritten (für tief geschachtelte Ausdrücke / Abfragen) Wenn rekursiv verschachtelte DataFrames und Ausdrücke mithilfe der PySpark DataFrame-API erstellt werden, ist es möglich, dass in bestimmten Fällen eine der folgenden Fälle auftreten kann:
    • Python-Ausnahme: RecursionError: maximale Rekursionstiefe überschritten
    • SparkConnectGprcException: Maximale Verschachtelungsebene von Protobuf überschritten
    So lösen Sie Folgendes: Um das Problem zu umgehen, identifizieren Sie tief geschachtelte Codepfade, und schreiben Sie sie mit linearen Ausdrücken/ Unterabfragen oder temporären Ansichten um. Zum Beispiel: Anstatt df.withColumn rekursiv aufzurufen, rufen Sie stattdessen df.withColumns(dict) auf.
Job hat ein Notizbuch, das die Funktion input_file_name() im Code verwendet und auf einem freigegebenen Cluster läuft.
  • Grund : input_file_name() wird im Unity-Katalog für freigegebene Cluster nicht unterstützt. So lösen Sie Folgendes: So rufen Sie den Dateinamen ab .withColumn("RECORD_FILE_NAME", col("_metadata.file_name")) funktioniert für spark.read So rufen Sie den gesamten Dateipfad ab .withColumn("RECORD_FILE_PATH", col("_metadata.file_path")) wird für spark.read funktionieren
Job hat Notebooks, die Datenoperationen in DBFS-Dateisystemen ausführen und mit einem freigegebenen Cluster laufen.
  • de-DE: Grund: Wenn Sie DBFS mit gemeinsam genutztem Cluster unter Verwendung des FUSE-Dienstes verwenden, kann es das Dateisystem nicht erreichen und generiert einen Datei nicht gefunden-Fehler Beispiel: Hier sind einige Beispiele, bei denen der Zugang zu DBFS mit gemeinsam genutztem Cluster fehlschlägt with open('/dbfs/test/sample_file.csv', 'r') as file:ls -ltr /dbfs/testcat /dbfs/test/sample_file.csvSo lösen Sie das Problem: Entweder verwenden Sie -
    • Databricks Unity Catalog Volume anstelle der Nutzung von DBFS (Bevorzugt)
    • Aktualisieren Sie den Code, um dbutils oder Spark zu verwenden, das den direkten Zugriffspfad zum Speicher durchläuft und Zugriff auf DBFS vom gemeinsam genutzten Cluster erhält.