Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Przesyłanie strumieniowe ze strukturą platformy Apache Spark umożliwia implementowanie skalowalnych, odpornych na błędy aplikacji do przetwarzania strumieni danych. Strukturalne przesyłanie strumieniowe jest oparte na silniku Spark SQL i ulepsza konstrukcje z ramek danych i zbiorów danych Spark SQL, dzięki czemu można pisać zapytania strumieniowe w taki sam sposób, jak zapytania wsadowe.
Aplikacje do przesyłania strumieniowego ze strukturą działają w klastrach HDInsight Spark i łączą się z danymi przesyłanymi strumieniowo z platformy Apache Kafka, gniazdem TCP (na potrzeby debugowania), usługą Azure Storage lub usługą Azure Data Lake Storage. Dwie ostatnie opcje, które opierają się na usługach magazynu zewnętrznego, umożliwiają obserwowanie nowych plików dodanych do magazynu i przetwarzanie ich zawartości tak, jakby były przesyłane strumieniowo.
Strukturalne przesyłanie strumieniowe tworzy długotrwałe zapytanie, podczas którego stosujesz operacje do danych wejściowych, takie jak wybór, projekcja, agregacja, okienkowanie i łączenie strumieniowego DataFrame z referencyjnymi DataFrame. Następnie wyprowadź wyniki do magazynu plików (obiektów blob usługi Azure Storage lub usługi Data Lake Storage) lub do dowolnego magazynu danych przy użyciu kodu niestandardowego (takiego jak usługa SQL Database lub Power BI). Przesyłanie strumieniowe ze strukturą udostępnia również dane wyjściowe do konsoli do debugowania lokalnie oraz do tabeli w pamięci, dzięki czemu można wyświetlić dane wygenerowane do debugowania w usłudze HDInsight.
Uwaga
Strukturalne przesyłanie strumieniowe platformy Spark zastępuje Spark Streaming (DStreams). W przyszłości Strukturalne przesyłanie strumieniowe będzie otrzymywać ulepszenia i będzie utrzymywane, podczas gdy DStreams będą znajdować się tylko w trybie utrzymania. Strukturalne przesyłanie strumieniowe nie jest obecnie tak kompletne, jak DStreams dla źródeł i ujść, które obsługuje w standardowej konfiguracji, więc oceń swoje wymagania, aby wybrać odpowiednią opcję przetwarzania strumieniowego Spark.
Strumienie jako tabele
Spark Structured Streaming reprezentuje strumień danych jako tabelę, która ma nieograniczoną głębokość, to znaczy tabela nadal rośnie wraz z nadejściem nowych danych. Ta tabela wejściowa jest stale przetwarzana przez długotrwałe zapytanie, a wyniki wysyłane do tabeli wyjściowej:
W przypadku przesyłania strumieniowego z użyciem struktur danych, dane docierają do systemu i są natychmiast pozyskiwane do tabeli wejściowej. Zapisujesz zapytania, używając API DataFrame i Dataset, które wykonują operacje na tej tabeli wejściowej. Dane wyjściowe zapytania dają kolejną tabelę, tabelę wyników. Tabela wyników zawiera wyniki zapytania, aby pobrać dane dla zewnętrznego składowiska danych, takiego jak relacyjna baza danych. Czas przetwarzania danych z tabeli wejściowej jest kontrolowany przez interwał wyzwalacza. Domyślnie interwał wyzwalacza wynosi zero, więc Structured Streaming stara się przetwarzać dane natychmiast po ich nadejściu. W praktyce oznacza to, że gdy tylko Strukturalne Przesyłanie Strumieniowe skończy przetwarzanie poprzedniego zapytania, rozpoczyna nowe przetwarzanie na wszelkich nowo otrzymanych danych. Wyzwalacz można skonfigurować tak, aby był uruchamiany w odstępach czasu, tak aby dane przesyłane strumieniowo są przetwarzane w partiach opartych na czasie.
Dane w tabelach wyników mogą zawierać tylko dane, które są nowe od czasu ostatniego przetworzenia zapytania (tryb dołączania), lub tabela może być odświeżana za każdym razem, gdy pojawiają się nowe dane, dzięki czemu tabela zawiera wszystkie dane wyjściowe od rozpoczęcia zapytania przesyłania strumieniowego (tryb pełny).
Tryb dołączania
W trybie dołączania tylko wiersze dodane do tabeli wyników od ostatniego uruchomienia zapytania są obecne w tabeli wyników i zapisywane w magazynie zewnętrznym. Na przykład najprostsze zapytanie po prostu kopiuje wszystkie dane z tabeli wejściowej do tabeli wyników bez zmian. Za każdym razem, gdy interwał wyzwalacza upłynie, nowe dane są przetwarzane, a wiersze reprezentujące te nowe dane są wyświetlane w tabeli wyników.
Rozważmy scenariusz, w którym przetwarzasz dane telemetryczne z czujników temperatury, takich jak termostat. Załóżmy, że pierwszy wyzwalacz przetworzył jedno zdarzenie o godzinie 00:01 dla urządzenia 1 z odczytem temperatury 95 stopni. W pierwszym wyzwalaczu zapytania w tabeli wyników pojawia się tylko wiersz z czasem 00:01. W czasie 00:02 po nadejściu innego zdarzenia jedynym nowym wierszem jest wiersz o godzinie 00:02, więc tabela wyników będzie zawierać tylko ten jeden wiersz.
W przypadku korzystania z trybu dołączania zapytanie będzie stosowało projekcje (wybierając kolumny, których dotyczy), filtrowanie (wybieranie tylko wierszy spełniających określone warunki) lub łączenie (rozszerzanie danych danymi ze statycznej tabeli odnośników). Tryb dołączania ułatwia przesyłanie tylko nowych, odpowiednich danych do pamięci zewnętrznej.
Tryb pełny
Rozważmy ten sam scenariusz, tym razem przy użyciu trybu pełnego. W trybie pełnym cała tabela danych wyjściowych jest odświeżona na każdym wyzwalaczu, więc tabela zawiera dane nie tylko z ostatniego uruchomienia wyzwalacza, ale ze wszystkich przebiegów. Możesz użyć trybu pełnego, aby skopiować dane niezmodyfikowane z tabeli wejściowej do tabeli wyników. Przy każdym uruchomieniu nowe wiersze wyników pojawiają się wraz ze wszystkimi poprzednimi wierszami. Tabela wyników wyjściowych spowoduje zapisanie wszystkich zebranych danych od rozpoczęcia zapytania i w końcu zabraknie pamięci. Tryb kompletny jest przeznaczony do użycia z zapytaniami agregowanymi, które podsumowują dane przychodzące w jakiś sposób, więc na każdym wyzwalaczu tabela wyników jest aktualizowana przy użyciu nowego podsumowania.
Załóżmy, że do tej pory są już przetwarzane dane o wartości pięciu sekund i nadszedł czas, aby przetworzyć dane na szóstą sekundę. Tabela wejściowa zawiera zdarzenia dotyczące czasu 00:01 i godziny 00:03. Celem tego przykładowego zapytania jest zapewnienie średniej temperatury urządzenia co pięć sekund. Implementacja tego zapytania stosuje agregację, która zbiera wszystkie wartości mieszczące się w każdym 5-sekundowym oknie, oblicza średnią temperaturę i tworzy wiersz dla średniej temperatury w tym interwale. Na końcu pierwszego 5-sekundowego okna znajdują się dwie krotki: (00:01, 1, 95) i (00:03, 1, 98). Dlatego dla okna 00:00-00:05 agregacja generuje krotkę ze średnią temperaturą 96,5 stopni. W następnym 5-sekundowym oknie istnieje tylko jeden punkt danych w czasie 00:06, więc wynikowa średnia temperatura wynosi 98 stopni. W czasie 00:10 przy użyciu trybu pełnego tabela wyników zawiera wiersze dla obu okien 00:00-00:05 i 00:05-00:10, ponieważ zapytanie zwraca wszystkie zagregowane wiersze, a nie tylko nowe. W związku z tym tabela wyników nadal rośnie w miarę dodawania nowych okien.
Nie wszystkie zapytania korzystające z trybu pełnego spowodują wzrost tabeli bez ograniczeń. Rozważmy w poprzednim przykładzie, że zamiast średniej temperatury według przedziału czasu, średnia jest zamiast tego według identyfikatora urządzenia. Tabela wyników zawiera stałą liczbę wierszy (jeden na urządzenie) ze średnią temperaturą dla urządzenia we wszystkich punktach danych odebranych z tego urządzenia. W miarę odbierania nowych temperatur tabela wyników jest aktualizowana tak, aby średnie w tabeli były zawsze aktualne.
Składniki aplikacji do przesyłania strumieniowego ze strukturą platformy Spark
Proste przykładowe zapytanie może podsumować odczyty temperatury według godzinnych okien. W takim przypadku dane są przechowywane w plikach JSON w usłudze Azure Storage (dołączonej jako domyślny magazyn dla klastra usługi HDInsight):
{"time":1469501107,"temp":"95"}
{"time":1469501147,"temp":"95"}
{"time":1469501202,"temp":"95"}
{"time":1469501219,"temp":"95"}
{"time":1469501225,"temp":"95"}
Te pliki JSON są przechowywane w temps podfolderze pod kontenerem klastra usługi HDInsight.
Definiowanie źródła danych wejściowych
Najpierw skonfiguruj ramkę danych, która opisuje źródło danych i wszystkie ustawienia wymagane przez to źródło. Ten przykład jest pobierany z plików JSON w usłudze Azure Storage i stosuje do nich schemat w czasie odczytu.
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
//Cluster-local path to the folder containing the JSON files
val inputPath = "/temps/"
//Define the schema of the JSON files as having the "time" of type TimeStamp and the "temp" field of type String
val jsonSchema = new StructType().add("time", TimestampType).add("temp", StringType)
//Create a Streaming DataFrame by calling readStream and configuring it with the schema and path
val streamingInputDF = spark.readStream.schema(jsonSchema).json(inputPath)
Stosowanie zapytania
Następnie zastosuj zapytanie zawierające żądane operacje względem ramki danych przesyłania strumieniowego. W tym przypadku agregacja grupuje wszystkie wiersze w oknach 1-godzinnych, a następnie oblicza minimalną, średnią i maksymalną temperaturę w tym 1-godzinnym przedziale czasu.
val streamingAggDF = streamingInputDF.groupBy(window($"time", "1 hour")).agg(min($"temp"), avg($"temp"), max($"temp"))
Definiowanie ujścia wyjściowego
Następnie zdefiniuj lokalizację docelową wierszy, które są dodawane do tabeli wyników w każdym interwale wyzwalacza. W tym przykładzie wyprowadza wszystkie wiersze do tabeli temps w pamięci, którą można później zapytać za pomocą SparkSQL. Pełny tryb wyjściowy zapewnia, że wszystkie wiersze dla wszystkich okien są wyprowadzane za każdym razem.
val streamingOutDF = streamingAggDF.writeStream.format("memory").queryName("temps").outputMode("complete")
Uruchamianie zapytania
Uruchom zapytanie przesyłania strumieniowego i uruchom je do momentu odebrania sygnału zakończenia.
val query = streamingOutDF.start()
Wyświetlanie wyników
Gdy zapytanie jest uruchomione, w tej samej usłudze SparkSession można uruchomić zapytanie SparkSQL względem temps tabeli, w której są przechowywane wyniki zapytania.
select * from temps
To zapytanie daje wyniki podobne do następujących:
| okno | min(temp) | avg(temp) | maks(temp) |
|---|---|---|---|
| {u'start': u'2016-07-26T02:00:00.000Z', u'end'... | 95 | 95.231579 | 99 |
| {u'start': u'2016-07-26T03:00:00.000Z', u'end'... | 95 | 96.023048 | 99 |
| {u'start': u'2016-07-26T04:00:00.000Z', u'end'... | 95 | 96.797133 | 99 |
| {u'start': u'2016-07-26T05:00:00.000Z', u'end'... | 95 | 96.984639 | 99 |
| {u'start': u'2016-07-26T06:00:00.000Z', u'end'... | 95 | 97.014749 | 99 |
| {u'start': u'2016-07-26T07:00:00.000Z', u'end'... | 95 | 96.980971 | 99 |
| {u'start': u'2016-07-26T08:00:00.000Z', u'end'... | 95 | 96.965997 | 99 |
Aby uzyskać szczegółowe informacje na temat interfejsu API Spark Structured Streaming wraz z obsługiwanymi źródłami danych wejściowych, operacjami i wyjściami danych, zobacz Przewodnik programowania przesyłania strumieniowego ze strukturą platformy Apache Spark.
Tworzenie punktów kontrolnych i zapisywanie dzienników z wyprzedzeniem
Aby zapewnić odporność i odporność na uszkodzenia, przesyłanie strumieniowe ze strukturą opiera się na punktach kontrolnych, aby zapewnić nieprzerwane przetwarzanie strumienia, nawet w przypadku awarii węzłów. W usłudze HDInsight, platforma Spark tworzy punkty kontrolne do trwałego przechowywania w usłudze Azure Storage lub Data Lake Storage. Te punkty kontrolne przechowują informacje o postępie zapytania przesyłania strumieniowego. Ponadto Strukturalne przesyłanie strumieniowe używa logu zapisu z wyprzedzeniem (WAL). Plik WAL rejestruje dane, które zostały odebrane, ale jeszcze nie przetworzone przez zapytanie. Jeśli wystąpi awaria i ponowne uruchomienie przetwarzania z pliku WAL, wszystkie zdarzenia odebrane ze źródła nie zostaną utracone.
Wdrażanie aplikacji Spark Streaming
Zazwyczaj aplikację Spark Streaming można utworzyć lokalnie w pliku JAR, a następnie wdrożyć ją na platformie Spark w usłudze HDInsight, kopiując plik JAR do domyślnego magazynu dołączonego do klastra usługi HDInsight. Aplikację można uruchomić przy użyciu interfejsów API REST usługi Apache Livy dostępnych w klastrze przy użyciu operacji POST. Treść pliku POST zawiera dokument JSON, który zawiera ścieżkę do pliku JAR, nazwę klasy, której główna metoda definiuje i uruchamia aplikację przesyłania strumieniowego oraz opcjonalnie wymagania dotyczące zasobów zadania (takie jak liczba funkcji wykonawczych, pamięci i rdzeni) oraz wszystkie ustawienia konfiguracji wymagane przez kod aplikacji.
Stan wszystkich aplikacji można również sprawdzić za pomocą żądania GET względem punktu końcowego usługi LIVY. Na koniec możesz zakończyć działającą aplikację, wysyłając żądanie DELETE względem punktu końcowego usługi LIVY. Aby uzyskać szczegółowe informacje na temat interfejsu API usługi LIVY, zobacz Zdalne zadania za pomocą usługi Apache LIVY