久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spark Streaming編程技巧是什么

共計(jì) 12688 個(gè)字符,預(yù)計(jì)需要花費(fèi) 32 分鐘才能閱讀完成。

本篇內(nèi)容主要講解“Spark Streaming 編程技巧是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓丸趣 TV 小編來(lái)帶大家學(xué)習(xí)“Spark Streaming 編程技巧是什么”吧!

#Spark Streaming 編程指南 #

## 概述 ## Spark Streaming 是核心 Spark API 的一個(gè)擴(kuò)展,他可以實(shí)現(xiàn)高吞吐量,和容錯(cuò)的實(shí)時(shí)數(shù)據(jù)流處理。

他可以接受許多數(shù)據(jù)源例如 Kafka、Flume、Twitter、ZeroMQ 或者普通的老的 TCP 套接字的數(shù)據(jù)。數(shù)據(jù)可以使用擁有高級(jí)函數(shù)例如 map、reduce、join、和 window 的復(fù)雜算法表達(dá)式進(jìn)行處理。最終,處理的數(shù)據(jù)可以被推送到文件系統(tǒng)、數(shù)據(jù)庫(kù)和在線儀表盤。實(shí)際上,你可以在數(shù)據(jù)流上應(yīng)用 Spark 內(nèi)置的機(jī)器學(xué)習(xí)算法和圖處理算法。

img src= https://cache.yisu.com/upload/information/20210522/355/658120.png /

在內(nèi)部,它的工作原理如下。Spark Streaming 接收實(shí)時(shí)輸入數(shù)據(jù)流,并且將數(shù)據(jù)分割成 batches,which are then processed by the Spark engine to generate the final stream of results in batches.

img src= https://cache.yisu.com/upload/information/20210522/355/658121.png /

Spark Streaming 提供一個(gè)高級(jí)的抽象叫做離散流,或者 DStream。它表示一個(gè)連續(xù)不斷的數(shù)據(jù)流。DStreams 既可以通過(guò)來(lái)自數(shù)據(jù)源例如 Kafka、Flume 的數(shù)據(jù)數(shù)據(jù)流創(chuàng)建,也可以通過(guò)在其他 DStreams 上應(yīng)用高級(jí)操作創(chuàng)建。在內(nèi)部,一個(gè) DStream 被表示成一個(gè) RDDs 的序列。

本指南向你展示如何使用 DStreams 開始編寫 Spark Streaming 程序。你可以使用 Scala 或 Java 編寫 Spark Streaming 程序,本指南中兩者都提供。你將會(huì)發(fā)現(xiàn) tabs 貫穿全文,可以讓你在 Scala 和 Java 代碼片段中選擇。

## 一個(gè)簡(jiǎn)單的例子 ## 在我們進(jìn)入如何編寫你自己的 Spark Streaming 程序的細(xì)節(jié)之前,讓我們快速的看下一個(gè)簡(jiǎn)單的 Spark Streaming 程序是怎樣的。比如說(shuō),我們想計(jì)算一個(gè)通過(guò)監(jiān)聽 TCP 套接字得到的數(shù)據(jù)服務(wù)器上的文本數(shù)據(jù)中單詞的總數(shù)。所有你需要做的如下:

首先,我們創(chuàng)建一個(gè) JavaStreamingContext 對(duì)象,他是所有 Streaming 功能的一個(gè)切入點(diǎn)。除了 Spark 的配置,we specify that any DStream would be processed in 1 second batches.

import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import scala.Tuple2;
// Create a StreamingContext with a local master
JavaStreamingContext jssc = new JavaStreamingContext(local[2] ,  JavaNetworkWordCount , new Duration(1000))

使用這個(gè) context,我們通過(guò)指定 IP 地址和數(shù)據(jù)服務(wù)器的端口來(lái)創(chuàng)建一個(gè)新的 DStream。

// Create a DStream that will connect to serverIP:serverPort, like localhost:9999
JavaReceiverInputDStream String  lines = jssc.socketTextStream(localhost , 9999);

這個(gè) DStream lines 表示數(shù)據(jù)的流將會(huì)從這個(gè)數(shù)據(jù)服務(wù)器接收。流中的每一條記錄都是一行文本。然后,我們通過(guò)空格將行分割成單詞。

// Split each line into words
JavaDStream String  words = lines.flatMap( new FlatMapFunction String, String () { @Override public Iterable String  call(String x) { return Arrays.asList(x.split(  ));
 }
 });

flatMap 是一個(gè) DStream 操作,它通過(guò)使源 DStream 中的每一條記錄生成許多新的記錄而創(chuàng)建一個(gè)新的 DStream。在這個(gè)例子中,每一行將會(huì)被分割成多個(gè) words,words 流被表示成 words DStream。注意,我們定義使用 FlatMapFunction 對(duì)象轉(zhuǎn)換。正如我們一直在探索,在 Java API 中有許多這樣的轉(zhuǎn)換類來(lái)幫助定義 DStream 轉(zhuǎn)換。

接下倆,我們想要計(jì)算這些 words 的和:

// Count each word in each batch
JavaPairDStream String, Integer  pairs = words.map( new PairFunction String, String, Integer () { @Override public Tuple2 String, Integer  call(String s) throws Exception { return new Tuple2 String, Integer (s, 1);
 }
 });
JavaPairDStream String, Integer  wordCounts = pairs.reduceByKey( new Function2 Integer, Integer, Integer () { @Override public Integer call(Integer i1, Integer i2) throws Exception {
 return i1 + i2;
 }
 });
wordCounts.print(); // Print a few of the counts to the console

使用一個(gè) PairFunction,words DStream 被進(jìn)一步 mapped(一對(duì)一轉(zhuǎn)換)成一個(gè) DStream 對(duì) (word,1)。然后,使用 Function2 對(duì)象,it is reduced to get the frequency of words in each batch of data。最后,wordCounts.print() 將會(huì)每秒打印一些生成的和。

Note that when these lines are executed, Spark Streaming only sets up the computation it will perform when it is started, and no real processing has started yet. To start the processing after all the transformations have been setup, we finally call

jssc.start(); // Start the computation
jssc.awaitTermination(); // Wait for the computation to terminate

完整的代碼可以再 Spark Streaming example JavaNetworkWordCount 找到。

如果你已經(jīng)下載并且構(gòu)建了 Spark,你可以像下面這樣運(yùn)行這個(gè)例子。你需要首先運(yùn)行 Netcat(一個(gè)可以再大多數(shù) Unix-like 系統(tǒng)上找到的小工具)作為一個(gè)數(shù)據(jù)服務(wù)器,通過(guò):

$ nc -lk 9999

然后,在一個(gè)不同的終端下,亦可以啟動(dòng)這個(gè)例子,通過(guò):

$ ./bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999

然后,在運(yùn)行 netcat 服務(wù)的終端中輸入的每一行將會(huì)被求和并且每秒打印在屏幕上。他看起來(lái)像這樣:

pre # TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world … # TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount $ ./bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999 … ——————————————- Time: 1357008430000 ms ——————————————- (hello,1) (world,1) … /pre

你也可以在 Spark shell 直接使用 Spark Streaming:

$ bin/spark-shell

… 并且通過(guò)封裝已存在的交互式 shell SparkContext 對(duì)象 sc 來(lái)創(chuàng)建你的 StreamingContext:

val ssc = new StreamingContext(sc, Seconds(1))

When working with the shell, you may also need to send a ^D to your netcat session to force the pipeline to print the word counts to the console at the sink.

## 基礎(chǔ)知識(shí) ## 現(xiàn)在,我們 move beyond the simple example,我們?cè)敿?xì)闡述編寫一個(gè) streaming 應(yīng)用程序你需要了解的 Spark Streaming 的基礎(chǔ)知識(shí)。

### 接入 ### 要編寫你自己的 Spark Streaming 程序,你將需要添加下面的依賴到你的 SBT 或者 Maven 項(xiàng)目中:

groupId = org.apache.spark
artifactId = spark-streaming_2.10
version = 1.0.2

對(duì)于從像 Kafka 和 Flume 這樣的數(shù)據(jù)源獲取數(shù)據(jù)的功能,現(xiàn)在已經(jīng)出現(xiàn)在 Spark Streaming 核心 API 里。你將需要添加相應(yīng)的 attiface spark-streaming-xyz_2.10 到依賴。例如,下面是一些常見的:

pre Source Artifact Kafka spark-streaming-kafka_2.10 Flume spark-streaming-flume_2.10 Twitter spark-streaming-twitter_2.10 ZeroMQ spark-streaming-zeromq_2.10 MQTT spark-streaming-mqtt_2.10 /pre

罪行的列表,請(qǐng)參考 Apache repository 獲得所有支持的源和 artifacts 的列表。

### 初始化 ### 在 Java 中,要初始化一個(gè) Spark Streaming 程序,需要?jiǎng)?chuàng)建一個(gè) JavaStreamingContext 對(duì)象,他是整個(gè) Spark Streaming 功能的切入點(diǎn)。一個(gè) JavaStreamingContext 對(duì)象可以被創(chuàng)建,使用:

new JavaStreamingContext(master, appName, batchInterval, [sparkHome], [jars])

master 參數(shù)是一個(gè)標(biāo)準(zhǔn)的 Spark 集群 URL,并且可以是“l(fā)ocal”作為本地測(cè)試。appName 是你的程序的名字,它將會(huì)在你的集群的 Web UI中顯示。batchInterval 是 batches 的大小,就像之前解釋的。最后,如果運(yùn)行為分布式模式,需要最后兩個(gè)參數(shù)來(lái)部署你的代碼到一個(gè)集群上,就像 Spark programming guide 描述的那樣。此外,基本的 SparkContext 可以如同 ssc.sparkContext 這樣訪問(wèn)。

batch internal 的設(shè)置必須根據(jù)你的應(yīng)用程序的延遲要求和可用的集群資源。查看 Performance Tuning 獲得更對(duì)詳細(xì)信息。

###DStreams### Discretized Stream 或者說(shuō) DStream,是 Spark Streaming 提供的基本的抽象。它表示連續(xù)不斷的數(shù)據(jù)流,或者來(lái)自數(shù)據(jù)源的輸入數(shù)據(jù)流,或者通過(guò)轉(zhuǎn)換輸入流生成的經(jīng)過(guò)處理的數(shù)據(jù)流。在內(nèi)部,它通過(guò)一個(gè)連續(xù)不斷的 RDDs 的序列表示,他是 Spark 的一個(gè)不可變得抽象,分布式數(shù)據(jù)器。Each RDD in a DStream contains data from a certain interval,就像下面的圖表中展示的:

img src= https://cache.yisu.com/upload/information/20210522/355/658122.png /

應(yīng)用在一個(gè) DStream 上的任何操作轉(zhuǎn)換成在基礎(chǔ)的 RDDs 上面的操作。例如,in the earlier example of converting a stream of lines to words, the flatmap operation is applied on each RDD in the lines DStream to generate the RDDs of the words DStream. 下面的圖表展示了這個(gè):

img src= https://cache.yisu.com/upload/information/20210522/355/658124.png /

這些基礎(chǔ)的 RDD 轉(zhuǎn)換是通過(guò) Spark 引擎計(jì)算的。DStream 操作隱藏了大多數(shù)的細(xì)節(jié)并提供開發(fā)者方便的高級(jí) API。這些操作在后面的章節(jié)中有詳細(xì)討論。

### 輸入源 ### 我們已經(jīng)在 [quick example](quick example) 看了 ssc.socketTextStream(…), 它通過(guò)一個(gè) TCP 套接字連接接受文本數(shù)據(jù)創(chuàng)建了一個(gè) DStream。除了套接字,核心 Spark Streaming API 提供了創(chuàng)建 DStream 通過(guò)文件,和將 Akka actors 作為輸入源。

特別的,對(duì)于文件,DStream 可以這樣創(chuàng)建:

jssc.fileStream(dataDirectory);

Spark Streaming 將會(huì)監(jiān)視 dataDirectory 目錄下的任何 Hadoop 兼容的文件系統(tǒng),并且處理這個(gè)目錄下創(chuàng)建的任何文件。

注意:

文件必須有統(tǒng)一的格式

The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.

Once moved the files must not be changed.

For more details on streams from files, Akka actors and sockets, see the API documentations of the relevant functions in StreamingContext for Scala and JavaStreamingContext for Java.

此外,通過(guò)源,例如 Kafka、Flume 和 Twitter 創(chuàng)建 DStream 的功能可以通過(guò)導(dǎo)入并添加正確的依賴,就像前面的章節(jié)中解釋的那樣。在 Kafka 的情況下,在添加 artifact spark-streaming-kafka_2.10 到項(xiàng)目的依賴后,你可以像這樣創(chuàng)建一個(gè)來(lái)自 Kafka 的 DStream:

import org.apache.spark.streaming.kafka.*;
KafkaUtils.createStream(jssc, kafkaParams, ...);

更多關(guān)于附加源的細(xì)節(jié),查看相應(yīng)的 API 文檔,此外,你可以實(shí)現(xiàn)你自己的源的定制接收者,查看 Custom Receiver Guide.

### 操作 ### 有兩種 DStream 操作 - 轉(zhuǎn)換和輸出操作。和 RDD 轉(zhuǎn)換類似,DStream 轉(zhuǎn)換操作針對(duì)一個(gè)或者多個(gè) DStream 來(lái)創(chuàng)建新的包含轉(zhuǎn)換數(shù)據(jù)的 DStreams。在數(shù)據(jù)流上應(yīng)用一系列轉(zhuǎn)換后,輸入操作需要調(diào)用,它寫數(shù)據(jù)到一個(gè)額外的數(shù)據(jù)槽中,例如一個(gè)文件系統(tǒng)或者一個(gè)數(shù)據(jù)庫(kù)。

#### 轉(zhuǎn)換 #### DStream 支持許多轉(zhuǎn)換,在一個(gè)普通的 Spark RDD 上。下面是一些常見的轉(zhuǎn)換:

pre Transformation Meaning map(func) Return a new DStream by passing each element of the source DStream through a function func. flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items. filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true. repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions. union(otherStream) Return a new DStream that contains the union of the elements in the source DStream and otherDStream. count() Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. reduce(func) Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. countByValue() When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. reduceByKey(func, [numTasks]) When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. join(otherStream, [numTasks]) When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. cogroup(otherStream, [numTasks]) When called on DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. transform(func) Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. updateStateByKey(func) Return a new state DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. /pre

最后兩個(gè)轉(zhuǎn)換值得再次解釋。

####UpdateStateByKey 操作 #### updateStateByKey 允許你維護(hù)任意的狀態(tài),同時(shí),可以持續(xù)不斷的更新新信息。使用它,你需要下面兩步:

定義狀態(tài) - 狀態(tài)可以是任意數(shù)據(jù)類型

定義狀態(tài)更新函數(shù) - 指定一個(gè)函數(shù),怎樣從之前的狀態(tài)和新的輸入流的值中更新狀態(tài)

讓我們使用一個(gè)例子闡述。假設(shè)我們想維護(hù)一個(gè)連續(xù)的一個(gè)文本流中的單詞出現(xiàn)的次數(shù)。這里,連續(xù)的和是這個(gè) state,并且是一個(gè) Integer,我們定義 update 函數(shù),像這樣:

import com.google.common.base.Optional;
Function2 List Integer , Optional Integer , Optional Integer  updateFunction =
 new Function2 List Integer , Optional Integer , Optional Integer () { @Override public Optional Integer  call(List Integer  values, Optional Integer  state) {
 Integer newSum = ... // add the new values with the previous running count to get the new count
 return Optional.of(newSum);
 }
 };

下面的應(yīng)用在一個(gè)包含 words 的 DStream 上(假設(shè),Pairs DStream 包含(word,1)對(duì)在 quick example)

update 函數(shù)將會(huì)被每一個(gè) word 調(diào)用,with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count. 完整的 Scala 代碼,查看例子 StatefulNetworkWordCount.

####Transform 操作 ####

####Window 操作 #### 最后,Spark Streaming 還提供了 window 計(jì)算。

####Output 操作 #### 當(dāng)一個(gè)輸出操作被調(diào)用,它出發(fā)一個(gè)流計(jì)算,目前,定義了下面的輸出操作:

pre Output Operation Meaning print() Prints first ten elements of every batch of data in a DStream on the driver. foreachRDD(func) The fundamental output operator. Applies a function, func, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. saveAsObjectFiles(prefix, [suffix]) Save this DStream s contents as a SequenceFile of serialized objects. The file name at each batch interval is generated based on prefix and suffix: prefix-TIME_IN_MS[.suffix] . saveAsTextFiles(prefix, [suffix]) Save this DStream s contents as a text files. The file name at each batch interval is generated based on prefix and suffix: prefix-TIME_IN_MS[.suffix] . saveAsHadoopFiles(prefix, [suffix]) Save this DStream s contents as a Hadoop file. The file name at each batch interval is generated based on prefix and suffix: prefix-TIME_IN_MS[.suffix] . /pre

完整的 DStream 操作的列表可以在 API 文檔得到。對(duì)于 Scala API,查看 DStream 和 PairDStreamFunctions, 對(duì)于 Java API,查看 JavaDStream 和 JavaPairDStream.

### 持久化 ### 類似于 RDDs,DStreams 同樣允許開發(fā)者持久化流數(shù)據(jù)到內(nèi)存。就是說(shuō),在一個(gè) DStream 上使用 persist()將會(huì)自動(dòng)的持久化這個(gè) DStream 的每一個(gè) RDD 到內(nèi)存。如果這個(gè) DStream 中的數(shù)據(jù)將會(huì)被計(jì)算多次(例如,在同樣的數(shù)據(jù)上進(jìn)行多個(gè)操作),這是非常有用的。對(duì)于基于 window 的操作例如 reduceByWondow 和 reduceByKeyAndWindow 和基于狀態(tài)的操作例如 updateStateByKey,是默認(rèn)持久化的。因此,通過(guò)基于 window 的操作生成的 DStream 是自動(dòng)持久化到內(nèi)存的,而不需要開發(fā)者去調(diào)用 persist()方法。

對(duì)于數(shù)據(jù)流來(lái)說(shuō),它通過(guò) network(例如 Kafka,F(xiàn)lume,socket 等等)接收數(shù)據(jù),它的默認(rèn)的持久化級(jí)別是復(fù)制數(shù)據(jù)到兩個(gè)節(jié)點(diǎn),以便容錯(cuò)。

注意,不想 RDDs,DSteam 默認(rèn)的持久化級(jí)別是保持?jǐn)?shù)據(jù)在內(nèi)存中序列化。在章節(jié) Performance Tuning 有更多的討論。更多關(guān)于不同持久化級(jí)別的信息可以在 Spark Programming Guide 找到。

###RDD Checkpointing### 一個(gè) stateful 操作是那些在數(shù)據(jù)的多個(gè) batches 上的操作。它包括所有基于 window 的操作和 updateStateByKey 操作。由于 stateful 操作依賴于之前數(shù)據(jù)的 batches,他們隨著時(shí)間連續(xù)不斷的聚集元數(shù)據(jù)。要清除這些數(shù)據(jù),Spark Streaming 支持在存儲(chǔ)中間數(shù)據(jù)到 HDFS 時(shí)進(jìn)行定期的 checkpointing。

啟用 checkpointing,開發(fā)者需要提供 RDD 將被保存的 HDFS 路徑。通過(guò)以下代碼完成:

ssc.checkpoint(hdfsPath) // assuming ssc is the StreamingContext or JavaStreamingContext

一個(gè) DStream 的 checkpointing 的間隔可以這樣設(shè)置:

dstream.checkpoint(checkpointInterval)

對(duì)于 DStream,他必須被 checkpointing(即,DStream 通過(guò) updateStateByKey 創(chuàng)建,并且使用相反的函數(shù) reduceByKeyAndWindow),DStream 的 checkpoint 間隔默認(rèn)設(shè)置為 set to a multiple of the DStream’s sliding interval,例如至少設(shè)置 10 秒。

###Deployment### 和其他任何 Spark 應(yīng)用程序一樣,Spark Streaming 應(yīng)用程序部署在集群上。請(qǐng)參考 deployment guide 獲得更多信息。

如果一個(gè)正在運(yùn)行的 Spark Streaming 應(yīng)用程序需要升級(jí)(包括新的應(yīng)用代碼),這里有兩個(gè)可能的技巧:

The upgraded Spark Streaming application is started and run in parallel to the existing application. Once the new one (receiving the same data as the old one) has been warmed up and ready for prime time, the old one be can be brought down. Note that this can be done for data sources that support sending the data to two destinations (i.e., the earlier and upgraded applications).

The existing application is shutdown gracefully (see StreamingContext.stop(…) or JavaStreamingContext.stop(…) for graceful shutdown options) which ensure data that have been received is completely processed before shutdown. Then the upgraded application can be started, which will start processing from the same point where the earlier application left off. Note that this can be done only with input sources that support source-side buffering (like Kafka, and Flume) as data needs to be buffered while the previous application down and the upgraded application is not yet up.

到此,相信大家對(duì)“Spark Streaming 編程技巧是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是丸趣 TV 網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!

正文完
 
丸趣
版權(quán)聲明:本站原創(chuàng)文章,由 丸趣 2023-08-16發(fā)表,共計(jì)12688字。
轉(zhuǎn)載說(shuō)明:除特殊說(shuō)明外本站除技術(shù)相關(guān)以外文章皆由網(wǎng)絡(luò)搜集發(fā)布,轉(zhuǎn)載請(qǐng)注明出處。
評(píng)論(沒有評(píng)論)
主站蜘蛛池模板: 惠水县| 益阳市| 图木舒克市| 阳信县| 绥滨县| 遂昌县| 霍山县| 晋州市| 铁力市| 凤庆县| 博白县| 公主岭市| 大足县| 石嘴山市| 施甸县| 嘉黎县| 全南县| 红安县| 浦城县| 曲阳县| 宁陕县| 台江县| 临沧市| 确山县| 镇安县| 江华| 江源县| 通许县| 喜德县| 许昌县| 云安县| 水富县| 佛冈县| 平利县| 镇雄县| 嫩江县| 永清县| 博湖县| 灵山县| 响水县| 明星|