다음을 통해 공유


Lakeflow Spark 선언적 파이프라인의 흐름 예제

예: 여러 Kafka 토픽의 스트리밍 테이블에 쓰기

다음 예제들에서는 이름이 kafka_target인 스트리밍 테이블을 만들고 두 개의 Kafka 토픽에서 스트리밍 테이블로 기록합니다.

파이썬

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dp.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )

SQL

CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

SQL 쿼리에서 read_kafka() 사용되는 테이블 반환 함수에 대한 자세한 내용은 SQL 언어 참조의 read_kafka 참조하세요.

Python에서는 프로그래밍 방식으로 단일 테이블을 대상으로 하는 여러 흐름을 만들 수 있습니다. 다음 예제에서는 Kafka 항목 목록에 대해 이 패턴을 보여 줍니다.

비고

이 패턴에는 루프를 사용하여 for 테이블을 만드는 것과 동일한 요구 사항이 있습니다. 흐름을 정의하는 함수에 Python 값을 명시적으로 전달해야 합니다. 루프에서 테이블 만들기를 for 참조하세요.

from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

  @dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
  def topic_flow(topic=topic_name):
    return (
      spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,...")
        .option("subscribe", topic)
        .load()
    )

예: 일회성 데이터 백필 실행

쿼리를 실행하여 기존 스트리밍 테이블에 데이터를 추가하려면 .를 사용합니다 append_flow.

기존 데이터 집합을 추가한 후 다음과 같은 여러 옵션이 있습니다.

  • 쿼리가 백필 디렉터리에 도착하는 경우 새 데이터를 추가하려면 쿼리를 그대로 둡니다.
  • 이를 한 번 백필하고 다시 실행하지 않으려면 파이프라인을 한 번 실행한 후 쿼리를 제거합니다.
  • 쿼리를 한 번 실행하고 데이터가 완전히 새로 고쳐지는 경우에만 다시 실행하려면 추가 흐름에서 매개 변수를 once 설정합니다True. SQL에서 INSERT INTO ONCE을(를) 사용하십시오.

다음 예제에서는 쿼리를 실행하여 기록 데이터를 스트리밍 테이블에 추가합니다.

파이썬

from pyspark import pipelines as dp

@dp.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dp.append_flow(
  target = "csv_target",
  once = True)
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")

SQL

CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  read_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO ONCE
  csv_target BY NAME
SELECT * FROM
  read_files(
    "path/to/backfill/data/dir",
    "csv"
  );

자세한 예제는 파이프라인을 사용하여 기록 데이터 백필을 참조하세요.

예: 대신 추가 흐름 처리 사용 UNION

다중 원본을 결합하고 단일 스트리밍 테이블에 쓸 수 있도록 UNION 절 쿼리를 사용하는 대신 어펜드 플로우 쿼리를 사용할 수 있습니다. 대신 추가 흐름 쿼리를 UNION 사용하면 전체 새로 고침을 실행하지 않고 여러 원본에서 스트리밍 테이블에 추가할 수 있습니다.

다음 Python 예제에는 여러 데이터 원본을 구문과 결합하는 쿼리에 UNION이(가) 포함되어 있습니다.

@dp.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")
  )

  raw_orders_eu = (
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")
  )

  return raw_orders_us.union(raw_orders_eu)

다음 예제에서는 UNION 쿼리를 추가 흐름 쿼리로 대체합니다.

파이썬

dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")

SQL

CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/us",
    format => "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/eu",
    format => "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  STREAM read_files(
    "/path/to/orders/apac",
    format => "csv"
  );