Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
L’exécution de requête adaptative (AQE) est une nouvelle optimisation des requêtes qui se produit pendant l’exécution de la requête.
La motivation de la re-optimisation du runtime est qu’Azure Databricks a les statistiques précises les plus up-to-date à la fin d’un échange aléatoire et de diffusion (appelé étape de requête dans AQE). Par conséquent, Azure Databricks peut opter pour une meilleure stratégie physique, choisir une taille et un nombre optimaux de partitions après le shuffle, ou effectuer des optimisations qui nécessitaient auparavant des indications, par exemple la gestion des jointures déséquilibrées.
Cela peut être très utile lorsque la collecte des statistiques n’est pas activée ou lorsque les statistiques sont obsolètes. Il est également utile dans les endroits où les statistiques dérivées statiquement sont inexactes, comme au milieu d’une requête complexe ou après l’occurrence d’un asymétrie des données.
Capacités
AQE est activé par défaut. Elle comporte quatre fonctionnalités majeures :
- Modifie dynamiquement la jointure de fusion de tri en jointure de synthèse de diffusion.
- Fusionne dynamiquement les partitions (combine de petites partitions en partitions de taille raisonnable) après le shuffle. Les petites tâches ont un débit d’E/S pire et ont tendance à souffrir davantage de la surcharge de planification et de la surcharge de configuration des tâches. Combiner de petites tâches économise les ressources et améliore le débit du cluster.
- Gère dynamiquement l’asymétrie dans la jointure de fusion de tri et la jointure de hachage aléatoire en fractionnant (et en répliquant si nécessaire) les tâches asymétriques en tâches de taille approximative.
- Détection et propagation dynamique des relations vides.
Application
AQE s’applique à toutes les requêtes qui sont :
- Non-diffusion en continu
- Contiennent au moins un échange (généralement lorsqu’il existe une jointure, un agrégat ou une fenêtre), une sous-requête ou les deux.
Toutes les requêtes appliquées à AQE ne sont pas nécessairement re-optimisées. La ré-optimisation peut ou ne pas apparaître avec un plan de requête différent de celui compilé statiquement. Pour déterminer si le plan d’une requête a été modifié par AQE, consultez la section suivante, Plans de requête.
Plans de requête
Cette section explique comment examiner les plans de requête de différentes façons.
Dans cette section :
Interface utilisateur Spark
AdaptiveSparkPlan noeud
Les requêtes appliquées par AQE contiennent un ou plusieurs AdaptiveSparkPlan nœuds, généralement en tant que nœud racine de chaque requête principale ou sous-requête.
Avant l’exécution de la requête ou lorsqu’elle est en cours d’exécution, l’indicateur isFinalPlan du nœud correspondant AdaptiveSparkPlan s’affiche comme false; une fois l’exécution de la requête terminée, l’indicateur isFinalPlan passe à true.
Plan en évolution
Le diagramme du plan de requête évolue à mesure que l’exécution progresse et reflète le plan le plus actuel en cours d’exécution. Les nœuds qui ont déjà été exécutés (dans lesquels les métriques sont disponibles) ne changeront pas, mais ceux qui ne l'ont pas encore été peuvent changer au fil du temps à la suite de ré-optimisations.
Voici un exemple de diagramme de plan de requête :
DataFrame.explain()
AdaptiveSparkPlan noeud
Les requêtes appliquées par AQE contiennent un ou plusieurs AdaptiveSparkPlan nœuds, généralement en tant que nœud racine de chaque requête principale ou sous-requête. Avant l’exécution de la requête ou lorsqu’elle est en cours d’exécution, l’indicateur isFinalPlan du nœud correspondant AdaptiveSparkPlan s’affiche comme false; une fois l’exécution de la requête terminée, l’indicateur isFinalPlan passe à true.
Plan actuel et initial
Sous chaque AdaptiveSparkPlan nœud, il y aura à la fois le plan initial (le plan avant d’appliquer des optimisations AQE) et le plan actuel ou final, selon que l’exécution est terminée. Le plan actuel évoluera à mesure que l’exécution progresse.
Statistiques d’exécution
Chaque étape de remaniement et de diffusion contient des statistiques de données.
Avant l’exécution de l’étape ou lorsque l’étape est en cours d’exécution, les statistiques sont des estimations au moment de la compilation et l’indicateur isRuntime est false, par exemple : Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);
Une fois l’exécution de l’étape terminée, les statistiques sont celles collectées au moment de l’exécution, et l’indicateur isRuntime devient true, par exemple : Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)
Voici un DataFrame.explain exemple :
Avant l’exécution
Pendant l’exécution
Après l’exécution
SQL EXPLAIN
AdaptiveSparkPlan noeud
Les requêtes appliquées par AQE contiennent un ou plusieurs nœuds AdaptiveSparkPlan, généralement en tant que nœud racine de chaque requête principale ou sous-requête.
Aucun plan actuel
Comme SQL EXPLAIN il n’exécute pas la requête, le plan actuel est toujours identique au plan initial et ne reflète pas ce qui finirait par être exécuté par AQE.
Voici un exemple d’explication SQL :
Efficacité
Le plan de requête change si une ou plusieurs optimisations AQE prennent effet. L’effet de ces optimisations AQE est démontré par la différence entre les plans actuels et finaux et le plan initial et les nœuds de plan spécifiques dans les plans actuels et finaux.
Modifier dynamiquement la jointure de fusion de tri en jointure de hachage de diffusion : différents nœuds de jointure physique entre le plan actuel/final et le plan initial
Fusion dynamique des partitions : nœud
CustomShuffleReaderavec propriétéCoalesced
< c0>
Chaîne de lecture aléatoire personnalisée Gérer dynamiquement la jointure d’asymétrie : nœud
SortMergeJoinavec le champisSkewen tant que vrai.
Détecter et propager dynamiquement les relations vides : une partie (ou l’intégralité) du plan est remplacée par le nœud LocalTableScan avec le champ de la relation vide.
Paramétrage
Dans cette section :
- Activer et désactiver l’exécution de requêtes adaptatives
- Activer le shuffle optimisé automatiquement
- Modifier dynamiquement la jointure de fusion de tri en jointure de hachage de diffusion
- Fusion dynamique des partitions
- Gérer dynamiquement la jointure biaisée
- Détecter et propager dynamiquement des relations vides
Activer et désactiver l’exécution de requêtes adaptatives
| Propriété |
|---|
|
spark.databricks.optimizer.adaptive.enabled Entrez : BooleanIndique s’il faut activer ou désactiver l’exécution de requête adaptative. Valeur par défaut : true |
Activer le shuffle optimisé automatiquement
| Propriété |
|---|
|
spark.sql.shuffle.partitions Entrez : IntegerNombre par défaut de partitions à utiliser lors de la synchronisation des données pour les jointures ou les agrégations. La définition de la valeur auto active le shuffle optimisé automatiquement, ce qui détermine automatiquement ce nombre en fonction du plan de requête et de la taille des données d’entrée de requête.Remarque : Pour Structured Streaming, cette configuration ne peut pas être modifiée entre les redémarrages de requête à partir du même emplacement de point de contrôle. Valeur par défaut : 200 |
Modifier dynamiquement la jointure de fusion de tri en jointure de hachage de diffusion
| Propriété |
|---|
|
spark.databricks.adaptive.autoBroadcastJoinThreshold Entrez : Byte StringSeuil pour déclencher le basculement vers la jonction de diffusion au moment de l’exécution. Valeur par défaut : 30MB |
Fusion dynamique des partitions
| Propriété |
|---|
|
spark.sql.adaptive.coalescePartitions.enabled Entrez : BooleanIndique s’il faut activer ou désactiver la fusion de partition. Valeur par défaut : true |
|
spark.sql.adaptive.advisoryPartitionSizeInBytes Entrez : Byte StringTaille cible après fusion. Les tailles de partition coalescées seront proches, mais pas plus grandes que cette taille cible. Valeur par défaut : 64MB |
|
spark.sql.adaptive.coalescePartitions.minPartitionSize Entrez : Byte StringTaille minimale des partitions après fusion. Les tailles de partitions fusionnées ne seront pas inférieures à cette taille. Valeur par défaut : 1MB |
|
spark.sql.adaptive.coalescePartitions.minPartitionNum Entrez : IntegerNombre minimal de partitions après fusion. Non recommandé, car le paramètre remplace explicitement spark.sql.adaptive.coalescePartitions.minPartitionSize.Valeur par défaut : 2 fois le nombre de cœurs de cluster |
Gérer dynamiquement la jointure déséquilibrée
| Propriété |
|---|
|
spark.sql.adaptive.skewJoin.enabled Entrez : BooleanIndique s’il faut activer ou désactiver la gestion des jointures asymétriques. Valeur par défaut : true |
|
spark.sql.adaptive.skewJoin.skewedPartitionFactor Entrez : IntegerFacteur qui, lorsqu’il est multiplié par la taille de partition médiane, contribue à déterminer si une partition est asymétrique. Valeur par défaut : 5 |
|
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes Entrez : Byte StringSeuil qui contribue à déterminer si une partition est asymétrique. Valeur par défaut : 256MB |
Une partition est considérée comme asymétrique lorsque les deux (partition size > skewedPartitionFactor * median partition size) et (partition size > skewedPartitionThresholdInBytes) sont true.
Détecter et propager dynamiquement des relations vides
| Propriété |
|---|
|
spark.databricks.adaptive.emptyRelationPropagation.enabled Entrez : BooleanIndique s’il faut activer ou désactiver la propagation dynamique d’une relation vide. Valeur par défaut : true |
Questions fréquemment posées (FAQ)
Dans cette section :
- Pourquoi AQE n’a-t-il pas émis une petite table de jonction ?
- Dois-je toujours utiliser un indicateur de stratégie de jonction de diffusion avec AQE activé ?
- Quelle est la différence entre l’indicateur de jointure asymétrique et l’optimisation de jointure AQE ? Lequel dois-je utiliser ?
- Pourquoi l'AQE n'a-t-il pas ajusté automatiquement l'ordre de jointure ?
- Pourquoi AQE n’a-t-il pas détecté mon asymétrie des données ?
Pourquoi AQE n’a-t-il pas diffusé une petite table de jointure ?
Si la taille de la relation censée être diffusée est inférieure à ce seuil, mais n’est toujours pas diffusée :
- Vérifiez le type de jointure. La diffusion n’est pas prise en charge pour certains types de jointure, par exemple, la relation gauche d’un
LEFT OUTER JOINne peut pas être diffusée. - Il peut également s’agir du fait que la relation contient beaucoup de partitions vides, auquel cas la majorité des tâches peuvent se terminer rapidement avec la jointure de fusion de tri ou peut être optimisée avec la gestion des jointures asymétriques. AQE évite de modifier ces jointures de fusion de tri en jointures de hachage de diffusion si le pourcentage de partitions non vides est inférieur à
spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.
Dois-je toujours utiliser un indicateur de stratégie de jonction de diffusion avec AQE activé ?
Oui. Une jointure de diffusion planifiée statiquement est généralement plus performante qu’une jointure planifiée dynamiquement par AQE, car AQE peut ne pas basculer vers la jointure de diffusion qu'après avoir effectué une opération de redistribution des deux côtés de la jointure (au moment où les tailles de relation réelles sont obtenues). Ainsi, l’utilisation d’un indicateur de diffusion peut toujours être un bon choix si vous connaissez bien votre requête. AQE respecte les indicateurs de requête de la même façon que l’optimisation statique, mais peut toujours appliquer des optimisations dynamiques qui ne sont pas affectées par les indicateurs.
Quelle est la différence entre l’indicateur de jointure asymétrique et l’optimisation de jointure AQE ? Lequel dois-je utiliser ?
Il est recommandé de s’appuyer sur la gestion des jointures d’asymétrie AQE plutôt que d’utiliser l’indicateur de jointure d’asymétrie AQE, car la jointure d’asymétrie AQE est complètement automatique et en général fonctionne mieux que l’équivalent de l’indicateur.
Pourquoi AQE n’a-t-il pas ajusté automatiquement ma commande de jointure ?
La réorganisation dynamique des jointures ne fait pas partie de l'AQE.
Pourquoi AQE n’a-t-il pas détecté mon asymétrie des données ?
Il existe deux conditions de taille qui doivent être satisfaites pour que L’AQE détecte une partition en tant que partition asymétrique :
- La taille de la partition est supérieure à la
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytestaille (par défaut, 256 Mo) - La taille de la partition est supérieure à la taille médiane de toutes les partitions fois le facteur
spark.sql.adaptive.skewJoin.skewedPartitionFactorde partition asymétrique (par défaut 5)
En outre, la prise en charge de la gestion des asymétries est limitée pour certains types de jointure, par exemple, dans LEFT OUTER JOIN, uniquement l’asymétrie sur le côté gauche peut être optimisée.
Héritage
Le terme « Exécution adaptative » existe depuis Spark 1.6, mais le nouvel AQE dans Spark 3.0 est fondamentalement différent. En termes de fonctionnalité, Spark 1.6 ne fait que la partie « partitions de fusion dynamique ». En termes d’architecture technique, le nouvel AQE est une infrastructure de planification dynamique et de replanification des requêtes basées sur des statistiques d’exécution, qui prend en charge diverses optimisations telles que celles que nous avons décrites dans cet article et peut être étendue pour permettre davantage d’optimisations potentielles.