Freigeben über


Schemaentwicklung im Zustandsspeicher

Dieser Artikel enthält eine Übersicht über die Schemaentwicklung im Zustandsspeicher und Beispiele für Typen unterstützter Schemaänderungen.

Was ist die Schemaentwicklung im Zustandsspeicher?

Die Schemaentwicklung bezieht sich auf die Fähigkeit einer Anwendung, Änderungen am Datenschema zu behandeln.

Azure Databricks unterstützt Schema-Evolution im RocksDB-Zustandsspeicher für strukturierte Streaming-Anwendungen, die transformWithState verwenden.

Die Schemaentwicklung bietet Flexibilität für die Entwicklung und einfache Wartung. Verwenden Sie die Schemaentwicklung, um das Datenmodell oder die Datentypen in Ihrem Zustandsspeicher anzupassen, ohne Zustandsinformationen zu verlieren oder die vollständige Verarbeitung von historischen Daten zu erfordern.

Anforderungen

Sie müssen das Codierungsformat für den Zustandsspeicher auf Avro festlegen, um die Schemaentwicklung zu verwenden. Führen Sie folgendes aus, um dies für die aktuelle Sitzung festzulegen:

spark.conf.set("spark.sql.streaming.stateStore.encodingFormat", "avro")

Die Schemaevolution wird nur für zustandsbehaftete Vorgänge unterstützt, die transformWithState oder transformWithStateInPandas verwenden. Diese Operatoren und die zugehörigen APIs und Klassen haben die folgenden Anforderungen:

  • Verfügbar in Databricks Runtime 16.2 und höher.
  • Compute muss den dedizierten oder nicht isolierten Zugriffsmodus verwenden.
  • Sie müssen den RocksDB-Statusspeicheranbieter verwenden. Databricks empfiehlt die Aktivierung von RocksDB als Teil der Computekonfiguration.
  • transformWithStateInPandas unterstützt den Standardzugriffsmodus in Databricks Runtime 16.3 und höher.

Führen Sie die folgenden Schritte aus, um den RocksDB-Statusspeicheranbieter für die aktuelle Sitzung zu aktivieren:

spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

Unterstützte Schemaentwicklungsmuster im Zustandsspeicher

Databricks unterstützt die folgenden Schemaentwicklungsmuster für zustandsbehaftete strukturierte Streaming-Vorgänge.

Muster Description
Typerweiterung Ändern Sie Datentypen von restriktiveren zu weniger restriktiven Typen.
Hinzufügen von Feldern Fügen Sie dem Schema vorhandener Zustandsspeichervariablen neue Felder hinzu.
Entfernen von Feldern Entfernen Sie vorhandene Felder aus dem Schema oder einer Zustandsspeichervariable.
Neuanordnen von Feldern Neuanordnen von Feldern in einer Variablen.
Hinzufügen von Statusvariablen Fügen Sie einer Anwendung eine neue Zustandsvariable hinzu.
Entfernen von Statusvariablen Entfernen Sie eine vorhandene Zustandsvariable aus einer Anwendung.

Wann tritt die Schemaentwicklung auf?

Die Schemaentwicklung im Zustandsspeicher ergibt sich aus dem Aktualisieren des Codes, der Ihre zustandsbehaftete Anwendung definiert. Aus diesem Gründen gelten die folgenden Anweisungen:

  • Die Schemaentwicklung erfolgt nicht automatisch als Ergebnis von Schemaänderungen in den Quelldaten für die Abfrage.
  • Die Schemaentwicklung erfolgt nur, wenn eine neue Version der Anwendung bereitgestellt wird. Da nur eine Version einer Streamingabfrage gleichzeitig ausgeführt werden kann, müssen Sie den Streamingauftrag neu starten, um das Schema für Zustandsvariablen zu entwickeln.
  • Ihr Code definiert explizit alle Zustandsvariablen und legt das Schema für alle Zustandsvariablen fest.
    • In Scala verwenden Sie ein Encoder , um das Schema für jede Variable anzugeben.
    • In Python erstellen Sie explizit ein Schema als ein StructType.

Nicht unterstützte Schemaentwicklungsmuster

Die folgenden Schemaentwicklungsmuster werden nicht unterstützt:

  • Feldumbenennung: Das Umbenennen von Feldern wird nicht unterstützt, da Felder nach Namen abgeglichen werden. Beim Versuch, ein Feld umzubenennen, wird das Feld entfernt und ein neues Feld hinzugefügt. Dieser Vorgang führt nicht zu einem Fehler, da das Entfernen und Hinzufügen von Feldern zulässig ist, aber die Werte aus dem ursprünglichen Feld werden nicht in das neue Feld übertragen.

  • Kann Schlüsselumbenennung oder Typänderungen vornehmen: Sie können den Namen oder den Typ von Schlüsseln in Kartenzustandsvariablen nicht ändern.

  • Typverengung Typverengungsvorgänge, auch als Downcasting bezeichnet, werden nicht unterstützt. Diese Vorgänge können zu Datenverlust führen. Im Folgenden sind Beispiele für nicht unterstützte Typeninschränkungsvorgänge aufgeführt:

    • double kann nicht auf float, long oder int eingeengt werden.
    • float kann weder auf long noch auf int eingegrenzt werden.
    • long kann nicht eingegrenzt werden auf int

Typweiterung im Zustandsspeicher

Sie können primitive Datentypen auf umfangreichere Typen vergrößern. Die folgenden Erweiterungsänderungen werden unterstützt:

  • int kann zu long, zu float oder zu double höher gestuft werden.
  • long kann zu float oder double höher gestuft werden
  • float kann zu double befördert werden
  • string kann zu bytes befördert werden
  • bytes kann zu string befördert werden

Vorhandene Werte werden in den neuen Typ konvertiert. Beispielsweise wird 12 zu 12.00.

Beispiel für eine Typverbreiterung mit transformWithState

Scala

// Initial run with Integer field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with Long field (type widening)
case class StateV2(value1: Long)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV2(value.toLong))
      value
    }
  }
}

Python

class IntStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with Integer field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to integer and update state
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class LongStateProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with Long field (type widening)
        state_schema = StructType([
            StructField("value1", LongType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            # Convert input value to long and update state
            value = pdf["value"].iloc[0]
            # When reading state written with IntStateProcessor,
            # it will be automatically converted to Long
            self.state.update((int(value),))

        # Read current state
        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

Hinzufügen von Feldern zu Zustandsspeicherwerten

Sie können dem Schema vorhandener Zustandsspeicherwerte neue Felder hinzufügen.

Beim Lesen von Daten, die mit dem alten Schema geschrieben wurden, gibt der Avro-Encoder Daten für hinzugefügte Felder zurück, die nativ codiert sind als null.

Python interpretiert diese Werte immer als None. Scala weist je nach Typ des Feldes unterschiedliches Standard-Verhalten auf. Databricks empfiehlt die Implementierung von Logik, um sicherzustellen, dass Scala keine Werte für fehlende Daten angibt. Siehe Standardwerte für Felder, die der Zustandsvariable hinzugefügt wurden.

Beispiele für das Hinzufügen neuer Felder mit transformWithState

Scala

// Initial run with single field
case class StateV1(value1: Integer)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt))
      value
    }
  }
}

// Later run with additional field
case class StateV2(value1: Integer, value2: String)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1),
      // it will be automatically converted to StateV2(1, null)
      val currentState = state.get()
      // Now update with both fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Python

class StateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single field
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "stateValue": [current_state[0]]
        })

class StateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Later schema with additional fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Read current state
            current_state = self.state.get()
            # When reading state written with StateV1(1),
            # it will be automatically converted to StateV2(1, None)
            value1 = current_state[0]
            value2 = current_state[1]

            # Now update with both fields populated
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

Felder entfernen, um Werte im Zustandsspeicher zu speichern

Sie können Felder aus dem Schema einer vorhandenen Variablen entfernen. Beim Lesen von Daten mit dem alten Schema werden Felder in den alten Daten, aber nicht im neuen Schema ignoriert.

Beispiele für das Entfernen von Feldern aus Statusvariablen

Scala

// Initial run with multiple fields
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with field removed
case class StateV2(value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1)
      val currentState = state.get()
      state.update(StateV2(value.toInt))
      value
    }
  }
}

Python

class RemoveFieldsOriginalProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with multiple fields
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class RemoveFieldsReducedProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with field removed
        state_schema = StructType([
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with RemoveFieldsOriginalProcessor(1, "metadata-1"),
            # it will be automatically converted to just (1,)
            current_state = self.state.get()
            value1 = current_state[0]

            self.state.update((int(value),))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]]
        })

Neu anordnen von Feldern in einer Zustandsvariable

Sie können Felder in einer Zustandsvariable neu anordnen, z. B. beim Hinzufügen oder Entfernen vorhandener Felder. Felder in Zustandsvariablen werden anhand des Namens und nicht mit der Position abgeglichen.

Beispiele für das Neuanordnen von Feldern in einer Zustandsvariable

Scala

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Later run with reordered fields
case class StateV2(value2: String, value1: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2("metadata-1", 1)
      val currentState = state.get()
      state.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Python

class OrderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with fields in original order
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ReorderedFieldsProcessor(StatefulProcessor):
    def init(self, handle):
        # Later schema with reordered fields
        state_schema = StructType([
            StructField("value2", StringType(), True),
            StructField("value1", IntegerType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # When reading state written with OrderedFieldsProcessor(1, "metadata-1"),
            # it will be automatically converted to ("metadata-1", 1)
            current_state = self.state.get()
            value2 = current_state[0]
            value1 = current_state[1]

            self.state.update((f"new-metadata-{value}", int(value)))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value2": [current_state[0]],
            "value1": [current_state[1]]
        })

Hinzufügen einer Statusvariable zu einer zustandsbehafteten Anwendung

Wir können auch Zustandsvariablen zwischen Abfrageausführungen hinzufügen.

Hinweis: Dieses Muster erfordert keinen Avro-Encoder und wird von allen transformWithState Anwendungen unterstützt.

Beispiel für das Hinzufügen einer Zustandsvariablen zu einer zustandsbehafteten Anwendung

Scala

// Initial run with fields in original order
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

case class StateV2(value1: String, value2: Integer)

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(s"new-metadata-${value}", value.toInt))
      value
    }
  }
}

Python

class MultiStateV1Processor(StatefulProcessor):
    def init(self, handle):
        # Initial schema with a single state variable
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

Entfernen einer Statusvariable aus einer zustandsbehafteten Anwendung

Zusätzlich zum Entfernen von Feldern können Sie auch Zustandsvariablen zwischen Abfrageausführungen entfernen.

Hinweis: Dieses Muster erfordert keinen Avro-Encoder und wird von allen transformWithState Anwendungen unterstützt.

Beispiel für das Entfernen einer Zustandsvariablen in eine zustandsbehaftete Anwendung

Scala

case class StateV1(value1: Integer, value2: String)
case class StateV2(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _
  @transient var state2: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    state2 = getHandle.getValueState[StateV2](
      "testState2",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      val currentState2 = state2.get()
      state2.update(StateV2(value.toInt, s"new-metadata-${value}"))
      value
    }
  }
}

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state1: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state1 = getHandle.getValueState[StateV1](
      "testState1",
      Encoders.product[StateV1],
      TTLConfig.NONE)
    // delete old state variable that we no longer need
    getHandle.deleteIfExists("testState2")
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state1.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

Python

class MultiStateV2Processor(StatefulProcessor):
    def init(self, handle):
        # Add a second state variable
        state1_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        state2_schema = StructType([
            StructField("value1", StringType(), True),
            StructField("value2", IntegerType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state1_schema)
        self.state2 = handle.getValueState("testState2", state2_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

            # Access and update the new state variable
            current_state2 = self.state2.get()  # Will be None on first run
            self.state2.update((f"new-metadata-{value}", int(value)))

        current_state1 = self.state1.get()
        current_state2 = self.state2.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "state1_value1": [current_state1[0]],
            "state1_value2": [current_state1[1]],
            "state2_value1": [current_state2[0]],
            "state2_value2": [current_state2[1]]
        })

class RemoveStateVarProcessor(StatefulProcessor):
    def init(self, handle):
        # Only use one state variable and delete the other
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state1 = handle.getValueState("testState1", state_schema)

        # Delete old state variable that we no longer need
        handle.deleteIfExists("testState2")

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state1.update((int(value), f"metadata-{value}"))

        current_state = self.state1.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

Standardwerte für Felder, die der Zustandsvariable hinzugefügt wurden

Wenn Sie einer vorhandenen Zustandsvariable neue Felder hinzufügen, weisen Zustandsvariablen, die mit dem alten Schema geschrieben wurden, das folgende Verhalten auf:

  • Der Avro-Encoder gibt einen null Wert für hinzugefügte Felder zurück.
  • Python konvertiert diese Werte für alle Datentypen in None .
  • Das Standardverhalten von Scala unterscheidet sich je nach Datentyp.
    • Verweistypen geben null zurück.
    • Grundtypen geben einen Standardwert zurück, der sich je nach Grundtyp unterscheidet. Beispiele umfassen 0 für int Typen oder false für bool Typen.

Es gibt keine integrierte Funktionalität oder Metadaten, die das Feld als durch die Schemaentwicklung hinzugefügt kennzeichnen. Sie müssen Logik implementieren, um NULL-Werte zu behandeln, die für Felder zurückgegeben werden, die in Ihrem vorherigen Schema nicht vorhanden waren.

Bei Scala können Sie die Imputation von Standardwerten vermeiden, indem Sie Option[<Type>] verwenden, um fehlende Werte als None zurückzugeben, anstelle der Standardwerte des Typs.

Sie müssen Logik implementieren, um Situationen ordnungsgemäß zu behandeln, in denen None Typwerte aufgrund der Schemaentwicklung zurückgegeben werden.

Beispiel für Standardwerte für hinzugefügte Felder zu einer Zustandsvariable

Scala

// Example demonstrating how null defaults work in schema evolution

import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders

// Initial schema that will be evolved
case class StateV1(value1: Integer, value2: String)

class ProcessorV1 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV1] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV1](
      "testState",
      Encoders.product[StateV1],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      state.update(StateV1(value.toInt, s"metadata-${value}"))
      value
    }
  }
}

// Evolution: Adding a new field with null/default values
case class StateV2(value1: Integer, value2: String, value3: Long, value4: Option[Long])

class ProcessorV2 extends StatefulProcessor[String, String, String] {
  @transient var state: ValueState[StateV2] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    state = getHandle.getValueState[StateV2](
      "testState",
      Encoders.product[StateV2],
      TTLConfig.NONE)
  }

  override def handleInputRows(
    key: String,
    inputRows: Iterator[String],
    timerValues: TimerValues): Iterator[String] = {
    rows.map { value =>
      // Reading from state
      val currentState = state.get()

      // Showing how null defaults work for different types
      // When reading state written with StateV1(1, "metadata-1"),
      // it will be automatically converted to StateV2(1, "metadata-1", 0L, None)
      println(s"Current state: $currentState")

      // For primitive types like Long, the UnsafeRow default for null is 0
      val longValue = if (currentState.value3 == 0L) {
        println("The value3 field is the default value (0)")
        100L // Set a real value now
      } else {
        currentState.value3
      }

      // Now update with all fields populated
      state.update(StateV2(value.toInt, s"metadata-${value}", longValue))
      value
    }
  }
}

Python

class NullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Initial schema
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]
            self.state.update((int(value), f"metadata-{value}"))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]]
        })

class ExpandedNullDefaultsProcessor(StatefulProcessor):
    def init(self, handle):
        # Evolution: Adding new fields with null/default values
        state_schema = StructType([
            StructField("value1", IntegerType(), True),
            StructField("value2", StringType(), True),
            StructField("value3", LongType(), True),
            StructField("value4", IntegerType(), True),
            StructField("value5", BooleanType(), True)
        ])
        self.state = handle.getValueState("testState", state_schema)

    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
        for pdf in rows:
            value = pdf["value"].iloc[0]

            # Reading from state
            current_state = self.state.get()

            # Showing how null defaults work in Python
            # When reading state written with NullDefaultsProcessor state = (1, "metadata-1"),
            # it will be automatically converted to (1, "metadata-1", None, None, None)
            # In Python, both primitive and reference types will be None

            value1 = current_state[0]
            value2 = current_state[1]
            value3 = current_state[2]  # Will be None when evolved from older schema
            value4 = current_state[3]  # Will be None when evolved from older schema
            value5 = current_state[4]  # Will be None when evolved from older schema

            # Check if value3 is None
            if value3 is None:
                print("The value3 field is None (default value for evolution)")
                value3 = 100  # Set a real value now

            # Now update with all fields populated
            self.state.update((
                value1,
                value2,
                value3,
                value4 if value4 is not None else 42,
                value5 if value5 is not None else True
            ))

        current_state = self.state.get()

        yield pd.DataFrame({
            "id": [key[0]],
            "value1": [current_state[0]],
            "value2": [current_state[1]],
            "value3": [current_state[2]],
            "value4": [current_state[3]],
            "value5": [current_state[4]]
        })

Einschränkungen

In der folgenden Tabelle werden die Standardgrenzwerte für Änderungen der Schemaentwicklung beschrieben:

Description Standardgrenze Spark-Konfiguration zum Überschreiben
Schemaentwicklungen für eine Zustandsvariable. Das Anwenden mehrerer Schemaänderungen in einem Abfrageneustart zählt als einzelne Schemaentwicklung. 16 spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold
Schemaentwicklungen für die Streamingabfrage. Das Anwenden mehrerer Schemaänderungen in einem Abfrageneustart zählt als einzelne Schemaentwicklung. 128 spark.sql.streaming.stateStore.maxNumStateSchemaFiles

Berücksichtigen Sie beim Beheben der Schemaentwicklung für Zustandsvariablen die folgenden Details sorgfältig: