
[Big Data with Spark]: Introductory Examples of Action Operators

Actions

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

This function takes two arguments, computes a result from them, and that result is then combined with the next element and fed back into the function, and so on until a single value remains.

For example, computing the sum of a dataset.

//create the dataset
scala> var data = sc.parallelize(1 to 3,1)

scala> data.collect
res6: Array[Int] = Array(1, 2, 3)

//compute the sum with reduce
scala> data.reduce((x,y)=>x+y)
res5: Int = 6
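
Any commutative, associative binary function can be used in the same way. As a minimal sketch (not part of the original post), taking the maximum of the same kind of dataset:

//a hypothetical variation: reduce with max instead of sum
val nums = sc.parallelize(1 to 3, 1)
val maxValue = nums.reduce((x, y) => math.max(x, y))
//maxValue should be 3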

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

Returns all elements of the dataset to the driver. It is usually used after filter or other operations that leave only a small amount of data.

For example, displaying the contents of the dataset defined above.

//create the dataset
scala> var data = sc.parallelize(1 to 3,1)

//collect the elements
scala> data.collect
res6: Array[Int] = Array(1, 2, 3)
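
Since collect is most often applied after an operation that shrinks the data, a small sketch (not in the original post) combining it with filter could look like this:

//keep only elements greater than 1, then bring them to the driver
val small = sc.parallelize(1 to 3, 1).filter(_ > 1).collect()
//small should be Array(2, 3)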

count()

Return the number of elements in the dataset.

Counts the number of elements in the dataset.

//create the dataset
scala> var data = sc.parallelize(1 to 3,1)

//count the elements
scala> data.count
res7: Long = 3

scala> var data = sc.parallelize(List(("A",1),("B",1)))
scala> data.count
res8: Long = 2

first()

Return the first element of the dataset (similar to take(1)).

Returns the first element of the dataset, similar to take(1).

//create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))

//get the first element
scala> data.first
res9: (String, Int) = (A,1)

take(n)

Return an array with the first n elements of the dataset.

Returns the first n elements of the dataset as an array.

//create the dataset
scala> var data = sc.parallelize(List(("A",1),("B",1)))

scala> data.take(1)
res10: Array[(String, Int)] = Array((A,1))

//if n is greater than the total number of elements, all elements are returned
scala> data.take(8)
res12: Array[(String, Int)] = Array((A,1), (B,1))

//if n is less than or equal to 0, an empty array is returned
scala> data.take(-1)
res13: Array[(String, Int)] = Array()

scala> data.take(0)
res14: Array[(String, Int)] = Array()

takeSample(withReplacement, num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

This method differs from sample in a few ways (see the contrast sketch after the example below):

  • It returns an exact number of samples (given by the second parameter)
  • It returns an Array directly rather than an RDD
  • It randomly shuffles the returned results internally
//create the dataset
scala> var data = sc.parallelize(List(1,3,5,7))
data: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

//sample 2 elements
scala> data.takeSample(true,2,1)
res0: Array[Int] = Array(7, 1)

//sample 4 elements; note that with replacement the sample may contain duplicates
scala> data.takeSample(true,4,1)
res1: Array[Int] = Array(7, 7, 3, 7)

//the first parameter controls whether sampling is done with replacement
scala> data.takeSample(false,4,1)
res2: Array[Int] = Array(3, 5, 7, 1)

scala> data.takeSample(false,5,1)
res3: Array[Int] = Array(3, 5, 7, 1)
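
For contrast with sample mentioned above, here is a minimal sketch (not from the original post): sample takes a fraction and returns an RDD whose exact size is not guaranteed, while takeSample takes an exact count and returns an Array.

//sample returns an RDD and takes a fraction, so the result size can vary
val sampledRdd = data.sample(false, 0.5, 1)
//takeSample returns an Array with exactly the requested number of elements (when available)
val sampledArr = data.takeSample(false, 2, 1)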

takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

Sorts the elements using their natural ordering or a custom ordering and returns the first n elements.

//create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

//return the first 3 elements in sorted order
scala> data.takeOrdered(3)
res4: Array[String] = Array(a, b, c)
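
A custom Ordering can be passed in the second parameter list; as a minimal sketch (not in the original post), returning the largest elements first:

//pass a reversed Ordering to get the elements in descending order
val topDesc = data.takeOrdered(3)(Ordering[String].reverse)
//topDesc should be Array(f, e, c)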

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

Saves the dataset as a text file (or set of text files) to the local filesystem, HDFS, or any other Hadoop-supported file system.

//create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:21

//save to the directory test_data_save
scala> data.saveAsTextFile("test_data_save")

scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
<console>:24: error: not found: type GzipCodec
              data.saveAsTextFile("test_data_save2",classOf[GzipCodec])
                                                            ^

//import the required class
scala> import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.io.compress.GzipCodec

//save as a compressed file
scala> data.saveAsTextFile("test_data_save2",classOf[GzipCodec])

Check the output files:

[xingoo@localhost bin]$ ll
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save
drwxrwxr-x. 2 xingoo xingoo 4096 Oct 10 23:07 test_data_save2
[xingoo@localhost bin]$ cd test_data_save2
[xingoo@localhost test_data_save2]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 30 Oct 10 23:07 part-00000.gz
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save2]$ cd ..
[xingoo@localhost bin]$ cd test_data_save
[xingoo@localhost test_data_save]$ ll
total 4
-rw-r--r--. 1 xingoo xingoo 10 Oct 10 23:07 part-00000
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:07 _SUCCESS
[xingoo@localhost test_data_save]$ cat part-00000 
b
a
e
f
c
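
To load the saved text back, sc.textFile can read the output directory (a sketch not in the original post; Hadoop's text input format also decompresses .gz files transparently):

//read the plain text output back as an RDD[String]
val back = sc.textFile("test_data_save")
//the gzip-compressed directory can be read the same way
val backGz = sc.textFile("test_data_save2")
//both should contain the five lines b, a, e, f, c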

saveAsSequenceFile(path)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

Saves the dataset as a Hadoop SequenceFile.

scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)),3)
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[21] at parallelize at <console>:22

scala> data.saveAsSequenceFile("kv_test")

[xingoo@localhost bin]$ cd kv_test/
[xingoo@localhost kv_test]$ ll
total 12
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00000
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00001
-rw-r--r--. 1 xingoo xingoo 99 Oct 10 23:25 part-00002
-rw-r--r--. 1 xingoo xingoo  0 Oct 10 23:25 _SUCCESS
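
To read the SequenceFile back in spark-shell, sc.sequenceFile can be used; a minimal sketch (not from the original post), assuming the implicit Writable converters for String and Int are in scope, as they are by default in spark-shell:

//load the key-value pairs back from the SequenceFile
val kv = sc.sequenceFile[String, Int]("kv_test")
//kv.collect should return Array((A,1), (A,2), (B,1)) in some partition order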

saveAsObjectFile(path)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

Saves the dataset to a file using Java serialization.

scala> var data = sc.parallelize(List("a","b","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:22

scala> data.saveAsObjectFile("str_test")

scala> var data2 = sc.objectFile[Array[String]]("str_test")
data2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[20] at objectFile at <console>:22

scala> data2.collect
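
Because the saved RDD held String elements, reading it back with the element type itself is the more common pattern; a small sketch that is not part of the original session:

//read the object file back as an RDD[String]
val strs = sc.objectFile[String]("str_test")
//strs.collect should return Array(a, b, c), though partition order may vary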

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

For key-value pairs, counts how many values there are for each distinct key.

//create the dataset
scala> var data = sc.parallelize(List(("A",1),("A",2),("B",1)))
data: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at parallelize at <console>:22

//count the number of values per key
scala> data.countByKey
res9: scala.collection.Map[String,Long] = Map(B -> 1, A -> 2)
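
countByKey returns the result to the driver as a Map, so when there are many distinct keys the equivalent reduceByKey pattern, which keeps the counts as an RDD, is often preferred; a minimal sketch (not from the original post):

//count per key while keeping the result distributed
val counts = data.mapValues(_ => 1L).reduceByKey(_ + _)
//counts.collect should return Array((A,2), (B,1)) in some order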

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.

Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

Runs the function on each element of the dataset; this is typically used for side effects such as updating an accumulator or interacting with external storage systems.

// create the dataset
scala> var data = sc.parallelize(List("b","a","e","f","c"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:22

// iterate over the elements
scala> data.foreach(x=>println(x+" hello"))
scala> data.foreach(x=>println(x+" hello"))
b hello
a hello
e hello
f hello
c hello
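
As the note above says, side effects inside foreach should go through accumulators; here is a minimal sketch assuming the sc.longAccumulator API available in Spark 2.x (older releases use sc.accumulator):

//sum the elements with an accumulator updated inside foreach
val acc = sc.longAccumulator("sum")
sc.parallelize(1 to 3).foreach(x => acc.add(x))
//acc.value should be 6 when read on the driver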

Reposted from: https://www.cnblogs.com/xing901022/p/5947706.html
