Partager via


Exécution de requête adaptative

L’exécution de requête adaptative (AQE) est une nouvelle optimisation des requêtes qui se produit pendant l’exécution de la requête.

La motivation de la re-optimisation du runtime est qu’Azure Databricks a les statistiques précises les plus up-to-date à la fin d’un échange aléatoire et de diffusion (appelé étape de requête dans AQE). Par conséquent, Azure Databricks peut opter pour une meilleure stratégie physique, choisir une taille et un nombre optimaux de partitions après le shuffle, ou effectuer des optimisations qui nécessitaient auparavant des indications, par exemple la gestion des jointures déséquilibrées.

Cela peut être très utile lorsque la collecte des statistiques n’est pas activée ou lorsque les statistiques sont obsolètes. Il est également utile dans les endroits où les statistiques dérivées statiquement sont inexactes, comme au milieu d’une requête complexe ou après l’occurrence d’un asymétrie des données.

Capacités

AQE est activé par défaut. Elle comporte quatre fonctionnalités majeures :

  • Modifie dynamiquement la jointure de fusion de tri en jointure de synthèse de diffusion.
  • Fusionne dynamiquement les partitions (combine de petites partitions en partitions de taille raisonnable) après le shuffle. Les petites tâches ont un débit d’E/S pire et ont tendance à souffrir davantage de la surcharge de planification et de la surcharge de configuration des tâches. Combiner de petites tâches économise les ressources et améliore le débit du cluster.
  • Gère dynamiquement l’asymétrie dans la jointure de fusion de tri et la jointure de hachage aléatoire en fractionnant (et en répliquant si nécessaire) les tâches asymétriques en tâches de taille approximative.
  • Détection et propagation dynamique des relations vides.

Application

AQE s’applique à toutes les requêtes qui sont :

  • Non-diffusion en continu
  • Contiennent au moins un échange (généralement lorsqu’il existe une jointure, un agrégat ou une fenêtre), une sous-requête ou les deux.

Toutes les requêtes appliquées à AQE ne sont pas nécessairement re-optimisées. La ré-optimisation peut ou ne pas apparaître avec un plan de requête différent de celui compilé statiquement. Pour déterminer si le plan d’une requête a été modifié par AQE, consultez la section suivante, Plans de requête.

Plans de requête

Cette section explique comment examiner les plans de requête de différentes façons.

Dans cette section :

Interface utilisateur Spark

AdaptiveSparkPlan noeud

Les requêtes appliquées par AQE contiennent un ou plusieurs AdaptiveSparkPlan nœuds, généralement en tant que nœud racine de chaque requête principale ou sous-requête. Avant l’exécution de la requête ou lorsqu’elle est en cours d’exécution, l’indicateur isFinalPlan du nœud correspondant AdaptiveSparkPlan s’affiche comme false; une fois l’exécution de la requête terminée, l’indicateur isFinalPlan passe à true.

Plan en évolution

Le diagramme du plan de requête évolue à mesure que l’exécution progresse et reflète le plan le plus actuel en cours d’exécution. Les nœuds qui ont déjà été exécutés (dans lesquels les métriques sont disponibles) ne changeront pas, mais ceux qui ne l'ont pas encore été peuvent changer au fil du temps à la suite de ré-optimisations.

Voici un exemple de diagramme de plan de requête :

Diagramme du plan de requête

DataFrame.explain()

AdaptiveSparkPlan noeud

Les requêtes appliquées par AQE contiennent un ou plusieurs AdaptiveSparkPlan nœuds, généralement en tant que nœud racine de chaque requête principale ou sous-requête. Avant l’exécution de la requête ou lorsqu’elle est en cours d’exécution, l’indicateur isFinalPlan du nœud correspondant AdaptiveSparkPlan s’affiche comme false; une fois l’exécution de la requête terminée, l’indicateur isFinalPlan passe à true.

Plan actuel et initial

Sous chaque AdaptiveSparkPlan nœud, il y aura à la fois le plan initial (le plan avant d’appliquer des optimisations AQE) et le plan actuel ou final, selon que l’exécution est terminée. Le plan actuel évoluera à mesure que l’exécution progresse.

Statistiques d’exécution

Chaque étape de remaniement et de diffusion contient des statistiques de données.

Avant l’exécution de l’étape ou lorsque l’étape est en cours d’exécution, les statistiques sont des estimations au moment de la compilation et l’indicateur isRuntime est false, par exemple : Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Une fois l’exécution de l’étape terminée, les statistiques sont celles collectées au moment de l’exécution, et l’indicateur isRuntime devient true, par exemple : Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Voici un DataFrame.explain exemple :

  • Avant l’exécution

    Avant l’exécution

  • Pendant l’exécution

    Pendant l’exécution

  • Après l’exécution

    Après l’exécution

SQL EXPLAIN

AdaptiveSparkPlan noeud

Les requêtes appliquées par AQE contiennent un ou plusieurs nœuds AdaptiveSparkPlan, généralement en tant que nœud racine de chaque requête principale ou sous-requête.

Aucun plan actuel

Comme SQL EXPLAIN il n’exécute pas la requête, le plan actuel est toujours identique au plan initial et ne reflète pas ce qui finirait par être exécuté par AQE.

Voici un exemple d’explication SQL :

SQL Explain

Efficacité

Le plan de requête change si une ou plusieurs optimisations AQE prennent effet. L’effet de ces optimisations AQE est démontré par la différence entre les plans actuels et finaux et le plan initial et les nœuds de plan spécifiques dans les plans actuels et finaux.

  • Modifier dynamiquement la jointure de fusion de tri en jointure de hachage de diffusion : différents nœuds de jointure physique entre le plan actuel/final et le plan initial

    Chaîne de stratégie de combinaison

  • Fusion dynamique des partitions : nœud CustomShuffleReader avec propriété Coalesced

    Lecteur aléatoire personnalisé

    < c0>Chaîne de lecture aléatoire personnalisée

  • Gérer dynamiquement la jointure d’asymétrie : nœud SortMergeJoin avec le champ isSkew en tant que vrai.

    Plan de jointure asymétrique

    Chaîne de jointure déséquilibrée

  • Détecter et propager dynamiquement les relations vides : une partie (ou l’intégralité) du plan est remplacée par le nœud LocalTableScan avec le champ de la relation vide.

    Analyse de table locale

    Chaîne de balayage de table locale

Paramétrage

Dans cette section :

Activer et désactiver l’exécution de requêtes adaptatives

Propriété
spark.databricks.optimizer.adaptive.enabled
Entrez : Boolean
Indique s’il faut activer ou désactiver l’exécution de requête adaptative.
Valeur par défaut : true

Activer le shuffle optimisé automatiquement

Propriété
spark.sql.shuffle.partitions
Entrez : Integer
Nombre par défaut de partitions à utiliser lors de la synchronisation des données pour les jointures ou les agrégations. La définition de la valeur auto active le shuffle optimisé automatiquement, ce qui détermine automatiquement ce nombre en fonction du plan de requête et de la taille des données d’entrée de requête.
Remarque : Pour Structured Streaming, cette configuration ne peut pas être modifiée entre les redémarrages de requête à partir du même emplacement de point de contrôle.
Valeur par défaut : 200

Modifier dynamiquement la jointure de fusion de tri en jointure de hachage de diffusion

Propriété
spark.databricks.adaptive.autoBroadcastJoinThreshold
Entrez : Byte String
Seuil pour déclencher le basculement vers la jonction de diffusion au moment de l’exécution.
Valeur par défaut : 30MB

Fusion dynamique des partitions

Propriété
spark.sql.adaptive.coalescePartitions.enabled
Entrez : Boolean
Indique s’il faut activer ou désactiver la fusion de partition.
Valeur par défaut : true
spark.sql.adaptive.advisoryPartitionSizeInBytes
Entrez : Byte String
Taille cible après fusion. Les tailles de partition coalescées seront proches, mais pas plus grandes que cette taille cible.
Valeur par défaut : 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize
Entrez : Byte String
Taille minimale des partitions après fusion. Les tailles de partitions fusionnées ne seront pas inférieures à cette taille.
Valeur par défaut : 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum
Entrez : Integer
Nombre minimal de partitions après fusion. Non recommandé, car le paramètre remplace explicitement
spark.sql.adaptive.coalescePartitions.minPartitionSize.
Valeur par défaut : 2 fois le nombre de cœurs de cluster

Gérer dynamiquement la jointure déséquilibrée

Propriété
spark.sql.adaptive.skewJoin.enabled
Entrez : Boolean
Indique s’il faut activer ou désactiver la gestion des jointures asymétriques.
Valeur par défaut : true
spark.sql.adaptive.skewJoin.skewedPartitionFactor
Entrez : Integer
Facteur qui, lorsqu’il est multiplié par la taille de partition médiane, contribue à déterminer si une partition est asymétrique.
Valeur par défaut : 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
Entrez : Byte String
Seuil qui contribue à déterminer si une partition est asymétrique.
Valeur par défaut : 256MB

Une partition est considérée comme asymétrique lorsque les deux (partition size > skewedPartitionFactor * median partition size) et (partition size > skewedPartitionThresholdInBytes) sont true.

Détecter et propager dynamiquement des relations vides

Propriété
spark.databricks.adaptive.emptyRelationPropagation.enabled
Entrez : Boolean
Indique s’il faut activer ou désactiver la propagation dynamique d’une relation vide.
Valeur par défaut : true

Questions fréquemment posées (FAQ)

Dans cette section :

Pourquoi AQE n’a-t-il pas diffusé une petite table de jointure ?

Si la taille de la relation censée être diffusée est inférieure à ce seuil, mais n’est toujours pas diffusée :

  • Vérifiez le type de jointure. La diffusion n’est pas prise en charge pour certains types de jointure, par exemple, la relation gauche d’un LEFT OUTER JOIN ne peut pas être diffusée.
  • Il peut également s’agir du fait que la relation contient beaucoup de partitions vides, auquel cas la majorité des tâches peuvent se terminer rapidement avec la jointure de fusion de tri ou peut être optimisée avec la gestion des jointures asymétriques. AQE évite de modifier ces jointures de fusion de tri en jointures de hachage de diffusion si le pourcentage de partitions non vides est inférieur à spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Dois-je toujours utiliser un indicateur de stratégie de jonction de diffusion avec AQE activé ?

Oui. Une jointure de diffusion planifiée statiquement est généralement plus performante qu’une jointure planifiée dynamiquement par AQE, car AQE peut ne pas basculer vers la jointure de diffusion qu'après avoir effectué une opération de redistribution des deux côtés de la jointure (au moment où les tailles de relation réelles sont obtenues). Ainsi, l’utilisation d’un indicateur de diffusion peut toujours être un bon choix si vous connaissez bien votre requête. AQE respecte les indicateurs de requête de la même façon que l’optimisation statique, mais peut toujours appliquer des optimisations dynamiques qui ne sont pas affectées par les indicateurs.

Quelle est la différence entre l’indicateur de jointure asymétrique et l’optimisation de jointure AQE ? Lequel dois-je utiliser ?

Il est recommandé de s’appuyer sur la gestion des jointures d’asymétrie AQE plutôt que d’utiliser l’indicateur de jointure d’asymétrie AQE, car la jointure d’asymétrie AQE est complètement automatique et en général fonctionne mieux que l’équivalent de l’indicateur.

Pourquoi AQE n’a-t-il pas ajusté automatiquement ma commande de jointure ?

La réorganisation dynamique des jointures ne fait pas partie de l'AQE.

Pourquoi AQE n’a-t-il pas détecté mon asymétrie des données ?

Il existe deux conditions de taille qui doivent être satisfaites pour que L’AQE détecte une partition en tant que partition asymétrique :

  • La taille de la partition est supérieure à la spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes taille (par défaut, 256 Mo)
  • La taille de la partition est supérieure à la taille médiane de toutes les partitions fois le facteur spark.sql.adaptive.skewJoin.skewedPartitionFactor de partition asymétrique (par défaut 5)

En outre, la prise en charge de la gestion des asymétries est limitée pour certains types de jointure, par exemple, dans LEFT OUTER JOIN, uniquement l’asymétrie sur le côté gauche peut être optimisée.

Héritage

Le terme « Exécution adaptative » existe depuis Spark 1.6, mais le nouvel AQE dans Spark 3.0 est fondamentalement différent. En termes de fonctionnalité, Spark 1.6 ne fait que la partie « partitions de fusion dynamique ». En termes d’architecture technique, le nouvel AQE est une infrastructure de planification dynamique et de replanification des requêtes basées sur des statistiques d’exécution, qui prend en charge diverses optimisations telles que celles que nous avons décrites dans cet article et peut être étendue pour permettre davantage d’optimisations potentielles.