Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Este artigo descreve a solicitação clone a pipeline na API REST do Databricks e como você pode usá-la para copiar um pipeline existente que publica no metastore do Hive para um novo pipeline publicado no Catálogo do Unity. Quando você chama a solicitação clone a pipeline , ela:
- Copia o código e as configurações do pipeline existente para um novo, aplicando as alterações de configuração que você especificou.
- Atualiza as definições e referências de visão materializada e tabela de streaming com as alterações necessárias para que esses objetos sejam gerenciados pelo Catálogo do Unity.
- Inicia uma atualização de pipeline para migrar os dados e metadados existentes, como pontos de verificação, para todas as tabelas de streaming no pipeline. Isso permite que as tabelas de Streaming retomem o processamento exatamente no mesmo ponto que o pipeline original.
Depois que a operação de clone for concluída, tanto os pipelines originais quanto os novos poderão ser executados de forma independente.
Este artigo inclui exemplos de como chamar a solicitação de API diretamente e por meio de um script Python de um bloco de anotações do Databricks.
Antes de começar
É necessário o seguinte antes de clonar um pipeline:
Para clonar um pipeline de metastore do Hive, é necessário que as tabelas e visões definidas no pipeline publiquem tabelas em um esquema de destino. Para saber como adicionar um esquema de destino a um pipeline, consulte Configurar um pipeline para publicar no metastore do Hive.
As referências a tabelas gerenciadas do metastore do Hive ou exibições no pipeline para clonar devem ser totalmente qualificadas com o catálogo (
hive_metastore), o esquema e o nome da tabela. Por exemplo, no código a seguir criando umcustomersconjunto de dados, o argumento nome da tabela deve ser atualizado parahive_metastore.sales.customers:@dp.table def customers(): return spark.read.table("sales.customers").where(...)Não edite o código-fonte do pipeline do metastore do Hive de origem enquanto uma operação de clonagem estiver em execução, incluindo notebooks configurados como parte do pipeline e quaisquer módulos armazenados em pastas de Git ou arquivos de espaço de trabalho.
O pipeline de metastore do Hive de origem não deve estar em execução quando você iniciar a operação de clone. Se uma atualização estiver em execução, interrompa-a ou aguarde até que ela seja concluída.
A seguir estão outras considerações importantes antes de clonar um pipeline:
- Se as tabelas no pipeline de metastore do Hive especificarem um local de armazenamento usando o argumento
pathem Python ouLOCATIONno SQL, passe a configuração"pipelines.migration.ignoreExplicitPath": "true"para a solicitação de clone. Definir essa configuração está incluída nas instruções abaixo. - Se o pipeline de metastore do Hive incluir uma origem do Carregador Automático que especifica um valor para a opção
cloudFiles.schemaLocatione o pipeline de metastore do Hive permanecerá operacional após a criação do clone do Catálogo do Unity, você deverá definir a opçãomergeSchematrueno pipeline metastore do Hive e no pipeline clonado do Catálogo do Unity. Adicionar essa opção ao metastore do Hive antes da clonagem fará com que a opção seja copiada para o novo pipeline.
** Clonar um fluxo de trabalho com a API REST do Databricks
O exemplo a seguir usa o curl comando para chamar a solicitação clone a pipeline na API REST do Databricks:
curl -X POST \
--header "Authorization: Bearer <personal-access-token>" \
<databricks-instance>/api/2.0/pipelines/<pipeline-id>/clone \
--data @clone-pipeline.json
Substituir:
-
<personal-access-token>com um token de acesso pessoal do Databricks. -
<databricks-instance>com o nome da instância do workspace do Azure Databricks, por exemploadb-1234567890123456.7.azuredatabricks.net -
<pipeline-id>com o identificador exclusivo do pipeline metastore do Hive a ser clonado. Você pode encontrar a ID do pipeline na interface do usuário de pipelines.
clone-pipeline.json:
{
"catalog": "<target-catalog-name>",
"target": "<target-schema-name>",
"name": "<new-pipeline-name>"
"clone_mode": "MIGRATE_TO_UC",
"configuration": {
"pipelines.migration.ignoreExplicitPath": "true"
}
}
Substituir:
-
<target-catalog-name>com o nome de um catálogo no Catálogo do Unity onde o novo pipeline deve ser publicado. Esse deve ser um catálogo existente. -
<target-schema-name>com o nome de um esquema no Unity Catalog ao qual o novo pipeline deve ser publicado, caso seja diferente do nome do esquema atual. Esse parâmetro é opcional e, se não especificado, o nome do esquema existente é usado. -
<new-pipeline-name>com um nome opcional para o novo pipeline. Se não for especificado, o novo pipeline será nomeado usando o nome do pipeline de origem com[UC]acrescentado.
clone_mode especifica o modo a ser usado para a operação de clone.
MIGRATE_TO_UC é a única opção com suporte.
Utilize o campo configuration para especificar configurações no novo pipeline. Os valores definidos aqui substituem as configurações no pipeline original.
A resposta da solicitação da clone API REST é o ID do novo pipeline do Catálogo Unity.
Clonar um pipeline a partir de um notebook do Databricks
O exemplo a seguir chama a solicitação create a pipeline de um script Python. Você pode usar um bloco de anotações do Databricks para executar este script:
- Crie um novo bloco de anotações para o script. Consulte Criar um notebook.
- Copie o script Python a seguir para a primeira célula do notebook.
- Atualize os valores de espaço reservado no script substituindo:
-
<databricks-instance>com o nome da instância do workspace do Azure Databricks, por exemploadb-1234567890123456.7.azuredatabricks.net -
<pipeline-id>com o identificador exclusivo do pipeline metastore do Hive a ser clonado. Você pode encontrar a ID do pipeline na interface do usuário de pipelines. -
<target-catalog-name>com o nome de um catálogo no Catálogo do Unity onde o novo pipeline deve ser publicado. Esse deve ser um catálogo existente. -
<target-schema-name>com o nome de um esquema no Unity Catalog ao qual o novo pipeline deve ser publicado, caso seja diferente do nome do esquema atual. Esse parâmetro é opcional e, se não especificado, o nome do esquema existente é usado. -
<new-pipeline-name>com um nome opcional para o novo pipeline. Se não for especificado, o novo pipeline será nomeado usando o nome do pipeline de origem com[UC]acrescentado.
-
- Executar o script. Consulte a seção Executar notebooks do Databricks.
import requests
# Your Databricks workspace URL, with no trailing spaces
WORKSPACE = "<databricks-instance>"
# The pipeline ID of the Hive metastore pipeline to clone
SOURCE_PIPELINE_ID = "<pipeline-id>"
# The target catalog name in Unity Catalog
TARGET_CATALOG = "<target-catalog-name>"
# (Optional) The name of a target schema in Unity Catalog. If empty, the same schema name as the Hive metastore pipeline is used
TARGET_SCHEMA = "<target-schema-name>"
# (Optional) The name of the new pipeline. If empty, the following is used for the new pipeline name: f"{originalPipelineName} [UC]"
CLONED_PIPELINE_NAME = "<new-pipeline-name>"
# This is the only supported clone mode
CLONE_MODE = "MIGRATE_TO_UC"
# Specify override configurations
OVERRIDE_CONFIGS = {"pipelines.migration.ignoreExplicitPath": "true"}
def get_token():
ctx = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
return getattr(ctx, "apiToken")().get()
def check_source_pipeline_exists():
data = requests.get(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}",
headers={"Authorization": f"Bearer {get_token()}"},
)
assert data.json()["pipeline_id"] == SOURCE_PIPELINE_ID, "The provided source pipeline does not exist!"
def request_pipeline_clone():
payload = {
"catalog": TARGET_CATALOG,
"clone_mode": CLONE_MODE,
}
if TARGET_SCHEMA != "":
payload["target"] = TARGET_SCHEMA
if CLONED_PIPELINE_NAME != "":
payload["name"] = CLONED_PIPELINE_NAME
if OVERRIDE_CONFIGS:
payload["configuration"] = OVERRIDE_CONFIGS
data = requests.post(
f"{WORKSPACE}/api/2.0/pipelines/{SOURCE_PIPELINE_ID}/clone",
headers={"Authorization": f"Bearer {get_token()}"},
json=payload,
)
response = data.json()
return response
check_source_pipeline_exists()
request_pipeline_clone()
Limitações
Veja a seguir as limitações da requisição da API de Pipelines Declarativos clone a pipeline do Lakeflow Spark:
- Apenas é suportada a clonagem de um pipeline configurado para usar o metastore do Hive para um pipeline do Catálogo do Unity.
- Você pode criar um clone apenas no mesmo espaço de trabalho do Azure Databricks que o pipeline do qual você está clonando.
- O pipeline que você está clonando pode incluir apenas as seguintes fontes de streaming:
- Fontes delta
- Carregador Automático, incluindo quaisquer fontes de dados compatíveis com o Carregador Automático. Consulte Carregar arquivos do armazenamento de objetos na nuvem.
- Apache Kafka com Streaming Estruturado. No entanto, a origem Kafka não pode ser configurada para utilizar a opção
kafka.group.id. Confira Processamento de fluxos com o Apache Kafka e o Azure Databricks. - Amazon Kinesis com streaming estruturado. No entanto, a origem Kinesis não pode ser configurada para definir
consumerModecomoefo.
- Se o pipeline de metastore do Hive que você está clonando usar o modo de notificação de arquivo do Auto Loader, o Databricks recomenda não executá-lo após a clonagem. Isso ocorre porque a execução do Hive metastore pipeline resulta na queda de alguns eventos de notificação de arquivo do clone do Unity Catalog. Se o pipeline de metastore do Hive de origem for executado após a conclusão da operação de clone, você poderá fazer o backup de arquivos ausentes usando o Carregador Automático com a opção
cloudFiles.backfillInterval. Para saber mais sobre o modo de notificação de arquivo do Carregador Automático, consulte Configurar fluxos do Carregador Automático no modo de notificação de arquivo. Para saber mais sobre como fazer o backfilling de arquivos com o Carregador Automático, consulte Disparar backfills regulares usando cloudFiles.backfillInterval e Opções Comuns do Carregador Automático. - As tarefas de manutenção de pipeline são pausadas automaticamente para ambos os pipelines enquanto a clonagem está em andamento.
- Aplica-se o seguinte às consultas de viagem no tempo em tabelas na pipeline clonada do Catálogo Unity:
- Se uma versão de tabela foi originalmente gravada em um objeto gerenciado pelo metastore do Hive, as consultas de time travel que utilizam uma cláusula
timestamp_expressionserão indefinidas ao consultar o objeto clonado do Catálogo Unity. - No entanto, se a versão da tabela tiver sido gravada no objeto clonado do Catálogo do Unity, as consultas de viagem no tempo usando uma
timestamp_expressioncláusula funcionarão corretamente. - Consultas de viagem no tempo que utilizam uma cláusula
versionfuncionam corretamente ao consultar um objeto clonado do Catálogo Unity, mesmo quando a versão foi originalmente registrada no objeto gerenciado pelo metastore do Hive.
- Se uma versão de tabela foi originalmente gravada em um objeto gerenciado pelo metastore do Hive, as consultas de time travel que utilizam uma cláusula
- Para obter outras limitações ao usar pipelines declarativos do Lakeflow Spark com o Unity Catalog, consulte as limitações de pipelines do Unity Catalog.
- Para limitações do Catálogo do Unity, consulte as limitações do Catálogo do Unity.