大数据学习
bigdata learning
Toggle navigation
大数据学习
主页
openGauss数据库
Flume
MongoDB
Hadoop
数据库实验
Kafka
Zookeeper
Hbase
Manual
Spark
Neo4j
InfluxDB
RabbitMQ
Flink
About Me
归档
标签
02-Spark-Shell命令
无
2023-04-24 13:04:29
58
0
0
bigdata
# 通过 Spark Shell 进行交互分析 Spark shell 提供了简单的方式来学习 API,也提供了交互的方式来分析数据。Spark Shell 支持 Scala 和 Python,本教程选择使用 Scala 来进行介绍。 执行如下命令启动 Spark Shell: ```bash nbu@/usr/local/spark$./bin/spark-shell ``` 启动成功后如图所示,会有 “scala >” 的命令提示符。 退出spark如下: ```bash scala> :quit ```  ## 基础操作 Spark 的主要抽象是分布式的元素集合(distributed collection of items),称为 RDD(Resilient Distributed Dataset,弹性分布式数据集),它可被分发到集群各个节点上,进行并行操作。RDDs 可以通过 Hadoop InputFormats 创建(如 HDFS),或者从其他 RDDs 转化而来。 我们从 ./README 文件新建一个 RDD,代码如下(本文出现的 Spark 交互式命令代码中,与位于同一行的注释内容为该命令的说明,命令之后的注释内容表示交互式输出结果): ```scala scala> val textFile = sc.textFile("file:///usr/local/spark/README.md") textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:24 ``` >在启动进入spark-shell以后,spark-shell就默认提供了一个SparkContext对像,即为sc。 >代码中通过 “file://” 前缀指定读取本地文件。Spark shell 默认是读取 HDFS 中的文件。 RDDs 支持两种类型的操作 - [actions](http://spark.apache.org/docs/latest/programming-guide.html#actions): 在数据集上运行计算后返回值 - [transformations](http://spark.apache.org/docs/latest/programming-guide.html#transformations): 转换, 从现有数据集创建一个新的数据集 下面我们就来演示 count() 和 first() 操作: ```scala scala> textFile.count() // RDD 中的 item 数量,对于文本文件,就是总行数 res0: Long = 104 scala> textFile.first() // RDD 中的第一个 item,对于文本文件,就是第一行内容 res1: String = # Apache Spark ``` 接着演示 transformation,通过 filter transformation 来返回一个新的 RDD,代码如下: ```scala scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) // 筛选出包含 Spark 的行 linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:25 scala> linesWithSpark.count() // 统计行数 res2: Long = 19 ``` 可以看到一共有 19 行内容包含 Spark。 action 和 transformation 可以用链式操作的方式结合使用,使代码更为简洁: ```scala scala> textFile.filter(line => line.contains("Spark")).count() // 统计包含 Spark 的行数 res3: Long = 19 ``` ## RDD 的更多操作 RDD 的 actions 和 transformations 可用在更复杂的计算中,例如通过如下代码可以找到包含单词最多的那一行内容共有几个单词: ```scala scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Int = 16 ``` 代码首先将每一行内容 map 为一个整数,这将创建一个新的 RDD,并在这个 RDD 中执行 reduce 操作,找到最大的数。map()、reduce()中的参数是 Scala 的函数字面量(function literals,也称为闭包 closures),并且可以使用语言特征或 Scala/Java 的库。例如,通过使用 Math.max() 函数(需要导入 Java 的 Math 库),可以使上述代码更容易理解: ```scala scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 16 ``` Hadoop MapReduce 是常见的数据流模式,在 Spark 中同样可以实现(下面这个例子也就是 WordCount): ```scala val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b) // 实现单词统计 wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:26 scala> wordCounts.collect() // 输出单词统计结果 res6: Array[(String, Int)] = Array((package,1), (this,1), (integration,1), (Python,2), (page](http://spark.apache.org/documentation.html).,1), (cluster.,1), (its,1), ([run,1), (There,1), (general,3), (have,1), (pre-built,1), (Because,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (info,1), (["Specifying,1), ("yarn",1), ([params]`.,1), (first,1), ([project,1), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (are,1), (systems.,1), (params,1), (scala>,1), (DataFrames,,1), (provides,1), (refer,2), (configure,1), (Interactive,2), (R,,1), (can,6), (build,3), (... ``` ## 缓存 Spark 支持在集群范围内将数据集缓存至每一个节点的内存中,可避免数据传输,当数据需要重复访问时这个特征非常有用,例如查询体积小的“热”数据集,或是运行如 PageRank 的迭代算法。调用 cache(),就可以将数据集进行缓存: ```scala linesWithSpark.cache() ```
上一篇:
02-Neo4j简介
下一篇:
02-Yarn配置
文档导航