Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Polecenie COPY INTO SQL umożliwia ładowanie danych z lokalizacji pliku do tabeli delty. Jest to operacja powtarzalna i idempotentna; pliki w lokalizacji źródłowej, które zostały już załadowane, są pomijane.
COPY INTO oferuje następujące możliwości:
- Łatwe konfigurowanie filtrów plików lub katalogów z magazynu w chmurze, w tym woluminów S3, ADLS, ABFS, GCS i Unity Catalog.
- Obsługa wielu formatów plików źródłowych: CSV, JSON, XML, Avro, ORC, Parquet, text i binarnych
- Przetwarzanie plików dokładnie raz (idempotentne) domyślnie
- Wnioskowanie, mapowanie, scalanie i ewolucja schematu tabeli docelowej
Uwaga
Aby uzyskać bardziej skalowalne i niezawodne środowisko pozyskiwania plików, Databricks zaleca użytkownikom SQL korzystanie z tabel strumieniowych. Zobacz Tablice streamingowe.
Ostrzeżenie
COPY INTO uwzględnia ustawienia obszaru roboczego dotyczące wektorów usuwania. Jeśli to ustawienie jest włączone, wektory usuwania są włączone w tabeli docelowej, gdy COPY INTO działa na magazynie SQL lub zasobach obliczeniowych z uruchomionym środowiskiem Databricks Runtime 14.0 lub nowszym. Gdy zostaną włączone, wektory usuwania blokują zapytania dotyczące tabeli w środowisku Databricks Runtime w wersji 11.3 LTS i wcześniejszych. Zobacz Co to są wektory usuwania? i Automatyczne włączanie wektorów usuwania.
Wymagania
Administrator konta musi wykonać kroki opisane w temacie Konfigurowanie dostępu do danych na potrzeby pozyskiwania w celu skonfigurowania dostępu do danych w magazynie obiektów w chmurze, zanim użytkownicy będą mogli ładować dane przy użyciu polecenia COPY INTO.
Przykład: ładowanie danych do tabeli usługi Delta Lake bez schematu
Uwaga
Ta funkcja jest dostępna w środowisku Databricks Runtime 11.3 LTS lub nowszym.
Możesz utworzyć puste tabele Delta jako symbole zastępcze, aby schemat został wywnioskowany później podczas polecenia COPY INTO, ustawiając mergeSchema na true w COPY_OPTIONS:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
Powyższa instrukcja SQL jest idempotentna i można zaplanować jej uruchomienie w celu jednokrotnego wprowadzenia danych do tabeli Delta.
Uwaga
Pusta tabela delty nie nadaje się do użycia poza COPY INTO.
INSERT INTO i MERGE INTO nie są obsługiwane do zapisywania danych w tabelach delta bez schematu. Po wstawieniu danych do tabeli z COPY INTO, tabela staje się możliwa do zapytania.
Zobacz Tworzenie tabel docelowych dla elementu COPY INTO.
Przykład: ustawianie schematu i ładowanie danych do tabeli usługi Delta Lake
W poniższym przykładzie pokazano, jak utworzyć tabelę Delta, a następnie użyć polecenia COPY INTO SQL, aby załadować przykładowe dane z zestawów danych Databricks do tabeli. Możesz uruchomić przykładowy kod Python, R, Scala lub SQL z notesu dołączonego do klastra usługi Azure Databricks. Możesz również uruchomić kod SQL z zapytania skojarzonego z usługą SQL Warehouse w usłudze Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Skala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Aby wyczyścić, uruchom następujący kod, który usuwa tabelę:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Skala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Czyszczenie plików metadanych
Możesz uruchomić VACUUM, aby wyczyścić nieużywane pliki metadanych utworzone przez COPY INTO w środowisku Databricks Runtime 15.2 lub nowszym.
Referencja
- Databricks Runtime 7.x oraz nowsze:
COPY INTO
Dodatkowe zasoby
- COPY INTO
- Aby zapoznać się z typowymi wzorcami użycia, w tym przykładami wielu
COPY INTOoperacji przeciwko tej samej tabeli Delta, zapoznaj się z Typowe wzorce ładowania danych przy użyciuCOPY INTO.