2024年4月10日发(作者:)
利用Scala语言开发Spark应用程序
park内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。
如果你对Scala语言还不太熟悉,可以阅读网络教程A Scala Tutorial for Java Programmers或者
相关Scala书籍进行学习。
AD:
Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。
如果你对Scala语言还不太熟悉,可以阅读网络教程A Scala Tutorial for Java Programmers或者
相关Scala书籍进行学习。
本文将介绍3个Scala Spark编程实例,分别是WordCount、TopK和SparkJoin,分别代表了Spark
的三种典型应用。
1. WordCount编程实例
WordCount是一个最简单的分布式应用实例,主要功能是统计输入目录中所有单词出现的总次
数,编写步骤如下:
步骤1:创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,
Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参
数指定为 yarn-standalone ,第二个参数是自定义的字符串,举例如下:
valsc=newSparkContext(args(0),
WordCount ,( SPARK_HOME ),Seq(( SPARK_TEST_JAR )))
步骤2:读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkCon
valtextFile=le(args(1))
当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,
此时你可以使用SparkContext中的hadoopRDD函数,举例如下:
valinputFormatClass=classOf[SequenceFileInputFormat[Text,Text]]varhadoopRdd=RDD(c
onf,inputFormatClass,classOf[Text],classOf[Text])
或者直接创建一个HadoopRDD对象:
varhadoopRdd=newHadoopRDD(sc,conf,classOf[SequenceFileInputFormat[Text,Text,classOf[Text],c
lassOf[Text])
步骤3:通过RDD转换算子操作和转换RDD,对于WordCount而言,首先需要从输入数据中
每行字符串中解析出单词,水草玛瑙 然后将相同单词放到一个桶中,最后统计每个
桶中每个单词出现的频率,举例如下:
valresult=p{case(key,value)= ng().split( s+ }.map(word=
(word,1)).reduceByKey(_+_)
其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换
为另一条记录(一对一关系),高山茶 uceByKey函数将key相同的数据划分到一
个桶中,并以key为单位分组进行计算,这些函数的具体含义可参考:Spark Transformation。
步骤4:将产生的RDD数据集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈
数将数据集保存到HDFS目 录下,默认采用Hadoop提供的TextOutputFormat,每条记录以
(key,value) 的形式打印输出,你也可以采用 saveAsSequenceFile函数将数据保存为
SequenceFile格式等,举例如下:
SequenceFile(args(2))
当然,一般我们写Spark程序时,需要包含以下两个头文件:
._importSparkContext._
WordCount完整程序已在 Apache Spark学习:利用Eclipse构建Spark集成开发环境 一文中进
行了介绍,在次不赘述。
需要注意的是,指定输入输出文件时,需要指定hdfs的URI,比如输入目录是
hdfs:hadoop-testtmpinput,输出目 录是hdfs:hadoop-testtmpoutput,其中, hdfs:hadoop-test 是由
Hadoop配置文件core- 中参数指定的,具体替换成你的配置即可。
2. TopK编程实例
TopK程序的任务是对一堆文本进行词频统计,并返回出现频率最高的K个词。如果采用
MapReduce实现,则需要编写两个作 业:WordCount和TopK,而使用Spark则只需一个作业,
其中WordCount部分已由前面实现了,接下来顺着前面的实现,找到Top K个词。注意,本文
的实现并不是最优的,有很大改进空间。
步骤1:首先需要对所有词按照词频排序,如下:
valsorted={case(key,value)= (value,key);exchangekeyandvalue}.sortByKey(true,1)
步骤2:返回前K个:
valtopK=(args(3).toInt)
步骤3:将K各词打印出来:
h(println)
注意,对于应用程序标准输出的内容,YARN将保存到Container的stdout日志中。在YARN中,
每个Container存在三个日志 文件,分别是stdout、stderr和syslog,前两个保存的是标准输出
产生的内容,第三个保存的是log4j打印的日志,通常只有第三个日志中 有内容。
本程序完整代码、编译好的jar包和运行脚本可以从这里下载。下载之后,按照 Apache Spark
学习:利用Eclipse构建Spark集成开发环境 一文操作流程运行即可。
3. SparkJoin编程实例
在推荐领域有一个著名的开放测试集是movielens给的,下载链接是:datasetsmovielens,该测
试集包含三个文件,分别是、、,具体介绍可阅读:,
本节给出的SparkJoin实例则通过连接和两个文件得到平均得分超过4.0
的电影列表,采用的数据集是:ml-1m。程序代码如下:
._importSparkContext._objectSparkJoin{defmain(args:Array[String]){if(args.l
ength!=4){println( unt master rating movie
output )return}valsc=newSparkContext(args(0),
WordCount ,( SPARK_HOME ),Seq(( SPARK_TEST_JAR )))Readratin
gfromHDFSfilevaltextFile=le(args(1))extract(movieid,rating)valrating=(line=
{valfileds=( :: )(fileds(1).toInt,fileds(2).toDouble)})valmovieScores=yKey().ma
p(data=
{valavg=data._a._(data._1,avg)})ReadmoviefromHDFSfilevalmovies=le(args(
2))valmovieskey=(line= {valfileds=( :: )(fileds(0).toInt,fileds(1))}).keyBy(tup=
tup._1)byjoin,weget movie,averageRating,movieName valresult=(tup=
tup._1).join(movieskey).filter(f= f._2._1._2 4.0).map(f=
(f._1,f._2._1._2,f._2._2._2))TextFile(args(3))}}
你可以从这里下载代码、编译好的jar包和运行脚本。
这个程序直接使用Spark编写有些麻烦,可以直接在Shark上编写HQL实现,Shark是基于Spark
的类似Hive的交互式查询引擎,具体可参考:Shark。
4. 总结
Spark 程序设计对Scala语言的要求不高,正如Hadoop程序设计对Java语言要求不高一样,只
要掌握了最基本的语法就能编写程序,且常见的语法和表达方式是很少的。通常,刚开始仿照
官方实例编写程序,包括Scala、Java和Python三种语言实例。
原文链接:framework-on-yarnspark-scala-writing-application
【编辑推荐】
Linux环境下C编程指南
本书系统地介绍了在Linux平台下用C语言进行程序开发的过程,通过列举大量的程序实例,
使读者很快掌握在Linux平台下进行C程序开发
发布者:admin,转转请注明出处:http://www.yc00.com/web/1712738597a2113582.html
评论列表(0条)