Udostępnij przez


Analizowanie złożonych typów danych w usłudze Azure Synapse Analytics

Ważne

Usługa Synapse Link dla usługi Cosmos DB nie jest już obsługiwana w przypadku nowych projektów. Nie używaj tej funkcji.

Użyj funkcji mirroringu Azure Cosmos DB dla Microsoft Fabric, która jest teraz ogólnie dostępna. Mirroring zapewnia takie same korzyści płynące z braku konieczności ETL i jest w pełni zintegrowane z Microsoft Fabric. Dowiedz się więcej na stronie Omówienie dublowania usługi Cosmos DB.

Ten artykuł dotyczy plików i kontenerów Parquet w usłudze Azure Synapse Link dla usługi Azure Cosmos DB. Za pomocą platformy Spark lub SQL można odczytywać lub przekształcać dane za pomocą złożonych schematów, takich jak tablice lub struktury zagnieżdżone. Poniższy przykład został ukończony przy użyciu pojedynczego dokumentu, ale można go łatwo skalować do miliardów dokumentów przy użyciu platformy Spark lub języka SQL. Kod zawarty w tym artykule używa narzędzia PySpark (Python).

Przypadek użycia

Złożone typy danych są coraz bardziej powszechne i stanowią wyzwanie dla inżynierów danych. Analizowanie zagnieżdżonego schematu i tablic może obejmować czasochłonne i złożone zapytania SQL. Ponadto może być trudno zmienić nazwę lub typ zagnieżdżonych kolumn. Ponadto, podczas pracy z głęboko zagnieżdżonymi obiektami, można napotkać problemy z wydajnością.

Inżynierowie danych muszą zrozumieć, jak efektywnie przetwarzać złożone typy danych i ułatwić im dostęp do wszystkich użytkowników. W poniższym przykładzie używasz platformy Spark w usłudze Azure Synapse Analytics do odczytywania i przekształcania obiektów w płaską strukturę za pomocą ramek danych. Model sql bezserwerowy w usłudze Azure Synapse Analytics umożliwia bezpośrednie wykonywanie zapytań o takie obiekty i zwracanie tych wyników jako zwykłą tabelę.

Co to są tablice i struktury zagnieżdżone?

Poniższy obiekt pochodzi z usługi Application Insights. W tym obiekcie istnieją zagnieżdżone struktury i tablice zawierające zagnieżdżone struktury.

{
    "id": "66532691-ab20-11ea-8b1d-936b3ec64e54",
    "context": {
        "data": {
            "eventTime": "2020-06-10T13:43:34.553Z",
            "samplingRate": "100.0",
            "isSynthetic": "false"
        },
        "session": {
            "isFirst": "false",
            "id": "38619c14-7a23-4687-8268-95862c5326b1"
        },
        "custom": {
            "dimensions": [
                {
                    "customerInfo": {
                        "ProfileType": "ExpertUser",
                        "RoomName": "",
                        "CustomerName": "diamond",
                        "UserName": "XXXX@yahoo.com"
                    }
                },
                {
                    "customerInfo": {
                        "ProfileType": "Novice",
                        "RoomName": "",
                        "CustomerName": "topaz",
                        "UserName": "XXXX@outlook.com"
                    }
                }
            ]
        }
    }
}

Przykład schematu tablic i zagnieżdżonych struktur

Podczas drukowania schematu ramki danych obiektu (o nazwie df) za pomocą polecenia df.printschemazobaczysz następującą reprezentację:

  • Żółty reprezentuje zagnieżdżone struktury.
  • Zielony reprezentuje tablicę z dwoma elementami.

Kod z żółtym i zielonym wyróżnianiem przedstawiającym źródło schematu

_rid, _ts, i _etag zostały dodane do systemu, ponieważ dokument został wprowadzony do magazynu transakcyjnego usługi Azure Cosmos DB.

Poprzednia ramka danych liczy się tylko dla 5 kolumn i 1 wierszy. Po przekształceniu wyselekcjonowane ramki danych będą miały 13 kolumn i 2 wiersze w formacie tabelarycznym.

Spłaszczają zagnieżdżone struktury i rozszerzają tablice

Platforma Spark w usłudze Azure Synapse Analytics umożliwia łatwe przekształcanie zagnieżdżonych struktur w kolumny i elementy tablicy w wiele wierszy. Aby wykonać implementację, wykonaj następujące kroki.

Schemat blokowy przedstawiający kroki przekształceń platformy Spark

Definiowanie funkcji w celu spłaszczenia zagnieżdżonego schematu

Tej funkcji można używać bez zmiany. Utwórz komórkę w notesie PySpark za pomocą następującej funkcji:

from pyspark.sql.functions import col

def flatten_df(nested_df):
    stack = [((), nested_df)]
    columns = []

    while len(stack) > 0:
        parents, df = stack.pop()

        flat_cols = [
            col(".".join(parents + (c[0],))).alias("_".join(parents + (c[0],)))
            for c in df.dtypes
            if c[1][:6] != "struct"
        ]

        nested_cols = [
            c[0]
            for c in df.dtypes
            if c[1][:6] == "struct"
        ]

        columns.extend(flat_cols)

        for nested_col in nested_cols:
            projected_df = df.select(nested_col + ".*")
            stack.append((parents + (nested_col,), projected_df))

    return nested_df.select(columns)

Używanie funkcji do spłaszczenia zagnieżdżonego schematu

W tym kroku spłaszczasz zagnieżdżony schemat ramki danych (df) do nowej ramki danych (df_flat):

from pyspark.sql.types import StringType, StructField, StructType
df_flat = flatten_df(df)
display(df_flat.limit(10))

Funkcja wyświetlania powinna zwrócić 10 kolumn i 1 wiersz. Tablica i jej zagnieżdżone elementy są nadal obecne.

Przekształć tablicę

W tym miejscu przekształć tablicę , context_custom_dimensionsw ramce df_flatdanych , w nową ramkę df_flat_explodedanych . W poniższym kodzie zdefiniujesz również kolumnę do wybrania:

from pyspark.sql.functions import explode
from pyspark.sql.functions import flatten
from pyspark.sql.functions import arrays_zip
df_flat_explode = df_flat.select("_rid","_ts","id","_etag",explode(df_flat.context_custom_dimensions),"context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")\
.select("_rid","_ts","id","_etag","col.*","context_session_isFirst","context_session_id","context_data_eventTime","context_data_samplingRate","context_data_isSynthetic")
display(df_flat_explode.limit(10))

Funkcja wyświetlania powinna zwracać 10 kolumn i 2 wiersze. Następnym krokiem jest spłaszczenie zagnieżdżonych schematów z funkcją zdefiniowaną w kroku 1.

Używanie funkcji do spłaszczenia zagnieżdżonego schematu

Na koniec użyj funkcji, aby spłaszczyć zagnieżdżony schemat ramki danych df_flat_explode do nowej ramki danych df_flat_explode_flat.

df_flat_explode_flat = flatten_df(df_flat_explode)
display(df_flat_explode_flat.limit(10))

Funkcja wyświetlania powinna zawierać 13 kolumn i 2 wiersze.

Funkcja printSchema ramki df_flat_explode_flat danych zwraca następujący wynik:

Kod przedstawiający ostateczny schemat

Odczytywanie tablic i zagnieżdżonych struktur bezpośrednio

Model bezserwerowy języka SQL umożliwia wykonywanie zapytań i tworzenie widoków i tabel dla takich obiektów.

Najpierw, w zależności od sposobu przechowywania danych, użytkownicy powinni używać następującej taksonomii. Wszystkie elementy wyświetlane w wielkiej literze są specyficzne dla twojego przypadku użycia:

Zbiorczo Formatuj
"https://ACCOUNTNAME.dfs.core.windows.net/FILESYSTEM/PATH/FINENAME.parquet" "Parquet" (ADLSg2)
N'endpoint=https://ACCOUNTNAME.documents-staging.windows-ppe.net:443/; account=ACCOUNTNAME; database=DATABASENAME; collection=COLLECTIONNAME; region=REGIONTOQUERY', SECRET='YOURSECRET' "CosmosDB" (Azure Synapse Link)

Zastąp każde pole w następujący sposób:

  • "YOUR BULK ABOVE" to parametry połączenia źródła danych, z którym nawiązujesz połączenie.
  • "TWÓJ TYP POWYŻEJ" jest formatem używanym do nawiązywania połączenia ze źródłem.
select *
FROM
openrowset(
    BULK 'YOUR BULK ABOVE',
    FORMAT='YOUR TYPE ABOVE'
)
with (id varchar(50),
        contextdataeventTime varchar(50) '$.context.data.eventTime',
        contextdatasamplingRate varchar(50) '$.context.data.samplingRate',
        contextdataisSynthetic varchar(50) '$.context.data.isSynthetic',
        contextsessionisFirst varchar(50) '$.context.session.isFirst',
        contextsessionid varchar(50) '$.context.session.id',
        contextcustomdimensions varchar(max) '$.context.custom.dimensions'
) as q 
cross apply openjson (contextcustomdimensions) 
with ( ProfileType varchar(50) '$.customerInfo.ProfileType',
            RoomName varchar(50) '$.customerInfo.RoomName',
            CustomerName varchar(50) '$.customerInfo.CustomerName',
            UserName varchar(50) '$.customerInfo.UserName'
    )

Istnieją dwa różne typy operacji:

  • Pierwszy typ operacji jest wskazywany w poniższym wierszu kodu, który definiuje kolumnę o nazwie contextdataeventTime , która odwołuje się do zagnieżdżonego elementu , Context.Data.eventTime.

    contextdataeventTime varchar(50) '$.context.data.eventTime'
    

    Ten wiersz definiuje kolumnę o nazwie contextdataeventTime , która odwołuje się do zagnieżdżonego elementu , Context>Data>eventTime.

  • Drugi typ operacji używa cross apply do tworzenia nowych wierszy dla każdego elementu w tablicy. Następnie definiuje każdy zagnieżdżony obiekt.

    cross apply openjson (contextcustomdimensions) 
    with ( ProfileType varchar(50) '$.customerInfo.ProfileType', 
    

    Jeśli tablica zawiera 5 elementów z 4 zagnieżdżonymi strukturami, model bezserwerowy SQL zwraca 5 wierszy i 4 kolumny. Model bezserwerowy SQL może wykonywać zapytania bezpośrednio na miejscu, przekształcać tablicę na 2 wiersze i wyświetlać wszystkie zagnieżdżone struktury w kolumnach.

Następne kroki