Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Met de COPY INTO SQL-opdracht kunt u gegevens uit een bestandslocatie laden in een Delta-tabel. Dit is een herhaalbare en idempotente bewerking; bestanden in de bronlocatie die al zijn geladen, worden overgeslagen.
COPY INTO biedt de volgende mogelijkheden:
- Eenvoudig configureerbare bestands- of mapfilters uit cloudopslag, waaronder S3-, ADLS-, ABFS-, GCS- en Unity Catalog-volumes.
- Ondersteuning voor meerdere bronbestandsindelingen: CSV-, JSON-, XML-, Avro-, ORC-, Parquet-, tekst- en binaire bestanden
- Exactly-once (idempotent) bestandsverwerking als standaard
- Schema-afleiding van doeltabel, schema-toewijzing, samenvoegen en evolutie
Notitie
Voor een meer schaalbare en robuuste ervaring voor bestandsopname raadt Databricks aan dat SQL-gebruikers gebruikmaken van streamingtabellen. Zie Streaming-tabellen.
Waarschuwing
COPY INTO respecteert de werkruimte-instelling voor verwijderingsvectoren. Indien ingeschakeld, worden verwijderingsvectoren ingeschakeld in de doeltabel wanneer COPY INTO wordt uitgevoerd op een SQL-warehouse of rekenkracht waarop Databricks Runtime 14.0 of hoger wordt uitgevoerd. Als dit is ingeschakeld, blokkeren verwijderingsvectoren query's voor een tabel in Databricks Runtime 11.3 LTS en hieronder. Zie verwijderingsvectoren in Databricks en verwijdervectoren automatisch inschakelen.
Vereisten
Een accountbeheerder moet de stappen volgen in Gegevenstoegang configureren voor opname om de toegang tot gegevens in de opslag van cloudobjecten te configureren voordat gebruikers gegevens kunnen laden met behulp van COPY INTO.
Voorbeeld: Gegevens laden in een schemaloze Delta Lake-tabel
Notitie
Deze functie is beschikbaar in Databricks Runtime 11.3 LTS en hoger.
U kunt lege tijdelijke aanduidingen voor Delta-tabellen maken, zodat het schema later wordt afgeleid tijdens een COPY INTO opdracht door mergeSchema in te stellen op true in COPY_OPTIONS:
CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];
COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
De bovenstaande SQL-instructie is idempotent en kan worden gepland om gegevens exact één keer op te nemen in een Delta-tabel.
Notitie
De lege Delta-tabel is niet bruikbaar buiten COPY INTO.
INSERT INTO en MERGE INTO worden niet ondersteund om gegevens naar schemaloze Delta-tabellen te schrijven. Nadat gegevens met COPY INTOin de tabel zijn ingevoegd, kan de tabel query's uitvoeren.
Zie Doeltabellen maken voor COPY INTO.
Voorbeeld: Schema instellen en gegevens laden in een Delta Lake-tabel
In het volgende voorbeeld ziet u hoe u een Delta-tabel maakt en vervolgens de sql-opdracht COPY INTO gebruikt om voorbeeldgegevens uit Databricks-gegevenssets in de tabel te laden. U kunt de voorbeeldcode Python, R, Scala of SQL uitvoeren vanuit een notebook dat is gekoppeld aan een Azure Databricks-cluster. U kunt de SQL-code ook uitvoeren vanuit een query die is gekoppeld aan een SQL-warehouse in Databricks SQL.
SQL
DROP TABLE IF EXISTS default.loan_risks_upload;
CREATE TABLE default.loan_risks_upload (
loan_id BIGINT,
funded_amnt INT,
paid_amnt DOUBLE,
addr_state STRING
);
COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;
SELECT * FROM default.loan_risks_upload;
-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0 | 1000 | 182.22 | CA |
-- +---------+-------------+-----------+------------+
-- | 1 | 1000 | 361.19 | WA |
-- +---------+-------------+-----------+------------+
-- | 2 | 1000 | 176.26 | TX |
-- +---------+-------------+-----------+------------+
-- ...
Python
table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" \
"loan_id BIGINT, " + \
"funded_amnt INT, " + \
"paid_amnt DOUBLE, " + \
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name + \
" FROM '" + source_data + "'" + \
" FILEFORMAT = " + source_format
)
loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)
display(loan_risks_upload_data)
'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
'''
R
library(SparkR)
sparkR.session()
table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"
sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))
sql(paste("CREATE TABLE ", table_name, " (",
"loan_id BIGINT, ",
"funded_amnt INT, ",
"paid_amnt DOUBLE, ",
"addr_state STRING)",
sep = ""
))
sql(paste("COPY INTO ", table_name,
" FROM '", source_data, "'",
" FILEFORMAT = ", source_format,
sep = ""
))
loan_risks_upload_data = tableToDF(table_name)
display(loan_risks_upload_data)
# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0 | 1000 | 182.22 | CA |
# +---------+-------------+-----------+------------+
# | 1 | 1000 | 361.19 | WA |
# +---------+-------------+-----------+------------+
# | 2 | 1000 | 176.26 | TX |
# +---------+-------------+-----------+------------+
# ...
Scala
val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"
spark.sql("DROP TABLE IF EXISTS " + table_name)
spark.sql("CREATE TABLE " + table_name + " (" +
"loan_id BIGINT, " +
"funded_amnt INT, " +
"paid_amnt DOUBLE, " +
"addr_state STRING)"
)
spark.sql("COPY INTO " + table_name +
" FROM '" + source_data + "'" +
" FILEFORMAT = " + source_format
)
val loan_risks_upload_data = spark.table(table_name)
display(loan_risks_upload_data)
/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0 | 1000 | 182.22 | CA |
+---------+-------------+-----------+------------+
| 1 | 1000 | 361.19 | WA |
+---------+-------------+-----------+------------+
| 2 | 1000 | 176.26 | TX |
+---------+-------------+-----------+------------+
...
*/
Voer de volgende code uit om de tabel op te schonen:
Python
spark.sql("DROP TABLE " + table_name)
R
sql(paste("DROP TABLE ", table_name, sep = ""))
Scala
spark.sql("DROP TABLE " + table_name)
SQL
DROP TABLE default.loan_risks_upload
Metagegevensbestanden opschonen
U kunt VACUUM gebruiken om niet-verwezen metagegevensbestanden die zijn aangemaakt door COPY INTO in Databricks Runtime 15.2 en hoger, op te schonen.
Referentie
- Databricks Runtime 7.x en hoger:
COPY INTO
Aanvullende bronnen
- Zie
COPY INTOvoor veelvoorkomende gebruikspatronen, waaronder voorbeelden van meerdereCOPY INTObewerkingen voor dezelfde Delta-tabel.