Partilhar via


Execução de consulta adaptável

A execução adaptativa de consultas (AQE) é a reotimização de consultas que ocorre durante a execução da consulta.

A motivação para a reotimização do tempo de execução é que o Azure Databricks tem as estatísticas precisas de data mais up-tono final de uma troca aleatória e de transmissão (referida como um estágio de consulta no AQE). Como resultado, o Azure Databricks pode optar por uma estratégia física melhor, escolher um tamanho e número de partição pós-embaralhamento ideais ou fazer otimizações que costumavam exigir dicas, por exemplo, manipulação de junção distorcida.

Isto pode ser muito útil quando a recolha de estatísticas não está ativada ou quando as estatísticas estão obsoletas. Também é útil em locais onde as estatísticas derivadas estaticamente são imprecisas, como no meio de uma consulta complicada ou após a ocorrência de distorção de dados.

Capacidades

O AQE está habilitado por padrão. Tem 4 características principais:

  • Altera dinamicamente a junção de mesclagem de classificação na junção de hash de transmissão.
  • Aglutina dinamicamente partições (combine pequenas partições em partições de tamanho razoável) após a troca aleatória. Tarefas muito pequenas têm pior taxa de transferência de E/S e tendem a sofrer mais com a sobrecarga de agendamento e a sobrecarga de configuração de tarefas. A combinação de pequenas tarefas economiza recursos e melhora a taxa de transferência do cluster.
  • Manipula dinamicamente a inclinação na junção de mesclagem de classificação e a junção de hash aleatória dividindo (e replicando, se necessário) tarefas distorcidas em tarefas de tamanho aproximadamente uniforme.
  • Deteta e propaga dinamicamente relações vazias.

Aplicação

AQE aplica-se a todas as consultas que sejam:

  • Não transmissão em fluxo
  • Contêm pelo menos uma troca (geralmente quando há uma junção, agregação ou janela), uma subconsulta ou ambas.

Nem todas as consultas aplicadas ao AQE são necessariamente reotimizadas. A reotimização pode ou não apresentar um plano de consulta diferente daquele compilado estaticamente. Para determinar se o plano de uma consulta foi alterado pelo AQE, consulte a seção a seguir, Planos de consulta.

Planos de consulta

Esta seção discute como você pode examinar planos de consulta de diferentes maneiras.

Nesta secção:

Interface do usuário do Spark

AdaptiveSparkPlan

As consultas aplicadas ao AQE contêm um ou mais nós AdaptiveSparkPlan, geralmente servindo como o nó raiz de cada consulta principal ou subconsulta. Antes de a consulta ser executada ou quando estiver em execução, o sinalizador de isFinalPlan do nó AdaptiveSparkPlan correspondente é exibido como false; Após a conclusão da execução da consulta, o sinalizador isFinalPlan muda para true.

Plano evolutivo

O diagrama de plano de consulta evolui à medida que a execução progride e reflete o plano mais atual que está sendo executado. Os nós que já foram executados (nos quais as métricas estão disponíveis) não serão alterados, mas aqueles que não foram executados podem mudar ao longo do tempo como resultado de reotimizações.

Segue-se um exemplo de diagrama de plano de consulta:

Diagrama de plano de consulta

DataFrame.explain()

AdaptiveSparkPlan

As consultas aplicadas ao AQE contêm um ou mais nós AdaptiveSparkPlan, geralmente servindo como o nó raiz de cada consulta principal ou subconsulta. Antes de a consulta ser executada ou quando estiver em execução, o sinalizador de isFinalPlan do nó AdaptiveSparkPlan correspondente é exibido como false; Após a conclusão da execução da consulta, o sinalizador isFinalPlan muda para true.

Plano atual e plano inicial

Sob cada nó AdaptiveSparkPlan, existirá o plano inicial (o plano antes de aplicar quaisquer otimizações AQE) e o plano atual ou o plano final, dependendo se a execução foi concluída. O plano atual evoluirá à medida que a execução progride.

Estatísticas de tempo de execução

Cada etapa de baralhamento e transmissão contém estatísticas de dados.

Antes de o estágio começar ou quando o estágio está em execução, as estatísticas são estimativas de tempo de compilação, e o indicador isRuntime é false, por exemplo: Statistics(sizeInBytes=1024.0 KiB, rowCount=4, isRuntime=false);.

Após a conclusão da execução do estágio, as estatísticas são aquelas coletadas em tempo de execução, e o sinalizador isRuntime se tornará true, por exemplo: Statistics(sizeInBytes=658.1 KiB, rowCount=2.81E+4, isRuntime=true)

O seguinte é um exemplo DataFrame.explain:

  • Antes da execução

    Antes da execução

  • Durante a execução

    Durante a execução

  • Após a execução

    Após a execução

SQL EXPLAIN

AdaptiveSparkPlan

As consultas em que o AQE é aplicado contêm um ou mais nós AdaptiveSparkPlan, geralmente atuando como o nó raiz de cada consulta principal ou subconsulta.

Nenhum plano atual

Como SQL EXPLAIN não executa a consulta, o plano atual é sempre o mesmo que o plano inicial e não reflete o que acabaria por ser executado pela AQE.

Segue-se um exemplo de explicação SQL:

SQL explicam

Eficácia

O plano de consulta será alterado se uma ou mais otimizações AQE entrarem em efeito. O efeito dessas otimizações de AQE é demonstrado pela diferença entre os planos atual e final e os nós do plano inicial e do plano específico nos planos atual e final.

  • Alterar dinamicamente a junção de ordenação por mesclagem em junção de hash por difusão: diferentes nós de junção física entre o plano atual/final e o plano inicial.

    Cadeia de estratégia Join

  • Unir partições dinamicamente: nó CustomShuffleReader com propriedade Coalesced

    Leitor aleatório personalizado

    String de leitura aleatória personalizada

  • Manipule dinamicamente a junção 'skew': nó SortMergeJoin com campo isSkew como verdadeiro.

    Skew juntam-se ao plano

    Junção desalinhada de cadeia

  • Deteção e propagação dinâmica de relações vazias: parte (ou a totalidade) do plano é substituída pelo nó LocalTableScan com o campo de relação vazio.

    Verificação local de tabela

    Cadeia de caracteres de análise de tabela local

Configuração

Nesta secção:

Habilitar e desabilitar a execução de consultas adaptáveis

Propriedade
spark.databricks.optimizer.adaptive.enabled
Tipo: Boolean
Se a execução de consulta adaptável deve ser habilitada ou desabilitada.
Valor padrão: true

Habilite o shuffle otimizado automaticamente

Propriedade
spark.sql.shuffle.partições
Tipo: Integer
O número padrão de partições a serem usadas ao embaralhar dados para junções ou agregações. A definição do valor auto permite o shuffle otimizado automaticamente, que determina automaticamente esse número com base no plano de consulta e no tamanho dos dados de entrada da consulta.
Nota: Para Streaming Estruturado, essa configuração não pode ser alterada entre reinicializações de consulta do mesmo local de ponto de verificação.
Valor padrão: 200

Alterar dinamicamente a junção de mesclagem de classificação em junção de hash de difusão

Propriedade
spark.databricks.adaptive.autoBroadcastJoinThreshold
Tipo: Byte String
O limite para acionar a mudança para junção por difusão em tempo de execução.
Valor padrão: 30MB

Aglutinar partições dinamicamente

Propriedade
spark.sql.adaptive.coalescePartitions.enabled
Tipo: Boolean
Se é necessário ativar ou desativar a coalescência de partições.
Valor padrão: true
spark.sql.adaptive.advisoryPartitionSizeInBytes
Tipo: Byte String
O tamanho do alvo após a coalescência. Os tamanhos das partições coalescidas serão próximos, mas não maiores do que este tamanho-alvo.
Valor padrão: 64MB
spark.sql.adaptive.coalescePartitions.minPartitionSize
Tipo: Byte String
O tamanho mínimo das partições após a coalescência. Os tamanhos das partições aglutinadas não serão inferiores a este tamanho.
Valor padrão: 1MB
spark.sql.adaptive.coalescePartitions.minPartitionNum
Tipo: Integer
O número mínimo de partições após a coalescência. Não recomendado, porque a configuração substitui explicitamente
spark.sql.adaptive.coalescePartitions.minPartitionSize.
Valor padrão: 2x o número de núcleos do cluster

Manipule dinamicamente a junção inclinada

Propriedade
spark.sql.adaptive.skewJoin.enabled
Tipo: Boolean
Se é necessário ativar ou desativar o tratamento de junção de distorção.
Valor padrão: true
spark.sql.adaptive.skewJoin.skewedPartitionFactor
Tipo: Integer
Um fator que, quando multiplicado pelo tamanho mediano da partição, contribui para determinar se uma partição está enviesada.
Valor padrão: 5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
Tipo: Byte String
Um limite que contribui para determinar se uma partição está distorcida.
Valor padrão: 256MB

Uma partição é considerada enviesada quando (partition size > skewedPartitionFactor * median partition size) e (partition size > skewedPartitionThresholdInBytes) são true.

Detetar e propagar dinamicamente relações vazias

Propriedade
spark.databricks.adaptive.emptyRelationPropagation.enabled
Tipo: Boolean
Se deseja habilitar ou desabilitar a propagação dinâmica de relações vazias.
Valor padrão: true

Perguntas frequentes (FAQ)

Nesta secção:

Porque a AQE não transmitiu uma pequena tabela de junção?

Se o tamanho da relação que se espera difundir estiver abaixo deste limite, mas ainda assim não for difundida:

  • Verifique o tipo de junção. A transmissão não é suportada para certos tipos de junção, por exemplo, a relação esquerda de um LEFT OUTER JOIN não pode ser transmitida.
  • Também pode ser que a relação contenha muitas partições vazias, caso em que a maioria das tarefas pode terminar rapidamente com junção por classificação e mesclagem ou pode ser potencialmente otimizada com manipulação de desvio de junção. AQE evita alterar essas junções de ordenação e mesclagem para junções de hash de difusão se a porcentagem de partições não vazias for menor que spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin.

Ainda devo usar uma sugestão de estratégia de junção de disseminação com o AQE ativada?

Sim. Uma junção de transmissão planeada estaticamente é geralmente mais eficiente do que uma junção planeada dinamicamente pela AQE, pois a AQE pode não mudar para a junção de transmissão até depois de executar o shuffle para ambos os lados da junção (quando os tamanhos de relação reais são obtidos). Portanto, usar uma dica de transmissão ainda pode ser uma boa escolha se você conhecer bem sua consulta. O AQE respeitará as dicas de consulta da mesma forma que a otimização estática, mas ainda poderá aplicar otimizações dinâmicas que não são afetadas pelas dicas.

Qual é a diferença entre a indicação de junção desbalanceada e a otimização de junção desbalanceada AQE? Qual devo usar?

Recomenda-se confiar no manuseio de junção inclinada AQE em vez de usar a dica de junção inclinada, porque a junção inclinada AQE é completamente automática e, em geral, tem um desempenho melhor do que a contraparte de dica.

Porque é que a AQE não ajustou a minha encomenda de adesão automaticamente?

A reordenação dinâmica da junção não faz parte da AQE.

Por que a AQE não detetou minha distorção de dados?

Existem duas condições de tamanho que precisam ser cumpridas para que o AQE detete uma partição como uma partição enviesada:

  • O tamanho da partição é maior do que o spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (padrão 256MB)
  • O tamanho da partição é maior do que o tamanho mediano de todas as partições vezes o fator de partição enviesado spark.sql.adaptive.skewJoin.skewedPartitionFactor (padrão 5)

Além disso, o suporte de manipulação de distorção é limitado para certos tipos de junção, por exemplo, em LEFT OUTER JOIN, apenas a inclinação no lado esquerdo pode ser otimizada.

Legado

O termo "Execução Adaptativa" existe desde o Spark 1.6, mas o novo AQE no Spark 3.0 é fundamentalmente diferente. Em termos de funcionalidade, o Spark 1.6 faz apenas a parte de "coalescência dinâmica de partições". Em termos de arquitetura técnica, o novo AQE é uma estrutura de planejamento dinâmico e replanejamento de consultas com base em estatísticas de tempo de execução, que suporta uma variedade de otimizações como as que descrevemos neste artigo e pode ser estendida para permitir mais otimizações potenciais.