Compartilhar via


Copiar e transformar dados no Snowflake V2 usando o Azure Data Factory ou o Azure Synapse Analytics

APLICA-SE A: Azure Data Factory Azure Synapse Analytics

Tip

Experimente o Data Factory no Microsoft Fabric, uma solução de análise tudo-em-um para empresas. O Microsoft Fabric abrange desde movimentação de dados até ciência de dados, análise em tempo real, business intelligence e relatórios. Saiba como iniciar uma avaliação gratuita!

Este artigo descreve como usar a atividade Copy no Azure Data Factory e em pipelines do Azure Synapse para copiar dados de e para o Snowflake e como usar o Fluxo de Dados para transformar dados no Snowflake. Para mais informações, consulte o artigo introdutório do Data Factory ou do Azure Synapse Analytics.

Important

O conector Snowflake V1 está em fase de remoção. É recomendável atualizar o conector Snowflake da V1 para a V2.

Recursos com suporte

Esse conector Snowflake é compatível com as seguintes funcionalidades:

Recursos com suporte IR
Atividade de cópia (origem/coletor) (1) (2)
Mapeamento do fluxo de dados (origem/destino)
Atividade de pesquisa (1) (2)
Atividade de script (aplicar a versão 1.1 ao usar o parâmetro de script) (1) (2)

① Tempo de execução de integração do Azure ② Tempo de execução de integração auto-hospedado

No caso da atividade Copy, esse conector Snowflake dá suporte às seguintes funções:

  • Copiar os dados do Snowflake utilizando o comando do Snowflake COPY into [location] para obter o melhor desempenho.
  • Copie os dados para o Snowflake aproveitando o comando do Snowflake COPY into [table] para obter o melhor desempenho. Ele oferece suporte ao Snowflake no Azure.
  • Se um proxy for necessário para se conectar ao Snowflake a partir de um Integration Runtime auto-hospedado, você deverá configurar as variáveis de ambiente HTTP_PROXY e HTTPS_PROXY no host do Integration Runtime.

Prerequisites

Se o armazenamento de dados estiver localizado dentro de uma rede local, em uma rede virtual do Azure ou na Amazon Virtual Private Cloud, você precisará configurar um runtime de integração auto-hospedada para se conectar a ele. Adicione os endereços IP usados pelo runtime de integração auto-hospedada à lista de permissões.

Se o armazenamento de dados for um serviço de dados de nuvem gerenciado, você poderá usar o Azure Integration Runtime. Se o acesso for restrito aos IPs aprovados nas regras de firewall, você poderá adicionar IPs do Azure Integration Runtime à lista de permissões.

A conta Snowflake usada para Origem ou Coletor deve ter o acesso necessário USAGE no banco de dados e acesso de leitura/gravação no esquema e nas tabelas/visualizações sob ele. Além disso, ele também deve ter CREATE STAGE no esquema para poder criar o estágio Externo com a URI SAS.

Os valores de propriedades da Conta a seguir devem ser definidos

Property Description Required Default
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION Especifica se é necessário exigir um objeto de integração de armazenamento como credenciais de nuvem ao criar um estágio externo nomeado (usando CREATE STAGE) para acessar um local de armazenamento em nuvem privado. FALSE FALSE
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION Especifica se é necessário usar um estágio externo nomeado que referencie um objeto de integração de armazenamento como credenciais de nuvem ao carregar ou descarregar dados em um local de armazenamento em nuvem privado. FALSE FALSE

Para obter mais informações sobre os mecanismos de segurança de rede e as opções compatíveis com o Data Factory, consulte Estratégias de acesso a dados.

Introdução

Para executar a atividade de cópia com um pipeline, você pode usar uma das seguintes ferramentas ou SDKs:

Criar um serviço vinculado ao Snowflake usando a interface do usuário

Use as etapas a seguir para criar um serviço vinculado ao Snowflake na interface do usuário do portal do Microsoft Azure.

  1. Navegue até a guia Gerenciar em seu espaço de trabalho do Azure Data Factory ou do Synapse e selecione Serviços Vinculados, em seguida, clique em Novo:

  2. Pesquise por Snowflake e selecione o conector Snowflake.

    Captura de tela do conector Snowflake.

  3. Configure os detalhes do serviço, teste a conexão e crie o novo serviço vinculado.

    Captura de tela da configuração do serviço vinculado para Snowflake.

Detalhes da configuração do conector

As seções a seguir fornecem detalhes sobre as propriedades usadas para definir entidades específicas de um conector Snowflake.

Propriedades de serviço vinculadas

Há suporte para essas propriedades genéricas no serviço vinculado do Snowflake:

Property Description Required
tipo A propriedade Type deve estar definida como SnowflakeV2. Yes
versão A versão que você especifica. É recomendável atualizar para a versão mais recente para aproveitar os aprimoramentos mais recentes. Sim para a versão 1.1
accountIdentifier O nome da conta junto com sua organização. Por exemplo, myorg-account123. Yes
banco de dados O banco do dados padrão utilizado para a sessão após a conexão. Yes
warehouse O warehouse virtual padrão utilizado para a sessão após a conexão. Yes
authenticationType Tipo de autenticação utilizada para se conectar ao serviço Snowflake. Os valores permitidos são: Básico (Padrão) e KeyPair. Consulte as seções correspondentes abaixo para mais propriedades e exemplos, respectivamente. No
função A função de segurança padrão usada na sessão após a conexão. No
hospedar O nome do host da conta Snowflake. Por exemplo: contoso.snowflakecomputing.com. .cn também tem suporte. No
connectVia O tempo de execução de integração usado para conectar ao armazenamento de dados. Você poderá usar o runtime de integração do Azure ou um runtime de integração auto-hospedada (se o seu armazenamento de dados estiver localizado em uma rede privada). Se não for especificado, ele usará o runtime de integração padrão do Azure. No

Você pode definir as seguintes propriedades de conexão adicionais no serviço vinculado, dependendo do seu caso.

Property Description Required Valor padrão
UseUtcTimestamps Especifique-o para false para retornar o tipo TIMESTAMP_LTZ e o tipo TIMESTAMP_TZ com o fuso horário correto, e o tipo TIMESTAMP_NTZ sem informações de fuso horário. Especifique-o para true para retornar todos os tipos de carimbo de data/hora Snowflake em UTC. No false
esquema Especifica o esquema da sessão de consulta após a conexão. No /

Esse conector do Snowflake dá suporte aos seguintes tipos de autenticação. Consulte as seções correspondentes para obter detalhes.

autenticação Básica

Para usar a autenticação Básica, além das propriedades genéricas descritas na seção anterior, especifique as seguintes propriedades:

Property Description Required
usuário Nome de logon do usuário do Snowflake. Yes
senha A senha de usuário do Snowflake. Marque esse campo como um tipoSecureString para armazená-lo com segurança. Você também pode referenciar um segredo armazenado no Cofre de Chaves do Azure. Yes

Example:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Senha no Azure Key Vault:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                }, 
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Autenticação de par de chaves

Para usar a autenticação de Par de chaves, é necessário configurar e criar um usuário de autenticação de par de chaves no Snowflake, consultando Autenticação de par de chaves e rotação de par de chaves. Depois disso, anote a chave privada e a frase secreta (opcional), que você usa para definir o serviço vinculado.

Além das propriedades genéricas descritas na seção anterior, especifique as seguintes propriedades:

Property Description Required
usuário Nome de logon do usuário do Snowflake. Yes
privateKey A chave privada usada para a autenticação do par de chaves.

Para garantir que a chave privada seja válida quando enviada ao Azure Data Factory, e considerando que o arquivo privateKey inclui caracteres de nova linha (\n), é essencial formatar corretamente o conteúdo da privateKey em seu formato literal de cadeia de caracteres. Esse processo envolve adicionar \n explicitamente a cada nova linha.
Yes
privateKeyPassphrase A frase secreta usada para descriptografar a chave privada, se ela estiver criptografada. No

Example:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "KeyPair",
            "user": "<username>",
            "privateKey": {
                "type": "SecureString",
                "value": "<privateKey>"
            },
            "privateKeyPassphrase": { 
                "type": "SecureString",
                "value": "<privateKeyPassphrase>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Note

Para mapear fluxos de dados, recomendamos gerar uma nova chave privada RSA usando o padrão PKCS#8 no formato PEM (arquivo.p8).

Propriedades do conjunto de dados

Para obter uma lista completa das seções e propriedades disponíveis para definir os conjuntos de dados, confira o artigo sobre Conjuntos de Dados.

As propriedades a seguir têm suporte para o conjunto de dados do Snowflake.

Property Description Required
tipo A propriedade Type do conjunto de dados deve ser definida como SnowflakeV2Table. Yes
esquema Nome do esquema. Observe que o nome do esquema diferencia maiúsculas de minúsculas. Não para fonte, sim para coletor
tabela Nome da tabela/exibição. Observe que o nome da tabela diferencia maiúsculas de minúsculas. Não para fonte, sim para coletor

Example:

{
    "name": "SnowflakeV2Dataset",
    "properties": {
        "type": "SnowflakeV2Table",
        "typeProperties": {
            "schema": "<Schema name for your Snowflake database>",
            "table": "<Table name for your Snowflake database>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

Propriedades da atividade de cópia

Para obter uma lista completa das seções e propriedades disponíveis para definir atividades, confia o artigo Pipelines. Essa seção fornece uma lista de propriedades suportadas pela origem e pelo coletor do Snowflake.

Snowflake como a origem

O conector do Snowflake utiliza o comando COPY into [local] do Snowflake para obter o melhor desempenho.

Se o armazenamento de dados e o formato do coletor forem suportados nativamente pelo comando COPY do Snowflake, você poderá usar a atividade Copy para copiar diretamente do Snowflake para o coletor. Para obter detalhes, consulte Cópia direta do Snowflake. Caso contrário, use a cópia interna Staged do Snowflake.

Ao copiar os dados do Snowflake, as seguintes propriedades têm suporte na seção de origem da atividade Copy.

Property Description Required
tipo A propriedade de tipo da fonte da atividade Copy deve ser definida como SnowflakeV2Source. Yes
consulta Especifica a consulta SQL para ler os dados do Snowflake. Se os nomes do esquema, tabela e colunas contiverem letras minúsculas, mencione o identificador de objeto na consulta, por exemplo, select * from "schema"."myTable".
Não há suporte para a execução de procedimentos armazenados.
No
exportSettings Configurações avançadas usadas para recuperar dados do Snowflake. Você pode configurar aqueles suportados pelo comando COPY into pelos quais o serviço passará quando você invocar a instrução. Yes
treatDecimalAsString Especifique para tratar o tipo decimal como tipo de cadeia de caracteres na atividade de pesquisa e script. O valor padrão é false.

Essa propriedade só tem suporte na versão 1.1.
No
Em exportSettings:
tipo O tipo de comando de exportação, definido como SnowflakeExportCopyCommand. Yes
storageIntegration Especifique o nome da integração de armazenamento que você criou no Snowflake. Para as etapas de pré-requisito para o uso da integração de armazenamento, consulte Configuração de uma Integração de Armazenamento Snowflake. No
additionalCopyOptions Opções de cópia adicionais, fornecidas como um dicionário de pares chave-valor. Exemplos: MAX_FILE_SIZE, OVERWRITE. Para mais informações, consulte Opções de cópia do Snowflake. No
additionalFormatOptions Opções adicionais de formato de arquivo que são fornecidas ao comando COPY como um dicionário de pares chave-valor. Exemplos: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT, NULL_IF. Para mais informações, consulte Opções de tipo de formato do Snowflake.

Quando você usa NULL_IF, o valor NULL no Snowflake é convertido para o valor especificado (que precisa estar entre aspas simples) ao gravar no arquivo de texto delimitado no armazenamento temporário. Esse valor especificado é tratado como NULL ao ler do arquivo de preparação para o armazenamento de recebimento. O valor padrão é 'NULL'.
No

Note

Verifique se você tem permissão para executar o comando a seguir e acessar o esquema INFORMATION_SCHEMA e a tabela COLUMNS.

  • COPY INTO <location>

Cópia direta do Snowflake

Se o armazenamento de dados e o formato do seu coletor atenderem aos critérios descritos nessa seção, você poderá usar a atividade Copy para copiar diretamente do Snowflake para o coletor. O serviço verifica as configurações e falha na execução da atividade Copy se os critérios a seguir não forem atendidos:

  • Quando você especifica storageIntegration na origem:

    O repositório de dados de destino é o Azure Blob Storage que você mencionou na etapa externa no Snowflake. Você precisa concluir as seguintes etapas antes de copiar dados:

    1. Crie um serviço vinculado do Armazenamento de Blobs do Azure para o coletor do Azure Blob Storage com qualquer tipo de autenticação compatível.

    2. Conceda pelo menos a função Storage Blob Data Contributor à entidade de serviço do Snowflake no coletor Armazenamento de Blobs do Azure Access Control (IAM).

  • Quando você não especifica storageIntegration na origem:

    O serviço vinculado ao coletor é o armazenamento de Blobs do Azure com autenticação de assinatura de acesso compartilhado. Se quiser copiar dados diretamente para o Azure Data Lake Storage Gen2 no seguinte formato suportado, você poderá criar um serviço vinculado ao Armazenamento de Blobs do Azure com autenticação SAS na sua conta do Azure Data Lake Storage Gen2 para evitar o uso de cópia em etapas do Snowflake.

  • O formato de dados do coletor é de Parquet, texto delimitado ou JSON, com as seguintes configurações:

    • Para o formato Parquet, o codec de compactação é Nenhum, Snappy ou Lzo.
    • Para formato de texto delimitado:
      • rowDelimiter é \r\n, ou qualquer caractere único.
      • compression pode ser sem compactação, gzip, bzip2 ou deflate.
      • encodingName é deixado como padrão ou definido como utf-8.
      • quoteChar é aspas duplas, aspas simples ou cadeia de caracteres vazia (sem caracteres de aspas).
    • Para o formato JSON, a cópia direta dá suporte apenas ao caso em que a tabela ou resultado de consulta do Snowflake de origem tem uma única coluna e o tipo de dados dessa coluna seja VARIANT, OBJECT ou ARRAY.
      • compression pode ser sem compactação, gzip, bzip2 ou deflate.
      • encodingName é deixado como padrão ou definido como utf-8.
      • filePattern na atividade de cópia o coletor é deixado como padrão ou definido como setOfObjects.
  • Na origem da atividade de cópia, additionalColumns não é especificado.

  • O mapeamento de colunas não está especificado.

Example:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",
                "query": "SELECT * FROM MYTABLE",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "additionalCopyOptions": {
                        "MAX_FILE_SIZE": "64000000",
                        "OVERWRITE": true
                    },
                    "additionalFormatOptions": {
                        "DATE_FORMAT": "'MM/DD/YYYY'"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Cópia encenada de Snowflake

Quando o armazenamento de dados ou formato do seu coletor não for nativamente compatível com o comando Snowflake COPY, conforme mencionado na última seção, habilite a cópia em etapas interna usando uma instância provisória do Armazenamento de Blobs do Azure. O recurso de cópia em etapas também proporciona uma melhor eficiência. O serviço exporta dados do Snowflake para o armazenamento de preparação, depois copia os dados para o coletor e, por fim, limpa seus dados temporários do armazenamento de preparação. Veja Cópia preparada para obter detalhes sobre como copiar dados usando preparação.

Para usar esse recurso, crie um serviço vinculado ao armazenamento de Blobs do Azure que se refere à conta de armazenamento do Azure como o preparo provisório. Em seguida, especifique as propriedades enableStaging e stagingSettings na atividade Copy.

  • Quando você especifica storageIntegration na origem, o Armazenamento de Blobs do Azure temporário deve ser aquele ao qual você se referiu no estágio externo no Snowflake. Certifique-se de criar um serviço vinculado ao Armazenamento de Blobs do Azure para ele com qualquer autenticação compatível ao usar o tempo de execução de integração do Azure, ou com autenticação anônima, de chave de conta, de assinatura de acesso compartilhado ou de entidade de serviço ao usar o tempo de execução de integração auto-hospedado. Além disso, conceda pelo menos a função Storage Blob Data Contributor à entidade de serviço do Snowflake no Armazenamento de Blobs do Azure Access Control (IAM) de preparação.

  • Quando você não especifica storageIntegration na origem, o serviço vinculado do Armazenamento de Blobs do Azure de preparação deve usar a autenticação de assinatura de acesso compartilhado, conforme exigido pelo comando Snowflake COPY. Certifique-se de conceder permissão de acesso adequada ao Snowflake no Armazenamento de Blobs do Azure de preparação. Para saber mais sobre isso, consulte este artigo.

Example:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",               
                "query": "SELECT * FROM MyTable",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"                    
                }
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Ao executar uma cópia em etapas do Snowflake, é crucial definir o comportamento da cópia do coletor como Mesclar arquivos. Essa configuração garante que todos os arquivos particionados sejam manipulados e mesclados corretamente, evitando o problema em que apenas o último arquivo particionado é copiado.

Configuração de exemplo

{
    "type": "Copy",
    "source": {
        "type": "SnowflakeSource",
        "query": "SELECT * FROM my_table"
    },
    "sink": {
        "type": "AzureBlobStorage",
        "copyBehavior": "MergeFiles"
    }
}

Note

Não definir o Sink Copy Behavior como Arquivos de Mesclagem pode fazer com que somente o último arquivo particionado seja copiado.

Floco de neve como pia

O conector Snowflake utiliza o comando COPY into [table] do Snowflake para o melhor desempenho. Ele oferece suporte à gravação de dados no Snowflake no Azure.

Se o repositório de dados de origem e o formato forem suportados nativamente pelo comando COPY do Snowflake, é possível usar a atividade de cópia para transferir diretamente da origem para o Snowflake. Para mais detalhes, veja Cópia direta para Snowflake. Caso contrário, use a Cópia preparada para o Snowflake interna.

Para copiar dados para o Snowflake, as seguintes propriedades são suportadas na seção coletor da atividade Copy.

Property Description Required
tipo A propriedade de tipo do coletor de atividade Copy, definida como SnowflakeV2Sink. Yes
preCopyScript Especifique uma consulta SQL para a atividade Copy a ser executada antes de gravar dados no Snowflake em cada execução. Use essa propriedade para limpar os dados pré-carregados. No
importSettings Configurações avançadas usadas para gravar dados no Snowflake. Você pode configurar aqueles suportados pelo comando COPY into pelos quais o serviço passará quando você invocar a instrução. Yes
Em importSettings:
tipo O tipo de comando de exportação, definido como SnowflakeExportCopyCommand. Yes
storageIntegration Especifique o nome da integração de armazenamento que você criou no Snowflake. Para as etapas de pré-requisito para o uso da integração de armazenamento, consulte Configuração de uma Integração de Armazenamento Snowflake. No
additionalCopyOptions Opções de cópia adicionais, fornecidas como um dicionário de pares chave-valor. Exemplos: ON_ERROR, FORCE, LOAD_UNCERTAIN_FILES. Para mais informações, consulte Opções de cópia do Snowflake. No
additionalFormatOptions Opções adicionais de formato de arquivo fornecidas ao comando COPY, fornecidas como um dicionário de pares chave-valor. Exemplos: DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT. Para mais informações, consulte Opções de tipo de formato do Snowflake. No

Note

Verifique se você tem permissão para executar o comando a seguir e acessar o esquema INFORMATION_SCHEMA e a tabela COLUMNS.

  • SELECT CURRENT_REGION()
  • COPY INTO <table>
  • SHOW REGIONS
  • CREATE OR REPLACE STAGE
  • DROP STAGE

Cópia direta para Snowflake

Se o armazenamento e o formato de dados de origem atenderem aos critérios descritos nesta seção, você poderá usar a atividade Copy para copiar diretamente da origem para o Snowflake. O serviço verifica as configurações e falha na execução da atividade Copy se os critérios a seguir não forem atendidos:

  • Quando você especifica storageIntegration no coletor:

    O armazenamento de dados de origem é o Armazenamento de Blobs do Azure que você mencionou no estágio externo no Snowflake. Você precisa concluir as seguintes etapas antes de copiar dados:

    1. Crie um serviço vinculado do Armazenamento de Blobs do Azure para o Armazenamento de Blobs do Azure de origem com qualquer tipo de autenticação compatível.

    2. Conceda pelo menos função de Leitor de Dados de Blobs de Armazenamento à entidade de serviço Snowflake no Controle de Acesso (IAM) de Armazenamento de Blobs do Azure da origem.

  • Quando você não especifica storageIntegration no coletor:

    O serviço vinculado à origem é o armazenamento de Blobs do Azure com autenticação de assinatura de acesso compartilhado. Se você quiser copiar dados diretamente do Azure Data Lake Storage Gen2 no seguinte formato suportado, você pode criar um serviço vinculado do Armazenamento de Blobs do Azure com autenticação SAS em sua conta do Azure Data Lake Storage Gen2, para evitar o uso de cópia em etapas para Snowflake.

  • O formato de dados do coletor é de Parquet, texto delimitado ou JSON, com as seguintes configurações:

    • Para o formato Parquet, o codec de compressão é Nenhum ou Snappy.

    • Para formato de texto delimitado:

      • rowDelimiter é \r\n, ou qualquer caractere único. Se o delimitador de linha não for "\r\n", firstRowAsHeader precisará ser false, e skipLineCount não estar especificado.
      • compression pode ser sem compactação, gzip, bzip2 ou deflate.
      • encodingName é deixado como padrão ou definido como "UTF-8", "UTF-16", "UTF-16BE", "UTF-32", "UTF-32BE", "BIG5", "EUC-JP", "EUC-KR", "GB18030", "ISO-2022-JP", "ISO-2022-KR", "ISO-8859-1", "ISO-8859-2", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "WINDOWS-1250", "WINDOWS-1251", "WINDOWS-1252", "WINDOWS-1253", "WINDOWS-1254", "WINDOWS-1255".
      • quoteChar é aspas duplas, aspas simples ou cadeia de caracteres vazia (sem caracteres de aspas).
    • Para o formato JSON, a cópia direta suporta apenas o caso em que a tabela Snowflake do coletor tem apenas uma coluna e o tipo de dados dessa coluna é VARIANT, OBJECT, ou ARRAY.

      • compression pode ser sem compactação, gzip, bzip2 ou deflate.
      • encodingName é deixado como padrão ou definido como utf-8.
      • O mapeamento de colunas não está especificado.
  • Na origem da atividade Copy:

    • additionalColumns não está especificado.
    • Se sua origem for uma pasta, recursive é definido como verdadeiro.
    • prefix, modifiedDateTimeStart, modifiedDateTimeEnd, e enablePartitionDiscovery não estão especificados.

Example:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "copyOptions": {
                        "FORCE": "TRUE",
                        "ON_ERROR": "SKIP_FILE"
                    },
                    "fileFormatOptions": {
                        "DATE_FORMAT": "YYYY-MM-DD"
                    },
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            }
        }
    }
]

Cópia preparada para o Snowflake

Quando o formato ou o armazenamento de dados de origem não for nativamente compatível com o comando Snowflake COPY, conforme mencionado na última seção, habilite a cópia temporária interna usando uma instância de Armazenamento de Blobs do Azure. O recurso de cópia em etapas também proporciona uma melhor eficiência. O serviço converte automaticamente os dados para atender aos requisitos de formato de dados do Snowflake. Em seguida, ele invoca o comando COPY para carregar dados no Snowflake. Por fim, ele limpa seus dados temporários do armazenamento de blobs. Veja Cópia preparada para obter detalhes sobre como copiar dados usando preparação.

Para usar esse recurso, crie um serviço vinculado ao armazenamento de Blobs do Azure que se refere à conta de armazenamento do Azure como o preparo provisório. Em seguida, especifique as propriedades enableStaging e stagingSettings na atividade Copy.

  • Quando você especifica storageIntegration no coletor, o Armazenamento de Blobs do Azure temporário deve ser aquele ao qual você se referiu no estágio externo no Snowflake. Certifique-se de criar um serviço vinculado ao Armazenamento de Blobs do Azure para ele com qualquer autenticação compatível ao usar o tempo de execução de integração do Azure, ou com autenticação anônima, de chave de conta, de assinatura de acesso compartilhado ou de entidade de serviço ao usar o tempo de execução de integração auto-hospedado. Além disso, conceda pelo menos a função Leitor de Dados de Blobs de Armazenamento à entidade de serviço do Snowflake no controle de acesso (IAM) de Armazenamento de Blobs do Azure em preparação.

  • Quando você não especifica storageIntegration no coletor, o serviço vinculado do Armazenamento de Blobs do Azure de preparação precisa usar a autenticação de assinatura de acesso compartilhado, conforme exigido pelo comando Snowflake COPY.

Example:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "storageIntegration": "< Snowflake storage integration name >"
                }
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Propriedades de fluxo de dados de mapeamento

Ao transformar dados no fluxo de dados de mapeamento, você pode ler e gravar em tabelas no Snowflake. Para obter mais informações, veja a transformação de origem e a transformação de coletor no fluxos de dados de mapeamento. Você pode optar por usar um conjunto de dados do Snowflake ou um conjunto de dados embutido como origem e tipo de coletor.

Transformação de origem

A tabela abaixo lista as propriedades suportadas pela fonte Snowflake. Você pode editar essas propriedades na guia Opções de origem. O conector utiliza a transferência de dados internado Snowflake.

Name Description Required Valores permitidos Propriedade do script do Fluxo de Dados
Table Se você selecionar Tabela como entrada, o fluxo de dados buscará todos os dados da tabela especificada no conjunto de dados do Snowflake ou nas opções de origem ao usar o conjunto de dados embutido. No String (somente para conjunto de dados em linha)
tableName
schemaName
Query Se você selecionar Consulta como entrada, insira uma consulta para buscar dados do Snowflake. Essa configuração substitui qualquer tabela que você tenha escolhido no conjunto de dados.
Se os nomes do esquema, tabela e colunas contiverem letras minúsculas, mencione o identificador de objeto na consulta, por exemplo, select * from "schema"."myTable".
No String consulta
Habilitar extração incremental (versão prévia) Use esta opção para indicar ao ADF que processe apenas as linhas que foram alteradas desde a última execução do pipeline. No booleano enableCdc
Coluna Incremental Ao usar o recurso de extração incremental, será necessário escolher uma coluna de data/hora/número que você deseja usar como referência na tabela de origem. No String waterMarkColumn
Habilitar o Controle de Alterações do Snowflake (Versão prévia) Essa opção permite que o ADF aproveite a tecnologia de captura de dados de alterações do Snowflake para processar apenas os dados delta desde a última execução do pipeline. Essa opção carrega automaticamente os dados delta com operações de inserção, atualização e exclusão de linhas sem a necessidade de nenhuma coluna incremental. No booleano enableNativeCdc
Mudanças Líquidas Ao usar o rastreamento de alterações em floco de neve, você pode usar esta opção para obter linhas alteradas com dedução ou alterações exaustivas. As linhas alteradas com dedução mostrarão apenas as versões mais recentes das linhas que foram alteradas desde um determinado momento, enquanto as alterações exaustivas mostrarão todas as versões de cada linha que foi alterada, incluindo aquelas que foram excluídas ou atualizadas. Por exemplo, se você atualizar uma linha, verá uma versão de exclusão e uma versão de inserção em alterações exaustivas, mas apenas a versão de inserção em linhas alteradas com deduplicação. Dependendo do seu caso de uso, você pode escolher a opção que atenda às suas necessidades. A opção padrão é false, o que significa alterações exaustivas. No booleano netChanges
Incluir colunas do sistema Ao usar o controle de alterações do Snowflake, você pode usar a opção systemColumns para controlar se as colunas de fluxo de metadados fornecidas pelo Snowflake estão incluídas ou excluídas na saída de controle de alterações. Por padrão, systemColumns está definido como true, o que significa que as colunas de fluxo de metadados estão incluídas. Você pode definir systemColumns como false se quiser excluí-las. No booleano systemColumns
Iniciar a leitura desde o início Definir essa opção com extração incremental e controle de alterações instruirá o ADF a ler todas as linhas na primeira execução de um pipeline com extração incremental ativada. No booleano skipInitialLoad

Exemplos de script de origem do Snowflake

Quando você usa um conjunto de dados do Snowflake como tipo de origem, o script de fluxo de dados associado é:

source(allowSchemaDrift: true,
	validateSchema: false,
	query: 'select * from MYTABLE',
	format: 'query') ~> SnowflakeSource

Se você usar um conjunto de dados integrado, o script de fluxo de dados associado é:

source(allowSchemaDrift: true,
	validateSchema: false,
	format: 'query',
	query: 'select * from MYTABLE',
	store: 'snowflake') ~> SnowflakeSource

Rastreamento Nativo de Alterações

O Azure Data Factory agora dá suporte a um recurso nativo no Snowflake conhecido como controle de alterações, que envolve o acompanhamento de mudanças na forma de logs. Esse recurso do Snowflake nos permite acompanhar as alterações nos dados ao longo do tempo, tornando-os úteis para o carregamento incremental de dados e para fins de auditoria. Para utilizar esse recurso, quando você habilita a Captura de dados de alterações e seleciona o Controle de Alterações do Snowflake, criamos um objeto Stream para a tabela de origem que permite o controle de alterações na tabela de origem do Snowflake. Posteriormente, usamos a cláusula CHANGES em nossa consulta para buscar apenas os dados novos ou atualizados da tabela de origem. Além disso, é recomendável agendar o pipeline de modo que as alterações sejam consumidas dentro do intervalo de tempo de retenção de dados definido para a tabela de origem do Snowflake; caso contrário, o usuário poderá ver um comportamento inconsistente nas alterações capturadas.

Transformação de coletor

A tabela abaixo lista as propriedades suportadas pelo coletor Snowflake. Você pode editar essas propriedades na guia Configurações. Ao usar o conjunto de dados em linha, você verá configurações adicionais, que são as mesmas que as propriedades descritas na seção propriedades do conjunto de dados. O conector utiliza a transferência de dados interna do Snowflake.

Name Description Required Valores permitidos Propriedade do script do Fluxo de Dados
Método de atualização Especifique quais são as operações permitidas no seu destino do Snowflake.
Para atualizar, fazer upsert ou excluir linhas, uma transformação Alter row é necessária para marcar as linhas para essas ações.
Yes true ou false deletable
insertable
updateable
upsertable
Colunas de chave Para atualizações, inserções e exclusões, uma ou mais colunas-chave devem ser definidas para determinar qual linha alterar. No Array keys
Ação de tabela Determina se deve-se recriar ou remover todas as linhas da tabela de destino antes da gravação.
- Nenhum: nenhuma ação será feita à tabela.
- Recriar: A tabela será descartada e recriada. Necessário se estiver criando uma nova tabela dinamicamente.
- Truncate: Todas as linhas da tabela de destino serão removidas.
No true ou false recreate
truncar

Exemplos de script de pia de floco de neve

Quando você usa um conjunto de dados do Snowflake como tipo de origem, o script de fluxo de dados associado é:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:false,
	keys:['movieId'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Se você usar um conjunto de dados integrado, o script de fluxo de dados associado é:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	format: 'table',
	tableName: 'table',
	schemaName: 'schema',
	deletable: true,
	insertable: true,
	updateable: true,
	upsertable: false,
	store: 'snowflake',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

Otimização de consulta pushdown

Ao definir o Nível de Registros em Log do pipeline como Nenhum, excluímos a transmissão de métricas de transformação intermediárias, evitando possíveis obstáculos para as otimizações do Spark e possibilitando a otimização de pushdown de consulta fornecida pelo Snowflake. Essa otimização de pushdown permite melhorias substanciais de desempenho para grandes tabelas Snowflake com conjuntos de dados extensos.

Note

Não damos suporte a tabelas temporárias no Snowflake, pois elas são locais para a sessão ou o usuário que as cria, tornando-as inacessíveis a outras sessões e propensas a serem substituídas como tabelas regulares pelo Snowflake. Embora o Snowflake ofereça tabelas transitórias como alternativa, acessíveis globalmente, elas exigem exclusão manual, contradizendo nosso principal objetivo de usar tabelas temporárias, que é evitar quaisquer operações de exclusão no esquema de origem.

Mapeamento de tipo de dados para Snowflake V2

Quando você copia dados do Snowflake, os mapeamentos a seguir são usados de tipos de dados Snowflake para tipos de dados provisórios dentro do serviço internamente. Para saber mais sobre como a atividade Copy mapeia o tipo de dados e esquema de origem para o coletor, consulte Mapeamentos de tipo de dados e esquema.

Tipo de dados Snowflake Tipo de dados provisório do serviço
NÚMERO (p,0) Decimal
NUMBER (p,s onde s>0) Decimal
FLOAT Double
VARCHAR String
CHAR String
BINARY Byte[]
BOOLEAN booleano
DATE DateTime
TIME TimeSpan
TIMESTAMP_LTZ DateTimeOffset
TIMESTAMP_NTZ DateTimeOffset
TIMESTAMP_TZ DateTimeOffset
VARIANT String
OBJECT String
ARRAY String

Propriedades da atividade de pesquisa

Para obter mais informações sobre as propriedades, confira Atividade de pesquisa.

Ciclo de vida e atualização do conector Snowflake

A tabela a seguir mostra o estágio de lançamento e os logs de alteração para versões diferentes do conector Snowflake:

Version Estágio de lançamento Log de alterações
Floco de Neve V1 Removed Não aplicável.
Snowflake V2 (versão 1.0) Versão GA disponível • Adicione suporte para autenticação de par de chaves.

• Adicionar suporte para storageIntegration na atividade de cópia.

• As propriedades accountIdentifier, warehouse, database, schema e role são usadas para estabelecer uma conexão em vez da propriedade connectionstring.

• Adicione suporte para "Decimal" na atividade de consulta. O tipo NUMBER, conforme definido no Snowflake, será exibido como uma cadeia de caracteres na atividade Pesquisa. Se você quiser encoberto para o tipo numérico na V2, poderá usar o parâmetro de pipeline com função int ou função float. Por exemplo, int(activity('lookup').output.firstRow.VALUE), float(activity('lookup').output.firstRow.VALUE)

• O tipo de dados timestamp no Snowflake é lido como tipo de dados DateTimeOffset na atividade de pesquisa e Script. Se você ainda precisar usar o valor Datetime como um parâmetro em seu pipeline após a atualização para v2, poderá converter o tipo DateTimeOffset em tipo DateTime usando a função formatDateTime (recomendada) ou a função concat. Por exemplo: formatDateTime(activity('lookup').output.firstRow.DATETIMETYPE), concat(substring(activity('lookup').output.firstRow.DATETIMETYPE, 0, 19), 'Z')

• NUMBER (p,0) é lido como um tipo de dados decimal.

• TIMESTAMP_LTZ, TIMESTAMP_NTZ e TIMESTAMP_TZ são lidos como tipo de dados DateTimeOffset.

• Não há suporte para parâmetros de script na atividade script. Como alternativa, utilize expressões dinâmicas para parâmetros de script. Para obter mais informações, confira Expressões e funções no Azure Data Factory e no Azure Synapse Analytics.

• Não há suporte para a execução de várias instruções SQL na atividade script.
Snowflake V2 (versão 1.1) Versão GA disponível • Adicione suporte para parâmetros de script.

• Adicione suporte para execução de várias instruções na atividade script.

• Adicionar a treatDecimalAsString propriedade nas atividades Pesquisa e Script.

• Adicionar propriedade de conexão adicional UseUtcTimestamps.

Atualize o conector Snowflake de V1 para V2

Para atualizar o conector Snowflake da V1 para a V2, você pode optar por uma atualização paralela ou uma atualização no local.

Atualização lado a lado

Para executar uma atualização lado a lado, conclua as seguintes etapas:

  1. Crie um novo serviço vinculado ao Snowflake e configure-o consultando as propriedades do serviço vinculado V2.
  2. Crie um conjunto de dados com base no serviço vinculado do Snowflake recém-criado.
  3. Substitua o novo serviço vinculado e o conjunto de dados pelos existentes nos pipelines que têm como alvo os objetos V1.

Atualização no local

Para executar uma atualização local, você precisa editar a carga útil do serviço vinculado existente e atualizar o conjunto de dados para usar o novo serviço vinculado.

  1. Atualize o tipo do Snowflake para SnowflakeV2.

  2. Modifique o conteúdo do serviço vinculado do formato V1 para o V2. Você pode preencher cada campo na interface do usuário após alterar o tipo mencionado acima ou atualizar a carga diretamente por meio do Editor JSON. Consulte a seção Propriedades do serviço vinculado nesse artigo para obter as propriedades de conexão suportadas. Os exemplos a seguir mostram as diferenças no conteúdo dos serviços vinculados do Snowflake V1 e V2:

    Carga útil JSON do serviço vinculado ao Snowflake V1:

      {
         "name": "Snowflake1",
         "type": "Microsoft.DataFactory/factories/linkedservices",
         "properties": {
             "annotations": [],
             "type": "Snowflake",
             "typeProperties": {
                 "authenticationType": "Basic",
                 "connectionString": "jdbc:snowflake://<fake_account>.snowflakecomputing.com/?user=FAKE_USER&db=FAKE_DB&warehouse=FAKE_DW&schema=PUBLIC",
                 "encryptedCredential": "<your_encrypted_credential_value>"
             },
             "connectVia": {
                 "referenceName": "AzureIntegrationRuntime",
                 "type": "IntegrationRuntimeReference"
             }
         }
     }
    

    Carga útil JSON do serviço vinculado ao Snowflake V2:

     {
         "name": "Snowflake2",
         "type": "Microsoft.DataFactory/factories/linkedservices",
         "properties": {
             "parameters": {
                 "schema": {
                     "type": "string",
                     "defaultValue": "PUBLIC"
                 }
             },
             "annotations": [],
             "type": "SnowflakeV2",
             "typeProperties": {
                 "authenticationType": "Basic",
                 "accountIdentifier": "<FAKE_Account>",
                 "user": "FAKE_USER",
                 "database": "FAKE_DB",
                 "warehouse": "FAKE_DW",
                 "encryptedCredential": "<placeholder>"
             },
             "connectVia": {
                 "referenceName": "AutoResolveIntegrationRuntime",
                 "type": "IntegrationRuntimeReference"
             }
         }
     }
    
  3. Atualize o conjunto de dados para usar o novo serviço vinculado. Você pode criar um novo conjunto de dados com base no serviço vinculado recém-criado ou atualizar a propriedade de tipo de um conjunto de dados existente, mudando de SnowflakeTable para SnowflakeV2Table.

Note

Ao fazer a transição de serviços vinculados, a seção de parâmetro de modelo de substituição pode exibir apenas as propriedades do banco de dados. Você pode resolver isso editando manualmente os parâmetros. Depois disso, a seção Substituição de parâmetros de modelo mostrará as cadeias de conexão.

Atualizar o conector Snowflake V2 da versão 1.0 para a versão 1.1

Na página Editar serviço vinculado , selecione 1.1 para a versão. Para saber mais, confira Propriedades do serviço vinculado.

Para obter uma lista de armazenamentos de dados suportados como fontes e destinos pela atividade de cópia, consulte armazenamentos e formatos de dados compatíveis.