Compartilhar via


CREATE STREAMING TABLE

Aplica-se a:marca de seleção Sim Databricks SQL

Cria uma tabela de streaming, uma tabela Delta com suporte extra para streaming ou processamento de dados incremental.

As tabelas de streaming são suportadas apenas nos Pipelines Declarativos do Lakeflow Spark e no Databricks SQL com Unity Catalog. Executar esse comando na computação do Databricks Runtime com suporte analisa apenas a sintaxe. Consulte Desenvolver código de pipelines declarativos do Lakeflow Spark com SQL.

Sintaxe

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( { column_identifier column_type [column_properties] } [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

column_properties
  { NOT NULL |
    COMMENT column_comment |
    column_constraint |
    MASK clause } [ ... ]

table_clauses
  { PARTITIONED BY (col [, ...]) |
    CLUSTER BY clause |
    COMMENT table_comment |
    DEFAULT COLLATION UTF8_BINARY |
    TBLPROPERTIES clause |
    schedule |
    WITH { ROW FILTER clause } } [...]

schedule
  { SCHEDULE [ REFRESH ] schedule_clause |
    TRIGGER ON UPDATE [ AT MOST EVERY trigger_interval ] }

schedule_clause
  { EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS } |
  CRON cron_string [ AT TIME ZONE timezone_id ]}

Parâmetros

  • REFRESH

    Se especificado, atualiza a tabela com os dados mais recentes disponíveis nas fontes definidas na consulta. Somente novos dados que chegam antes do início da consulta são processados. Novos dados que são adicionados às fontes durante a execução do comando são ignorados até a próxima atualização. A operação de atualização de CREATE OR REFRESH é totalmente declarativa. Se um comando de atualização não especificar todos os metadados da instrução de criação da tabela original, os metadados não especificados serão excluídos.

  • SE NÃO EXISTIR

    Cria a tabela de streaming se ela não existir. Se já houver uma tabela com esse nome, a instrução CREATE STREAMING TABLE será ignorada.

    Você pode especificar no máximo uma opção: IF NOT EXISTS ou OR REFRESH.

  • table_name

    O nome da tabela a ser criada. O nome não deve incluir uma especificação temporal ou uma especificação de opções. Se o nome não for qualificado, a tabela será criada no esquema atual.

  • especificação_da_tabela

    Esta cláusula opcional define a lista de colunas e os tipos, as propriedades, as descrições e as restrições de coluna associados.

    Se você não definir colunas no esquema da tabela, deverá especificar AS query.

    • column_identifier

      Um nome exclusivo para a coluna.

      • tipo_de_coluna

        Especifica o tipo dos dados da coluna.

      • NÃO NULO

        Se especificado, a coluna não aceitará valores NULL.

      • COMENTÁRIO column_comment

        Um literal de cadeia de caracteres para descrever a coluna.

      • column_constraint

        Importante

        Esse recurso está em uma versão prévia.

        Adiciona uma restrição de chave primária ou chave estrangeira à coluna em uma tabela de streaming. Restrições não são suportadas para tabelas no catálogo hive_metastore.

      • Cláusula MASK

        Adiciona uma função para mascarar colunas e anonimizar dados sensíveis. Todas as consultas provenientes dessa coluna recebem o resultado da avaliação dessa função sobre a coluna no lugar do valor original da coluna. Isso pode ser útil para fins de controle de acesso refinado em que a função pode inspecionar as associações de identidade ou grupo do usuário invocando para decidir se deseja rasurar o valor.

      • CONSTRAINT expectation_name ESPERE (expressão_de_expectativa) [ EM CASO DE VIOLAÇÃO { FALHA UPDATE | DESCARTAR LINHA } ]

        Adiciona expectativas de qualidade de dados à tabela. Essas expectativas de qualidade de dados podem ser controladas ao longo do tempo e acessadas por meio do log de eventos da tabela de streaming. Uma FAIL UPDATE expectativa causa a falha do processamento tanto ao criar a tabela quanto ao atualizá-la. Uma expectativa DROP ROW faz com que toda a linha seja removida se a expectativa não for atendida.

        expectation_expr pode ser composto por literais, identificadores de coluna dentro da tabela e funções ou operadores determinísticos e internos do SQL, exceto:

        Além disso, expr não deve conter nenhuma subconsulta.

      • restrição_de_tabela

        Importante

        Esse recurso está em uma versão prévia.

        Adiciona restrições de chave primária informativa ou chave estrangeira informativa a uma tabela de streaming. As restrições de chave não têm suporte para tabelas no catálogo hive_metastore.

  • tabela_cláusulas

    Opcionalmente, especifique particionamento, comentários, propriedades definidas pelo usuário e uma agenda de atualização para a nova tabela. Cada subcláusula só pode ser especificada uma vez.

    • PARTICIONADO POR

      Uma lista opcional de colunas para particionar a tabela.

      Observação

      O agrupamento líquido oferece uma solução flexível e otimizada para agrupamento. Considere usar CLUSTER BY em vez de PARTITIONED BY para tabelas de streaming.

    • CLUSTER BY

      Uma cláusula opcional para agrupar por um subconjunto de colunas. Use o clustering líquido automático com CLUSTER BY AUTOe o Databricks escolhe de forma inteligente as chaves de clustering para otimizar o desempenho da consulta. Consulte Usar clustering líquido para tabelas.

      O clustering líquido não pode ser combinado com PARTITIONED BY.

    • COMENTÁRIO table_comment

      Um STRING literal para descrever a tabela.

    • COLAÇÃO PADRÃO UTF8_BINARY

      Aplica-se a:check marked yes Databricks SQL check marked yes Databricks Runtime 17.1 and above

      Força a ordenação padrão da tabela de streaming para UTF8_BINARY. Essa cláusula será obrigatória se o esquema no qual a tabela é criada tiver uma ordenação padrão diferente de UTF8_BINARY. A ordenação padrão da tabela de streaming é usada como a ordenação padrão dentro do query e para tipos de coluna.

    • TBLPROPERTIES

      Opcionalmente, define uma ou mais propriedades definidas pelo usuário.

      Use essa configuração para especificar o canal de runtime do Lakeflow Spark Declarative Pipelines usado para executar essa instrução. Defina o valor da pipelines.channel propriedade como "PREVIEW" ou "CURRENT". O valor padrão é "CURRENT". Para obter mais informações sobre os canais do Lakeflow Spark Declarative Pipelines, consulte os canais de runtime do Lakeflow Spark Declarative Pipelines.

    • horário

      A agenda pode ser uma SCHEDULE instrução ou uma TRIGGER instrução.

      • SCHEDULE [ REFRESH ] schedule_clause

        • EVERY number { HOUR | HOURS | DAY | DAYS | WEEK | WEEKS }

          Para agendar uma atualização que ocorre periodicamente, use EVERY sintaxe. Se a sintaxe EVERY for especificada, a tabela de streaming ou a visão materializada será atualizada periodicamente no intervalo especificado com base no valor fornecido, como HOUR, HOURS, DAY, DAYS, WEEK ou WEEKS. A tabela a seguir lista os valores inteiros aceitos para number.

          Unidade de tempo Valor inteiro
          HOUR or HOURS <1 = H <= 72
          DAY or DAYS <1 = D <= 31
          WEEK or WEEKS <1 = W <= 8

          Observação

          As formas singular e plural da unidade de tempo incluída são semanticamente equivalentes.

        • CRON cron_string [ AT TIME ZONE timezone_id ]

          Para agendar uma atualização usando um valor de Quartz Cron. Valores de time_zone_values válidos são aceitos. Não há suporte para AT TIME ZONE LOCAL.

          Se AT TIME ZONE estiver ausente, o fuso horário da sessão será usado. Se AT TIME ZONE estiver ausente e o fuso horário da sessão não estiver definido, um erro será lançado. SCHEDULE é semanticamente equivalente a SCHEDULE REFRESH.

        O agendamento pode ser fornecido como parte do comando CREATE. Use ALTER STREAMING TABLE ou execute o comando CREATE OR REFRESH com a cláusula SCHEDULE para alterar o cronograma de uma tabela de streaming após a criação.

      • GATILHO ATIVADO UPDATE [ NO MÁXIMO A CADA TRIGGER_INTERVAL ]

        Importante

        O TRIGGER ON UPDATE recurso está em Beta.

        Opcionalmente, defina a tabela para atualizar quando uma fonte de dados upstream for atualizada, no máximo uma vez a cada minuto. Defina um valor para AT MOST EVERY exigir pelo menos um tempo mínimo entre as atualizações.

        As fontes de dados upstream devem ser tabelas Delta externas ou gerenciadas (incluindo exibições materializadas ou tabelas de streaming) ou exibições gerenciadas cujas dependências são limitadas a tipos de tabela compatíveis.

        Habilitar eventos de arquivo pode tornar os gatilhos com mais desempenho e aumenta alguns dos limites de atualizações de gatilho.

        A trigger_interval instrução INTERVAL é de pelo menos 1 minuto.

        TRIGGER ON UPDATE tem as seguintes limitações

        • Não mais do que 10 fontes de dados de origem por tabela de streaming ao usar TRIGGER ON UPDATE.
        • No máximo 1000 tabelas de streaming ou exibições materializadas podem ser especificadas com TRIGGER ON UPDATE.
        • A AT MOST EVERY cláusula tem o padrão de 1 minuto e não pode ter menos de 1 minuto.
  • Cláusula ROW FILTER WITH

    Adiciona uma função de filtro de linha à tabela. Todas as consultas subsequentes dessa tabela recebem um subconjunto das linhas em que a função é avaliada como TRUE booliano. Isso pode ser útil para fins de controle de acesso refinado, em que a função pode inspecionar a identidade ou as associações de grupo do usuário que a invocou para decidir se deseja filtrar algumas linhas.

  • Consulta de sistema autônomo

    Essa cláusula preenche a tabela usando os dados de query. Essa consulta deve ser uma consulta de streaming. Isso pode ser feito adicionando a palavra-chave STREAM a qualquer relação que você queira processar incrementalmente. Quando você especifica um query e um table_specification juntos, o esquema de tabela especificado em table_specification deve conter todas as colunas retornadas pelo query, caso contrário, você receberá um erro. Todas as colunas especificadas em table_specification, mas não retornadas por query retornam valores null, quando consultadas.

Diferenças entre tabelas de streaming e outras tabelas

As tabelas de streaming são tabelas com estado, projetadas para lidar com cada linha apenas uma vez à medida que você processa um conjunto de dados em crescimento. Como a maioria dos conjuntos de dados cresce continuamente ao longo do tempo, as tabelas de streaming são boas para a maioria das cargas de trabalho de ingestão. As tabelas de streaming são ideais para pipelines que exigem atualização de dados e baixa latência. As tabelas de streaming também podem ser úteis para transformações de escala maciça, pois os resultados podem ser calculados incrementalmente à medida que novos dados chegam, mantendo os resultados atualizados sem a necessidade de recompor totalmente todos os dados de origem a cada atualização. As tabelas de streaming são projetadas para fontes de dados que são somente acréscimo.

As tabelas de streaming aceitam comandos adicionais, como REFRESH, que processa os dados mais recentes disponíveis nas fontes fornecidas na consulta. As alterações na consulta fornecida só são refletidas em novos dados chamando um REFRESH, não dados processados anteriormente. Para aplicar as alterações nos dados existentes também, você precisa executar REFRESH TABLE <table_name> FULL para executar um FULL REFRESH. As atualizações completas reprocessam todos os dados disponíveis na origem com a definição mais recente. Não é recomendável chamar atualizações completas em fontes que não mantêm todo o histórico dos dados ou têm períodos de retenção curtos, como Kafka, pois a atualização completa trunca os dados existentes. Talvez não seja possível recuperar dados antigos se os dados não estiverem mais disponíveis na origem.

Filtros de linha e máscaras de coluna

Os filtros de linha permitem especificar uma função que se aplica como um filtro sempre que uma verificação de tabela busca linhas. Esses filtros garantem que as consultas subsequentes retornem apenas linhas para as quais o predicado de filtro é avaliado como true.

As máscaras de coluna permitem mascarar os valores de uma coluna sempre que uma verificação de tabela busca linhas. Todas as consultas futuras envolvendo essa coluna receberão o resultado da avaliação da função sobre a coluna, substituindo o valor original da coluna.

Para obter mais informações sobre como usar filtros de linha e máscaras de coluna, consulte filtros de linha e máscaras de coluna.

Gerenciando filtros de linha e máscaras de coluna

Filtros de linha e máscaras de coluna em tabelas de transmissão devem ser adicionados, atualizados ou removidos por meio da instrução CREATE OR REFRESH.

Comportamento

  • Atualizar como Definidor: Quando as instruções CREATE OR REFRESH ou REFRESH atualizam uma tabela de streaming, as funções de filtro de linha são executadas com os direitos do definidor (como o proprietário da tabela). Isso significa que a atualização da tabela usa o contexto de segurança do usuário que criou a tabela de transmissão.
  • Consulta: embora a maioria dos filtros seja executada com os direitos do definidor, as funções que verificam o contexto do usuário (como CURRENT_USER e IS_MEMBER) são exceções. Essas funções são executadas como o invocador. Essa abordagem impõe controles de acesso e segurança de dados específicos do usuário com base no contexto do usuário atual.

Observabilidade

Use DESCRIBE EXTENDED, INFORMATION_SCHEMA ou o Explorador de Catálogo para examinar os filtros de linha e máscaras de coluna existentes que se aplicam a uma determinada tabela de transmissão. Essa funcionalidade permite que os usuários auditem e examinem as medidas de acesso e proteção de dados em tabelas de transmissão.

Limitações

  • Somente os proprietários de tabelas podem atualizar tabelas de streaming para obter os dados mais recentes.
  • ALTER TABLE os comandos não são permitidos em tabelas de streaming. A definição e as propriedades da tabela devem ser alteradas por meio da instrução CREATE OR REFRESH ou ALTER STREAMING TABLE.
  • Não há suporte para a evolução do esquema de tabela por meio de comandos DML como INSERT INTO e MERGE.
  • Não há suporte para os seguintes comandos em tabelas de streaming:
    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Não há suporte para o Compartilhamento Delta.
  • Não há suporte para renomear a tabela ou alterar o proprietário.
  • Restrições de tabela, como PRIMARY KEY e FOREIGN KEY não têm suporte para tabelas de streaming no hive_metastore catálogo.
  • Não há suporte para colunas geradas, colunas de identidade e colunas padrão.

Exemplos

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Creates a streaming table that scheduled to refresh when upstream data is updated.
-- The refresh frequency of triggered_data is at most once an hour.
> CREATE STREAMING TABLE triggered_data
  TRIGGER ON UPDATE AT MOST EVERY INTERVAL 1 hour
  AS SELECT *
  FROM STREAM source_stream_data;

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE EVERY 1 HOUR
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing EXPECT (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Sets the runtime channel to "PREVIEW"
> CREATE STREAMING TABLE st_preview
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  AS SELECT * FROM STREAM sales;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a row filter and a column mask
> CREATE OR REFRESH STREAMING TABLE masked_csv_data (
    id int,
    name string,
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn
  )
  WITH ROW FILTER catalog.schema.us_filter_fn ON (region)
  AS SELECT *
  FROM STREAM read_files('s3://bucket/path/sensitive_data')