다음을 통해 공유


상태 저장소의 스키마 진화

이 문서에서는 상태 저장소의 스키마 진화 개요와 지원되는 스키마 변경 유형의 예제를 제공합니다.

상태 저장소의 스키마 진화란?

스키마 진화는 애플리케이션이 데이터 스키마에 대한 변경 내용을 처리하는 기능을 나타냅니다.

Azure Databricks는 사용하는 transformWithState구조적 스트리밍 애플리케이션에 대한 RocksDB 상태 저장소의 스키마 진화를 지원합니다.

스키마 진화는 개발 및 유지 관리의 용이성을 위한 유연성을 제공합니다. 스키마 진화를 사용하여 상태 정보를 손실하거나 기록 데이터를 완전히 다시 처리하지 않고도 상태 저장소의 데이터 모델 또는 데이터 형식을 조정합니다.

요구 사항

스키마 진화를 사용하려면 상태 저장소 인코딩 형식을 Avro로 설정해야 합니다. 현재 세션에 대해 이를 설정하려면 다음을 실행합니다.

spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")

스키마 진화는 transformWithState 또는 transformWithStateInPandas을 사용하는 상태 저장 작업에 대해서만 지원됩니다. 이러한 연산자와 관련 API 및 클래스에는 다음과 같은 요구 사항이 있습니다.

  • Databricks Runtime 16.2 이상에서 사용할 수 있습니다.
  • 컴퓨팅은 전용 또는 격리되지 않은 액세스 모드를 사용해야 합니다.
  • RocksDB 상태 저장소 공급자를 사용해야 합니다. Databricks는 컴퓨팅 구성의 일부로 RocksDB를 사용하도록 설정하는 것이 좋습니다.
  • transformWithStateInPandas Databricks Runtime 16.3 이상에서 표준 액세스 모드를 지원합니다.

현재 세션에 대해 RocksDB 상태 저장소 공급자를 사용하도록 설정하려면 다음을 실행합니다.

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

상태 저장소에서 지원되는 스키마 진화 패턴

Databricks는 상태 저장 구조적 스트리밍 작업에 대해 다음과 같은 스키마 진화 패턴을 지원합니다.

패턴 설명
타입 확장 데이터 형식을 더 제한적인 형식에서 덜 제한적인 형식으로 변경합니다.
필드 추가 기존 상태 저장소 변수의 스키마에 새 필드를 추가합니다.
필드 제거 스키마 또는 상태 저장소 변수에서 기존 필드를 제거합니다.
필드 순서 다시 지정 변수의 필드 순서를 다시 지정합니다.
상태 변수 추가 애플리케이션에 새 상태 변수를 추가합니다.
상태 변수 제거 애플리케이션에서 기존 상태 변수를 제거합니다.

스키마 진화는 언제 발생하나요?

상태 저장소의 스키마 진화는 상태 저장 애플리케이션을 정의하는 코드를 업데이트한 결과입니다. 이 때문에 다음 문이 적용됩니다.

  • 쿼리에 대한 원본 데이터의 스키마 변경으로 인해 스키마가 자동으로 진화하지는 않습니다.
  • 스키마 진화는 새 버전의 애플리케이션이 배포될 때만 발생합니다. 스트리밍 쿼리의 한 버전만 동시에 실행할 수 있으므로 스트리밍 작업을 다시 시작하여 상태 변수에 대한 스키마를 발전시켜야 합니다.
  • 코드는 모든 상태 변수를 명시적으로 정의하고 모든 상태 변수에 대한 스키마를 설정합니다.
    • Scala에서 각 변수에 Encoder 대한 스키마를 지정하는 데 사용합니다.
    • Python에서는 스키마를 명시적으로 StructType 구성합니다.

지원되지 않는 스키마 진화 패턴

다음 스키마 진화 패턴은 지원되지 않습니다.

  • 필드 이름 바꾸기: 필드 이름이 일치하므로 필드 이름 바꾸기가 지원되지 않습니다. 필드 이름을 바꾸려는 시도는 필드를 제거하고 새 필드를 추가하여 처리됩니다. 이 작업을 수행해도 필드를 제거하고 추가할 수 있으므로 오류가 발생하지 않지만 원래 필드의 값은 새 필드로 전달되지 않습니다.

  • 키 이름을 바꾸거나 형식을 변경할 수 있습니다. 지도 상태 변수에서 키의 이름 또는 형식을 변경할 수 없습니다.

  • 형식 축소다운캐스팅이라고도 하는 형식 축소 작업은 지원되지 않습니다. 이러한 작업으로 인해 데이터가 손실될 수 있습니다. 다음은 지원되지 않는 형식 축소 작업의 예입니다.

    • double로 좁힐 수 없습니다.floatlongint
    • float은(는) long 또는 int로 좁힐 수 없습니다.
    • long 로 좁힐 수 없습니다. int

상태 저장소의 형식 확장

기본 데이터 형식을 더 많은 수용 형식으로 확장할 수 있습니다. 지원되는 형식 확대 변경은 다음과 같습니다.

  • int로 승격할 수 있습니다. longfloat또는double
  • long으로 승격될 수 있습니다.floatdouble
  • float 으로 승격할 수 있습니다. double
  • string 으로 승격할 수 있습니다. bytes
  • bytes 으로 승격할 수 있습니다. string

기존 값은 새 형식으로 업캐스트됩니다. 예를 들어 1212.00이 됩니다.

transformWithState을 사용한 타입 확장의 예

스칼라

// Initial run with Integer field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with Long field (type widening)
case class StateV2(value1: Long)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV2(value.toLong))
      value
    }
  }
}

파이썬

class IntStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with Integer field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to integer and update state
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class LongStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with Long field (type widening)
        state_schema = StructType([
            StructField("value1", LongType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to long and update state
            value = pdf["value"].iloc[0]
            # When reading state written with IntStateProcessor,
            # it will be automatically converted to Long
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

상태 저장소 값에 필드 추가

기존 상태 저장소 값의 스키마에 새 필드를 추가할 수 있습니다.

이전 스키마로 작성된 데이터를 읽을 때 Avro 인코더는 기본적으로 로 인코딩된 null추가 필드에 대한 데이터를 반환합니다.

Python은 항상 이러한 값을 .로 None해석합니다. Scala는 필드의 형식에 따라 다른 기본 동작을 가합니다. Databricks는 Scala가 누락된 데이터에 대한 값을 표시하지 않도록 논리를 구현하는 것이 좋습니다. 상태 변수에 추가된 필드의 기본값을 참조하세요.

을 사용하여 새 필드를 추가하는 예제 transformWithState

스칼라

// Initial run with single field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with additional field
case class StateV2(value1: Integer, value2: String)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1),
      // it will be automatically converted to StateV2(1, null)
      val currentState = state.get()
      // Now update with both fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

파이썬

class StateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class StateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Later schema with additional fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Read current state
            current_state = self.state.get()
            # When reading state written with StateV1(1),
            # it will be automatically converted to StateV2(1, None)
            value1 = current_state[0]
            value2 = current_state[1]

            # Now update with both fields populated
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

필드를 제거하여 상태 저장소의 값을 설정합니다.

기존 변수의 스키마에서 필드를 제거할 수 있습니다. 이전 스키마를 사용하여 데이터를 읽을 때는 이전 데이터에 있지만 새 스키마에는 없는 필드가 무시됩니다.

상태 변수에서 필드를 제거하는 예제

스칼라

// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with field removed
case class StateV2(value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1)
      val currentState = state.get()
      state.update(StateV2(value.toInt))
      value
    }
  }
}

파이썬

class RemoveFieldsOriginalProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with multiple fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class RemoveFieldsReducedProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with field removed
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
            # it will be automatically converted to just (1,)
            current_state = self.state.get()
            value1 = current_state[0]

            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]]
        })

상태 변수의 필드 순서 다시 지정

기존 필드를 추가하거나 제거하는 경우를 포함하여 상태 변수에서 필드의 순서를 변경할 수 있습니다. 상태 변수의 필드는 위치가 아니라 이름으로 일치합니다.

상태 변수의 필드 순서를 다시 지정하는 예제

스칼라

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2("metadata-1", 1)
      val currentState = state.get()
      state.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

파이썬

class OrderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with fields in original order
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ReorderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with reordered fields
        state_schema = StructType([
            StructField("value2", StringType(), True),
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
            # it will be automatically converted to ("metadata-1", 1)
            current_state = self.state.get()
            value2 = current_state[0]
            value1 = current_state[1]

            self.state.update((f"new-metadata-{value}", int(value)))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value2": [current_state[0]],
            "value1": [current_state[1]]
        })

상태 저장 애플리케이션에 상태 변수 추가

쿼리 실행 사이에 상태 변수를 추가할 수도 있습니다.

참고: 이 패턴에는 Avro 인코더가 필요하지 않으며 모든 transformWithState 애플리케이션에서 지원됩니다.

상태 저장 애플리케이션에 상태 변수를 추가하는 예제

스칼라

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

case class StateV2(value1: String, value2: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

파이썬

class MultiStateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single state variable
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

상태 저장 애플리케이션에서 상태 변수 제거

필드를 제거하는 것 외에도 쿼리 실행 사이에 상태 변수를 제거할 수도 있습니다.

참고: 이 패턴에는 Avro 인코더가 필요하지 않으며 모든 transformWithState 애플리케이션에서 지원됩니다.

상태 저장 애플리케이션에 상태 변수를 제거하는 예제

스칼라

case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
      value
    }
  }
}

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    // delete old state variable that we no longer need
    getHandle.deleteIfExists("testState2")
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

파이썬

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

class RemoveStateVarProcessor(StatefulProcessor):
    def init(self, handle):
        # Only use one state variable and delete the other
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

        # Delete old state variable that we no longer need
        handle.deleteIfExists("testState2")

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

상태 변수에 추가된 필드의 기본값

기존 상태 변수에 새 필드를 추가하면 이전 스키마를 사용하여 작성된 상태 변수의 동작은 다음과 같습니다.

  • Avro 인코더는 추가된 null 필드에 대한 값을 반환합니다.
  • Python은 이러한 값을 모든 데이터 형식에 대해 None로 변환합니다.
  • Scala 기본 동작은 데이터 형식에 따라 다릅니다.
    • 참조 형식은 null을 반환합니다.
    • 기본 형식은 기본 형식에 따라 다른 기본값을 반환합니다. 예를 들어, 0 유형에 대한 int 또는 false 유형에 대한 bool 등이 포함됩니다.

스키마 진화를 통해 추가된 필드에 플래그를 지정하는 기본 제공 기능이나 메타데이터는 없습니다. 이전 스키마에 존재하지 않는 필드에 대해 반환된 null 값을 처리하는 논리를 구현해야 합니다.

Scala의 경우, 형식 기본값을 사용하는 대신, Option[<Type>]을 사용하여 누락된 값을 None으로 반환할 수 있습니다.

스키마 진화로 인해 형식 값이 반환되는 None 상황을 올바르게 처리하려면 논리를 구현해야 합니다.

상태 변수에 추가된 필드의 기본값 예

스칼라

// Example demonstrating how null defaults work in schema evolution

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders

// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // Reading from state
      val currentState = state.get()

      // Showing how null defaults work for different types
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
      println(s"Current state: $currentState")

      // For primitive types like Long, the UnsafeRow default for null is 0
      val longValue = if (currentState.value3 == 0L) {
        println("The value3 field is the default value (0)")
        100L // Set a real value now
      } else {
        currentState.value3
      }

      // Now update with all fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
      value
    }
  }
}

파이썬

class NullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ExpandedNullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Evolution: Adding new fields with null/default values
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True),
            StructField("value3", LongType(), True),
            StructField("value4", IntegerType(), True),
            StructField("value5", BooleanType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Reading from state
            current_state = self.state.get()

            # Showing how null defaults work in Python
            # When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
            # it will be automatically converted to (1, "metadata-1", None, None, None)
            # In Python, both primitive and reference types will be None

            value1 = current_state[0]
            value2 = current_state[1]
            value3 = current_state[2]  # Will be None when evolved from older schema
            value4 = current_state[3]  # Will be None when evolved from older schema
            value5 = current_state[4]  # Will be None when evolved from older schema

            # Check if value3 is None
            if value3 is None:
                print("The value3 field is None (default value for evolution)")
                value3 = 100  # Set a real value now

            # Now update with all fields populated
            self.state.update((
                value1,
                value2,
                value3,
                value4 if value4 is not None else 42,
                value5 if value5 is not None else True
            ))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]],
            "value3": [current_state[2]],
            "value4": [current_state[3]],
            "value5": [current_state[4]]
        })

제한점

다음 표에서는 스키마 진화 변경에 대한 기본 제한에 대해 설명합니다.

설명 기본 제한 재정의할 Spark 설정 구성
상태 변수에 대한 스키마가 진화합니다. 쿼리 다시 시작에서 여러 스키마 변경 내용을 적용하는 것은 단일 스키마 진화로 계산됩니다. 16 spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold
스트리밍 쿼리에 대한 스키마 진화. 쿼리 다시 시작에서 여러 스키마 변경 내용을 적용하는 것은 단일 스키마 진화로 계산됩니다. 128 spark.sql.streaming.stateStore.maxNumStateSchemaFiles

상태 변수에 대한 스키마 진화 문제를 해결할 때 다음 세부 정보를 신중하게 고려합니다.