Partilhar via


Analisar tipos de dados complexos no Azure Synapse Analytics

Importante

O Synapse Link para Cosmos DB já não é suportado para novos projetos. Não uses esta funcionalidade.

Por favor, use o Azure Cosmos DB Mirroring para Microsoft Fabric, que agora é GA. O espelhamento oferece os mesmos benefícios de zero ETL e está totalmente integrado com o Microsoft Fabric. Saiba mais em Cosmos DB Mirroring Overview.

Este artigo é relevante para arquivos e contêineres do Parquet no Azure Synapse Link for Azure Cosmos DB. Você pode usar o Spark ou o SQL para ler ou transformar dados com esquemas complexos, como matrizes ou estruturas aninhadas. O exemplo a seguir é concluído com um único documento, mas pode ser facilmente dimensionado para bilhões de documentos com o Spark ou SQL. O código incluído neste artigo usa PySpark (Python).

Caso de utilização

Tipos de dados complexos são cada vez mais comuns e representam um desafio para os engenheiros de dados. A análise de esquemas e matrizes aninhados pode envolver consultas SQL demoradas e complexas. Além disso, pode ser difícil renomear ou transformar o tipo de dados de colunas aninhadas. Além disso, quando estás a trabalhar com objetos profundamente aninhados, podes encontrar problemas de desempenho.

Os engenheiros de dados precisam entender como processar com eficiência tipos de dados complexos e torná-los facilmente acessíveis a todos. No exemplo a seguir, você usa o Spark no Azure Synapse Analytics para ler e transformar objetos em uma estrutura plana por meio de quadros de dados. Você usa o modelo sem servidor do SQL no Azure Synapse Analytics para consultar esses objetos diretamente e retornar esses resultados como uma tabela regular.

O que são arrays e estruturas aninhadas?

O objeto a seguir vem do Application Insights. Neste objeto, há estruturas aninhadas e matrizes que contêm estruturas aninhadas.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Exemplo de esquema de matrizes e estruturas aninhadas

Ao imprimir o esquema do quadro de dados do objeto (chamado df) com o comando df.printschema, você verá a seguinte representação:

  • A cor amarela representa estruturas aninhadas.
  • Verde representa uma matriz com dois elementos.

Código com realce amarelo e verde, mostrando a origem do esquema

_rid, _tse _etag foram adicionados ao sistema à medida que o documento foi ingerido no repositório transacional do Azure Cosmos DB.

O quadro de dados anterior conta apenas para 5 colunas e 1 linha. Após a transformação, o quadro de dados curado terá 13 colunas e 2 linhas, em formato tabular.

Achatar estruturas aninhadas e explodir matrizes

Com o Spark no Azure Synapse Analytics, é fácil transformar estruturas aninhadas em colunas e elementos de matriz em várias linhas. Use as etapas a seguir para implementação.

Fluxograma mostrando etapas para transformações do Spark

Definir uma função para nivelar o esquema aninhado

Você pode usar esta função sem alterações. Crie uma célula num notebook PySpark com a seguinte função:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Use a função para nivelar o esquema aninhado

Nesta etapa, você nivela o esquema aninhado do quadro de dados (df) em um novo quadro de dados (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

A função de exibição deve retornar 10 colunas e 1 linha. A matriz e seus elementos aninhados ainda estão lá.

Transformar a matriz

Aqui, você transforma a matriz, context_custom_dimensionsno quadro df_flatde dados, em um novo quadro de df_flat_explodedados. No código a seguir, você também define qual coluna selecionar:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

A função de exibição deve retornar 10 colunas e 2 linhas. A próxima etapa é achatar esquemas aninhados com a função definida na etapa 1.

Use a função para nivelar o esquema aninhado

Finalmente, utiliza a função para nivelar o esquema aninhado do quadro de dados df_flat_explode, num novo quadro de dados, df_flat_explode_flat:

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

A função de exibição deve mostrar 13 colunas e 2 linhas.

A função printSchema do quadro df_flat_explode_flat de dados retorna o seguinte resultado:

Código mostrando o esquema final

Leia matrizes e estruturas aninhadas diretamente

Com o modelo sem servidor do SQL, você pode consultar e criar exibições e tabelas sobre esses objetos.

Primeiro, dependendo de como os dados foram armazenados, os usuários devem usar a seguinte taxonomia. Tudo o que é mostrado em maiúsculas é específico para o seu caso de uso:

Em massa Formato
https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet 'Parquet' (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/; account=ACCOUNTNAME; base de dados=DATABASENAME; collection=COLLECTIONNAME; region=REGIONTOQUERY', SECRET='SEUSEGREDO' 'CosmosDB' (Azure Synapse Link)

Substitua cada campo da seguinte forma:

  • 'YOUR BULK ABOVE' é a cadeia de conexão da fonte de dados à qual você se conecta.
  • 'SEU TIPO ACIMA' é o formato que você usa para se conectar à fonte.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Existem dois tipos diferentes de operações:

  • O primeiro tipo de operação é indicado na seguinte linha de código, que define a coluna chamada contextdataeventTime que se refere ao elemento aninhado, Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Esta linha define a coluna chamada contextdataeventTime que se refere ao elemento aninhado, Context>Data>eventTime.

  • O segundo tipo de operação usa cross apply para criar novas linhas para cada elemento sob a matriz. Em seguida, ele define cada objeto aninhado.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Se a matriz tinha 5 elementos com 4 estruturas aninhadas, o modelo sem servidor do SQL retorna 5 linhas e 4 colunas. O modelo sem servidor do SQL pode consultar no local, mapear a matriz em 2 linhas e exibir todas as estruturas aninhadas em colunas.

Próximos passos