久久精品人人爽,华人av在线,亚洲性视频网站,欧美专区一二三

Spark Streaming是什么

154次閱讀
沒有評論

共計 5320 個字符,預計需要花費 14 分鐘才能閱讀完成。

這篇文章主要講解了“Spark Streaming 是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著丸趣 TV 小編的思路慢慢深入,一起來研究和學習“Spark Streaming 是什么”吧!

一:Spark Streaming 概覽。

1.1    簡單了解 Spark Streaming。

 Spark Streaming 是核心 Spark API 的一個擴展。具有可擴展性,高吞吐量,容錯性,實時性等特征。

  數據從許多來如中攝入數據,如 Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets。

也可以使用復雜的算法與高級別的功能像 map,reduce,join 和 window 處理。

  最后,也可以將處理過的數據推送到文件系統、數據庫。事實上,我們也可以用 Spark 的機器學習和圖形處理數據流上的算法。用圖形表示如下:
  在內部,其工作原理如下。Spark Streaming 接收實時輸入的數據流和數據劃分批次,然后由 Spark 引擎批處理生成的最終結果流。如圖示: 

    另外,Spark Streaming 提供一個高級抽象,稱為離散的流或 DStream,表示連續的流的數據。DStreams 可以被創建從輸入的數據流,如 Kafka, Flume, and Kinesis,

  或采用其他的 DStreams 高級別操作的輸入的數據流。

  在內部,DStream 是以 RDDs 的序列來表示。

首先,看看 Maven 的依賴包(spark-streaming_2.10)管理:

  dependency 
 groupId org.apache.spark /groupId 
 artifactId spark-streaming_2.10 /artifactId 
 version 1.6.1 /version 
 /dependency

1.2    eg:從一個數據服務器監聽 TCP 套接字接收的文本數據中的單詞進行計數

package com.berg.spark.test5.streaming;
import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
public class SparkStreamingDemo1 {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster(local[2] ).setAppName( NetworkWordCount 
 conf.set( spark.testing.memory ,  269522560000 
 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
 System.out.println(jssc:   + jssc);
 //  創建一個 DStream,  將連接  hostname:port,  比如  master:9999
 JavaReceiverInputDStream String  lines = jssc.socketTextStream(master , 9999);
 System.out.println(lines :   + lines);
 JavaDStream String  words = lines.flatMap(new FlatMapFunction String, String () {
 private static final long serialVersionUID = 1L;
 public Iterable String  call(String x) {return Arrays.asList(x.split(  ));
 // Count each word in each batch
 JavaPairDStream String, Integer  pairs = words.mapToPair(new PairFunction String, String, Integer () {public Tuple2 String, Integer  call(String s) {return new Tuple2 String, Integer (s, 1);
 JavaPairDStream String, Integer  wordCounts = pairs.reduceByKey(new Function2 Integer, Integer, Integer () {public Integer call(Integer i1, Integer i2) {
 return i1 + i2;
 // Print the first ten elements of each RDD generated in this DStream to
 // the console
 wordCounts.print();
 jssc.start(); // Start the computation
 jssc.awaitTermination(); // Wait for the computation to terminate}

至于代碼如何運行了,首先在 Linux 下終端輸入:$ nc -lk 9999

然后在 Eclipse 中運行代碼。

隨意輸入一行文本單詞,單詞之間用空格隔開,如下:

hadoop@master:~$ nc -lk 9999
berg hello world berg hello

可以在 Eclipse 控制臺看到如下結果:

Time: 1465386060000 ms
-------------------------------------------
(hello,2)
(berg,2)
(world,1)

1.3 將 HDFS 目錄下的某些文件內容當做 輸入的數據流。

public class SparkStreamingDemo2 {public static void main(String[] args) {SparkConf conf = new SparkConf().setMaster(local[2] ).setAppName( NetworkWordCount 
 conf.set( spark.testing.memory ,  269522560000 
 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
 System.out.println(jssc:   + jssc);
 //  創建一個 DStream,  讀取 HDFS 上的文件, 作為數據源。JavaDStream String  lines = jssc.textFileStream( hdfs://master:9000/txt/sparkstreaming/ 
 System.out.println(lines :   + lines);
 // Split each line into words
 JavaDStream String  words = lines.flatMap(new FlatMapFunction String, String () {
 private static final long serialVersionUID = 1L;
 public Iterable String  call(String x) {System.out.println(Arrays.asList(x.split(  )).get(0));
 return Arrays.asList(x.split(  ));
 // Count each word in each batch
 JavaPairDStream String, Integer  pairs = words.mapToPair(new PairFunction String, String, Integer () {
 private static final long serialVersionUID = 1L;
 public Tuple2 String, Integer  call(String s) {return new Tuple2 String, Integer (s, 1);
 System.out.println(pairs);
 JavaPairDStream String, Integer  wordCounts = pairs.reduceByKey(new Function2 Integer, Integer, Integer () {public Integer call(Integer i1, Integer i2) {
 return i1 + i2;
 // Print the first ten elements of each RDD generated in this DStream to the console
 wordCounts.print();
 JavaDStream Long  count = wordCounts.count();
 count.print(); //  統計
 DStream Tuple2 String, Integer  dstream = wordCounts.dstream();
 dstream.saveAsTextFiles( hdfs://master:9000/bigdata/spark/xxxx ,  sparkstreaming 
 //wordCounts.dstream().saveAsTextFiles( hdfs://master:9000/bigdata/spark/xxxx ,  sparkstreaming 
 jssc.start(); 
 jssc.awaitTermination(); // Wait for the computation to terminate}

上述代碼完成的操作是,一直監聽 HDFS 即 hdfs://master:9000/txt/sparkstreaming/ 目錄下是否有文件存入,如果有,則統計文件中的單詞。。。。

嘗試運行程序后,然后往該目錄中手動添加一個文件,會在控制臺看到對該文件內容中的單詞統計后的數據。

注意參數的意義:

 public JavaDStream java.lang.String textFileStream(java.lang.String directory)
 Create an input stream that monitors a Hadoop-compatible filesystem for 
            new files and reads them as text 
                     files (using key as LongWritable, value as Text and input format as TextInputFormat).
                 Files must be written to the monitored directory 
                 by moving them from another location within the same file system. 
                 File names starting with . are ignored.
 Parameters:
 directory – HDFS directory to monitor for new file
 Returns:
 (undocumented)

感謝各位的閱讀,以上就是“Spark Streaming 是什么”的內容了,經過本文的學習后,相信大家對 Spark Streaming 是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是丸趣 TV,丸趣 TV 小編將為大家推送更多相關知識點的文章,歡迎關注!

正文完
 
丸趣
版權聲明:本站原創文章,由 丸趣 2023-08-16發表,共計5320字。
轉載說明:除特殊說明外本站除技術相關以外文章皆由網絡搜集發布,轉載請注明出處。
評論(沒有評論)
主站蜘蛛池模板: 中西区| 乐亭县| 西宁市| 思茅市| 龙江县| 梧州市| 蒲江县| 凤庆县| 吐鲁番市| 平阳县| 同心县| 赞皇县| 乌拉特后旗| 镇康县| 噶尔县| 西青区| 壶关县| 巴楚县| 沙田区| 府谷县| 逊克县| 尚义县| 三门县| 娄底市| 咸丰县| 盐亭县| 黄浦区| 炉霍县| 扎鲁特旗| 永福县| 平舆县| 衡阳县| 永康市| 齐河县| 永安市| 伊宁县| 毕节市| 乌兰浩特市| 辽阳县| 普定县| 濉溪县|