Udostępnij przez


Samouczek: COPY INTO z usługą Spark SQL

Usługa Databricks zaleca użycie polecenia COPY INTO do przyrostowego i zbiorczego ładowania danych dla źródeł danych zawierających tysiące plików. Usługa Databricks zaleca używanie Auto Loader dla zaawansowanych zastosowań.

W tym samouczku użyjesz polecenia COPY INTO, aby załadować dane z magazynu obiektów w chmurze do tabeli w obszarze roboczym usługi Azure Databricks.

Wymagania

Krok 1. Konfigurowanie środowiska i tworzenie generatora danych

W tym samouczku założono podstawową znajomość usługi Azure Databricks i domyślnej konfiguracji obszaru roboczego. Jeśli nie możesz uruchomić podanego kodu, skontaktuj się z administratorem obszaru roboczego, aby upewnić się, że masz dostęp do zasobów obliczeniowych i lokalizacji, do której można zapisywać dane.

Należy pamiętać, że podany kod używa parametru source do określenia lokalizacji, którą skonfigurujesz jako COPY INTO źródło danych. Zgodnie z zapisem ten kod wskazuje lokalizację w katalogu głównym systemu plików DBFS. Jeśli masz uprawnienia do zapisu w lokalizacji magazynu obiektów zewnętrznych, zastąp fragment dbfs:/ ciągu źródłowego ścieżką do magazynu obiektów. Ponieważ ten blok kodu wykonuje również rekursywne usuwanie w celu zresetowania tej demonstracji, upewnij się, że nie kierujesz tego na dane produkcyjne, i zachowaj katalog /user/{username}/copy-into-demo zagnieżdżony, aby uniknąć zastępowania lub usuwania istniejących danych.

  1. Utwórz nowy notes i dołącz go do zasobu obliczeniowego.

  2. Skopiuj i uruchom następujący kod, aby zresetować lokalizację przechowywania i bazę danych używaną w tym samouczku.

    %python
    # Set parameters for isolation in workspace and reset demo
    
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"copyinto_{username}_db"
    source = f"dbfs:/user/{username}/copy-into-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    dbutils.fs.rm(source, True)
    
  3. Skopiuj i uruchom następujący kod, aby skonfigurować niektóre tabele i funkcje, które będą używane do losowego generowania danych:

    -- Configure random data generator
    
    CREATE TABLE user_ping_raw
    (user_id STRING, ping INTEGER, time TIMESTAMP)
    USING json
    LOCATION ${c.source};
    
    CREATE TABLE user_ids (user_id STRING);
    
    INSERT INTO user_ids VALUES
    ("potato_luver"),
    ("beanbag_lyfe"),
    ("default_username"),
    ("the_king"),
    ("n00b"),
    ("frodo"),
    ("data_the_kid"),
    ("el_matador"),
    ("the_wiz");
    
    CREATE FUNCTION get_ping()
        RETURNS INT
        RETURN int(rand() * 250);
    
    CREATE FUNCTION is_active()
        RETURNS BOOLEAN
        RETURN CASE
            WHEN rand() > .25 THEN true
            ELSE false
            END;
    

Krok 2: Zapisać przykładowe dane w przechowywaniu danych w chmurze

Zapisywanie w formatach danych innych niż usługa Delta Lake jest rzadkie w usłudze Azure Databricks. Podany tutaj kod zapisuje w formacie JSON, symulując system zewnętrzny, który może zrzucić wyniki z innego systemu do magazynu obiektów.

  1. Skopiuj i uruchom następujący kod, aby napisać partię nieprzetworzonych danych JSON:

    -- Write a new batch of data to the data source
    
    INSERT INTO user_ping_raw
    SELECT *,
      get_ping() ping,
      current_timestamp() time
    FROM user_ids
    WHERE is_active()=true;
    

Krok 3: Ładuj dane JSON idempotentnie przy użyciu COPY INTO

Przed użyciem COPY INTOnależy utworzyć docelową tabelę Delta Lake. Nie musisz podawać żadnych elementów innych niż nazwa tabeli w instrukcji CREATE TABLE .

  1. Skopiuj i uruchom następujący kod, aby utworzyć docelową tabelę delty i załadować dane ze źródła:

    -- Create target table and load data
    
    CREATE TABLE IF NOT EXISTS user_ping_target;
    
    COPY INTO user_ping_target
    FROM ${c.source}
    FILEFORMAT = JSON
    FORMAT_OPTIONS ("mergeSchema" = "true")
    COPY_OPTIONS ("mergeSchema" = "true")
    

Ponieważ ta akcja jest idempotentna, można ją uruchomić wiele razy, ale dane będą ładowane tylko raz.

Krok 4. Podgląd zawartości tabeli

Możesz uruchomić proste zapytanie SQL, aby ręcznie przejrzeć zawartość tej tabeli.

  1. Skopiuj i wykonaj następujący kod, aby wyświetlić podgląd tabeli:

    -- Review updated table
    
    SELECT * FROM user_ping_target
    

Krok 5. Ładowanie większej ilości danych i podgląd wyników

Możesz ponownie uruchomić kroki 2–4 wiele razy, aby umieścić nowe partie losowych, surowych danych JSON w źródle, a następnie w sposób idempotentny załadować je do usługi Delta Lake z użyciem COPY INTO i wyświetlić podgląd wyników. Spróbuj uruchomić te kroki poza kolejnością lub wiele razy, aby zasymulować wiele partii nieprzetworzonych danych zapisywanych lub wykonywanych COPY INTO wiele razy bez przybycia nowych danych.

Krok 6. Samouczek dotyczący czyszczenia

Po ukończeniu tego samouczka możesz wyczyścić skojarzone zasoby, jeśli nie chcesz ich przechowywać.

Skopiuj i uruchom następujący kod, aby usunąć bazę danych, tabele i usunąć wszystkie dane:

%python
# Drop database and tables and remove data

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
dbutils.fs.rm(source, True)

Dodatkowe zasoby