Compartilhar via


Execução de consulta adaptável

A execução de consulta adaptável (AQE) é a reotimização de consulta que ocorre durante a execução de consulta.

A motivação para a otimização de runtime é que o Azure Databricks tem as estatísticas precisas mais up-tono final de uma troca de embaralhamento e difusão (conhecida como estágio de consulta no AQE). Como resultado, o Azure Databricks pode optar por uma estratégia física melhor, escolher um número e tamanho de partição ideais após o embaralhamento ou fazer otimizações que costumavam exigir hints, por exemplo, gerenciamento de skew join.

Isso pode ser muito útil quando a coleta de estatísticas não está ativada ou quando as estatísticas estão obsoletas. Também é útil em locais em que estatísticas derivadas estaticamente são imprecisas, como no meio de uma consulta complicada ou após a ocorrência de distorção de dados.

Capacidades

O AQE está habilitado por padrão. Ela tem quatro recursos principais:

  • Altera dinamicamente a junção de mesclagem de classificação na junção hash de difusão.
  • Agrupa dinamicamente partições (combinar pequenas partições em partições de tamanho razoável) após a troca aleatória. Tarefas muito pequenas têm pior taxa de transferência de E/S e tendem a sofrer mais com sobrecarga de agendamento e sobrecarga de instalação de tarefas. A combinação de pequenas tarefas salva recursos e melhora a taxa de transferência do cluster.
  • Manipula dinamicamente a distorção em 'sort merge join' e em 'shuffle hash join' dividindo (e replicando, se necessário) tarefas desbalanceadas em tarefas de tamanho aproximadamente uniforme.
  • Detecta e propaga dinamicamente as relações vazias.

Aplicação

O AQE se aplica a todas as consultas que são:

  • Conteúdo não transmitido
  • Contém pelo menos um intercâmbio (geralmente quando há uma junção, agregação ou janela), uma subconsulta ou ambas.

Nem todas as consultas aplicadas ao AQE são necessariamente reotimizadas. A otimização pode ou não criar um plano de consulta diferente do compilado estaticamente. Para determinar se o plano de uma consulta foi alterado pelo AQE, consulte a seção a seguir, Planos de consulta.

Planos de consultas

Esta seção discute como você pode examinar os planos de consulta de diferentes maneiras.

Nesta seção:

Interface do usuário do Spark

AdaptiveSparkPlan nodo

As consultas aplicadas ao AQE contêm um ou mais AdaptiveSparkPlan nós, geralmente como o nó raiz de cada consulta principal ou subconsulta. Antes que a consulta seja executada ou quando ela estiver em execução, o isFinalPlan sinalizador do nó correspondente AdaptiveSparkPlan será exibido como false; depois que a execução da consulta for concluída, o isFinalPlan sinalizador será alterado para true.

Plano em evolução

O diagrama do plano de consulta evolui à medida que a execução progride e reflete o plano mais atual que está sendo executado. Nós que já foram executados (nos quais as métricas estão disponíveis) não serão modificados, mas aqueles que ainda não foram executados podem ser modificados ao longo do tempo como resultado de otimizações novamente.

Veja a seguir um exemplo de diagrama de plano de consulta:

Diagrama do plano de consulta

DataFrame.explain()

AdaptiveSparkPlan nodo

As consultas aplicadas ao AQE contêm um ou mais AdaptiveSparkPlan nós, geralmente como o nó raiz de cada consulta principal ou subconsulta. Antes que a consulta seja executada ou quando ela estiver em execução, o isFinalPlan sinalizador do nó correspondente AdaptiveSparkPlan será exibido como false; depois que a execução da consulta for concluída, o isFinalPlan sinalizador será alterado para true.

Plano atual e inicial

Em cada nó AdaptiveSparkPlan, haverá tanto o plano inicial (o plano antes de aplicar as otimizações do AQE) quanto o plano atual ou final, dependendo de a execução ter sido concluída. O plano atual evoluirá conforme a execução progride.

Estatísticas de tempo de execução

Cada estágio de embaralhamento e distribuição contém estatísticas de dados.

Antes da execução do estágio ou quando o estágio está em execução, as estatísticas são estimativas de tempo de compilação e o sinalizador isRuntime é false, por exemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);

Após a conclusão da execução do estágio, as estatísticas são as coletadas em runtime e o sinalizador isRuntime se tornará true, por exemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

Veja a seguir um DataFrame.explain exemplo:

  • Antes da execução

    Antes da execução

  • Durante a execução

    Durante a execução

  • Após a execução

    Após a execução

SQL EXPLAIN

AdaptiveSparkPlan nodo

Consultas com AQE aplicado contêm um ou mais nós do tipo AdaptiveSparkPlan, geralmente como nó raiz de cada consulta principal ou subconsulta.

Nenhum plano atual

Como SQL EXPLAIN não executa a consulta, o plano atual é sempre o mesmo que o plano inicial e não reflete o que eventualmente seria executado pelo AQE.

Veja a seguir um exemplo de explicação do SQL:

Explicação do SQL

Eficácia

O plano de consulta será alterado se uma ou mais otimizações do AQE entrarem em vigor. O efeito dessas otimizações do AQE é demonstrado pela diferença entre os planos atuais e finais e o plano inicial e os nós de plano específicos nos planos atuais e finais.

  • Alterar dinamicamente a junção de mesclagem ordenada para junção de hash por difusão: diferentes nós de junção física entre o plano atual/final e o plano inicial

    Cadeia de caracteres de estratégia de junção

  • Partições de coalescência dinamicamente: nó CustomShuffleReader com propriedade Coalesced

    Leitor de embaralhamento personalizado

    Cadeia de caracteres do leitor de embaralhamento personalizado

  • Manipular dinamicamente a junção de distorção: nó SortMergeJoin com o campo isSkew como verdadeiro.

    Plano de junção com assimetria

    Distorcer cadeia de caracteres de junção

  • Detectar e propagar dinamicamente as relações vazias: parte (ou a totalidade) do plano é substituída pelo nó LocalTableScan, com o campo de relação vazio.

    Verificação de tabela local

    Cadeia de varredura de tabela local

Configuração

Nesta seção:

Habilitar e desabilitar a execução de consulta adaptável

Propriedade
spark.databricks.optimizer.adaptive.enabled
Tipo: Boolean
Se deseja habilitar ou desabilitar a execução de consulta adaptável.
Valor padrão: true

Habilitar o embaralhamento otimizado automaticamente

Propriedade
spark.sql.shuffle.partitions
Tipo: Integer
O número padrão de partições a serem usadas ao embaralhar dados para junções ou agregações. Definir o valor auto ativa o embaralhamento otimizado automaticamente, que ajusta esse número com base no plano de consulta e no tamanho dos dados de entrada da consulta.
Observação: para Streaming Estruturado, essa configuração não pode ser alterada entre reinicializações de consulta do mesmo local de ponto de verificação.
Valor padrão: 200

Alterar dinamicamente a junção de mesclagem de ordenação na junção de hash transmitida

Propriedade
spark.databricks.adaptive.autoBroadcastJoinThreshold
Tipo: Byte String
O limite para desencadear a alternância para a junção de difusão no tempo de execução.
Valor padrão: 30MB

Coalescer partições dinamicamente.

Propriedade
spark.sql.adaptive.coalescePartitions.enabled
Tipo: Boolean
Habilitar ou desabilitar o coalescimento de partição.
Valor padrão: true
spark.sql.adaptive.advisoryPartitionSizeInBytes
Tipo: Byte String
O tamanho alvo após a coalescência. Os tamanhos de partição unificados estarão próximos, mas não maiores do que o tamanho alvo.
Valor padrão: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize
Tipo: Byte String
O tamanho mínimo das partições após a coalescagem. Os tamanhos das partições combinadas não devem ser menores que este tamanho.
Valor padrão: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum
Tipo: Integer
O número mínimo de partições após a coalescagem. Não recomendado, porque a configuração substitui explicitamente
spark.sql.adaptive.coalescePartitions.minPartitionSize.
Valor padrão: 2x número de núcleos do cluster

Manipular dinamicamente a junção assimétrica

Propriedade
spark.sql.adaptive.skewJoin.enabled
Tipo: Boolean
Caso deseje habilitar ou desabilitar o manuseio de junção de distorção.
Valor padrão: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor
Tipo: Integer
Um fator que, quando multiplicado pelo tamanho médio da partição, contribui para determinar se uma partição é distorcida.
Valor padrão: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
Tipo: Byte String
Um limite que contribui para determinar se uma partição é distorcida.
Valor padrão: 256MB

Uma partição é considerada distorcida quando ambas (partition size > skewedPartitionFactor * median partition size) e (partition size > skewedPartitionThresholdInBytes) são true.

Detectar e propagar dinamicamente relações vazias

Propriedade
spark.databricks.adaptive.emptyRelationPropagation.enabled
Tipo: Boolean
Seja para habilitar ou desabilitar a propagação de relações vazias dinâmicas.
Valor padrão: true

Perguntas frequentes (FAQ)

Nesta seção:

Por que o AQE não transmitiu uma pequena tabela de junção?

Se o tamanho da relação que se espera que seja transmitida estiver abaixo desse limite, mas ainda assim não for transmitida:

  • Verifique o tipo de junção. Não há suporte para transmissão para determinados tipos de junção, por exemplo, a relação esquerda de um LEFT OUTER JOIN não pode ser transmitida.
  • Também pode ser que a relação contenha muitas partições vazias; nesse caso, a maioria das tarefas pode terminar rapidamente com a junção por ordenação e mesclagem ou pode ser otimizada com o tratamento de junção de distorção. O AQE evita alterar essas junções de mesclagem de classificação para transmitir junções de hash se o percentual de partições não vazias for menor que spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Ainda devo usar uma dica de estratégia de junção de difusão com o AQE habilitado?

Sim. Uma junção por difusão planejada estaticamente geralmente oferece melhor desempenho do que uma planejada dinamicamente pelo AQE, pois o AQE pode não mudar para a junção por difusão até depois de realizar a troca de dados para ambos os lados da junção (quando os tamanhos reais das relações são obtidos). Portanto, usar uma dica de transmissão ainda pode ser uma boa opção se você conhece bem sua consulta. O AQE respeitará as dicas de consulta da mesma maneira que a otimização estática, mas ainda pode aplicar otimizações dinâmicas que não são afetadas pelas dicas.

Qual é a diferença entre a sugestão de junção assimétrica e a otimização de junção assimétrica do AQE? Qual delas devo utilizar?

É recomendável confiar no tratamento de junção enviesada do AQE em vez de usar a dica de junção enviesada, pois a junção enviesada do AQE é completamente automática e, em geral, tem um desempenho melhor do que a versão com dica.

Por que o AQE não ajustou minha ordem de junção automaticamente?

A reordenação de junção dinâmica não faz parte do AQE.

Por que o AQE não detectou minha distorção de dados?

Há duas condições de tamanho que devem ser satisfeitas para que o AQE detecte uma partição como uma partição distorcida:

  • O tamanho da partição é maior que o spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (padrão de 256 MB)
  • O tamanho da partição é maior que o tamanho mediano de todas as partições vezes o fator spark.sql.adaptive.skewJoin.skewedPartitionFactor de partição distorcido (padrão 5)

Além disso, o suporte ao tratamento de distorção é limitado para determinados tipos de junção, por exemplo, em LEFT OUTER JOIN, somente a distorção no lado esquerdo pode ser otimizada.

Legado

O termo "Execução Adaptável" existe desde o Spark 1.6, mas o novo AQE no Spark 3.0 é fundamentalmente diferente. Em termos de funcionalidade, o Spark 1.6 faz apenas a parte de "coalesce dinamicamente as partições". Em termos de arquitetura técnica, o novo AQE é uma estrutura de planejamento dinâmico e replanificação de consultas com base em estatísticas de runtime, que dá suporte a uma variedade de otimizações, como as que descrevemos neste artigo e pode ser estendida para habilitar mais otimizações potenciais.