Developing Spark with Java

Environment setup: install the JDK and Maven

1. Install the JDK and configure the environment variables

System variables → New: create a JAVA_HOME variable.

Set its value to the JDK installation directory (mine is E:\Java\jdk1.7.0).

System variables → find the Path variable → Edit.

Append %JAVA_HOME%\bin;%JAVA_HOME%\jre\bin; to the end of the value. Check whether the existing Path value already ends with a semicolon; if it does not, type a semicolon first and then the text above.

System variables → New: create a CLASSPATH variable with the value .;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar (note the leading dot).
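To confirm the variables took effect, here is a minimal check of my own (an addition, not part of the original steps): compile and run a tiny class and see which JDK the JVM reports.

// Minimal verification sketch (my addition): prints the JDK the running JVM uses,
// which should correspond to the directory set in JAVA_HOME above.
public class CheckJdk {
    public static void main(String[] args) {
        System.out.println("java.version = " + System.getProperty("java.version"));
        System.out.println("java.home    = " + System.getProperty("java.home"));
    }
}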

2. Install and configure Maven

Unzip apache-maven-3.1.1-bin.zip and move the extracted apache-maven-3.1.1 folder into D:\Java (create the Java folder yourself if it does not exist).

Create a new system variable MAVEN_HOME with the value D:\Java\apache-maven-3.1.1, then edit the Path system variable and append ;%MAVEN_HOME%\bin.

In the Maven directory, edit conf/settings.xml and set the localRepository element, i.e. <localRepository>D:/repository/localRepository</localRepository>, to change where Maven stores downloaded jars.

3. Configure Java and Maven in Eclipse

Click Window → Preferences → Java → Installed JREs → Add → Standard VM, click Next, select the JDK installation path, and click Finish.

Click Window → Preferences → Maven → Installations → Add, select the Maven installation path in the dialog, click Finish, and then tick the Maven entry you just added in the list.

Then click Window → Preferences → Maven → User Settings; on the right, under User Settings, click Browse and select the settings.xml in Maven's conf directory. The main purpose of this step is to change where Maven stores downloaded dependencies.

Create a Maven project

1. Create the Maven project

Click File → New → Other → Maven Project, click Next, select maven-archetype-quickstart, click Next, set the group id to com.dt.spark and the artifact id to SparkApps, then click Finish.

2. Change the JDK and the pom file

A newly created Maven project defaults to JDK 1.5; change it to the JDK installed earlier. Right-click the project → Build Path → Configure Build Path. In the dialog click Libraries and select JRE System Library, click Edit, choose Workspace default JRE in the popup, click Finish and then OK. Replace the pom file with the content below, then wait for Eclipse to download the Maven dependencies and build the project. After the build there will be an error marker; right-click the error line, choose Quick Fix, and click Finish in the dialog.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.dt.spark</groupId>
  <artifactId>SparkApps</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>SparkApps</name>
  <url></url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-graphx_2.10</artifactId>
      <version>1.6.0</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/main/test</testSourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.3.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>false</includeProjectDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>com.dt.spark.SparkApps.WordCount</mainClass>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

3. Create the package and the Java code

Right-click the source folder → New → Package, enter com.dt.spark.SparkApps.cores as the name in the dialog, and click Finish.

Right-click the com.dt.spark.SparkApps.cores package → New → Class, enter WordCount as the class name in the dialog, click Finish, and then write the following code in WordCount.

package com.dt.spark.SparkApps.cores;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/**
 * A Spark WordCount program written in Java for local testing.
 *
 * @author DT大数据梦工厂
 */
public class WordCount {

    public static void main(String[] args) {

        /**
         * Step 1: Create the Spark configuration object SparkConf and set the runtime
         * configuration of the Spark program. For example, setMaster sets the URL of the
         * Spark cluster Master the program connects to; setting it to "local" means the
         * program runs locally, which suits machines with very limited resources
         * (e.g. beginners with only 1 GB of memory).
         */
        SparkConf conf = new SparkConf().setAppName("Spark WordCount written by java").setMaster("local");

        /**
         * Step 2: Create the SparkContext object.
         * SparkContext is the single entry point for all Spark functionality; whether you use
         * Scala, Java, Python or R, there must be a SparkContext (the concrete class name
         * differs per language; for Java it is JavaSparkContext).
         * Its core role is to initialize the components the Spark application needs to run,
         * including DAGScheduler, TaskScheduler and SchedulerBackend, and it also registers
         * the program with the Master.
         * SparkContext is the most important object in the whole Spark application.
         */
        JavaSparkContext sc = new JavaSparkContext(conf); // under the hood this wraps the Scala SparkContext

        /**
         * Step 3: Create an RDD through the SparkContext from the concrete data source
         * (HDFS, HBase, local file system, DB, S3, etc.).
         * A JavaRDD can be created in three basic ways: from an external data source
         * (e.g. HDFS), from a Scala collection, or by transforming another RDD.
         * The data is split by the JavaRDD into a series of partitions; the data assigned
         * to each partition is processed by one task.
         */
        JavaRDD<String> lines = sc.textFile("D://spark-1.6.0-bin-hadoop2.6//README.md");

        /**
         * Step 4: Apply Transformation-level processing (higher-order functions such as map
         * and filter) to the initial JavaRDD to perform the actual computation.
         * Step 4.1: Split each line into individual words.
         */
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { // in Scala this is a single line thanks to SAM conversion

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        /**
         * Step 4.2: On top of the word splitting, count each word occurrence as 1,
         * i.e. word => (word, 1).
         */
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        /**
         * Step 4.3: Based on the per-occurrence counts of 1, compute the total number of
         * times each word appears in the file.
         */
        JavaPairRDD<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { // accumulate values for the same key, both in the local combiner and in the reducer

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordsCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            @Override
            public void call(Tuple2<String, Integer> pair) throws Exception {
                System.out.println(pair._1 + " : " + pair._2);
            }
        });

        sc.close();
    }
}

Right-click in the code area and choose Run As → Java Application to run the program and check the result. If you want to write code for a cluster, refer to part 8 or part 9 earlier.
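As a rough illustration only (my own sketch, not the content of parts 8 or 9): for cluster execution the hard-coded setMaster("local") is usually dropped so that spark-submit can supply the master URL, and the input is read from a path every executor can reach; the HDFS path below is hypothetical.

// Sketch under the assumptions above: cluster-oriented variant of steps 1-3 of the program above.
SparkConf conf = new SparkConf().setAppName("Spark WordCount written by java"); // master URL left to spark-submit
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("hdfs:///data/README.md"); // hypothetical path visible to all executors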

The following is a second, newer project setup (Spark 2.4.4 with Scala 2.11 and CDH repositories) that follows the same pattern.

1. Import the pom file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.kgc</groupId>
    <artifactId>spark_day0104</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>spark_day0104</name>
    <!-- FIXME change it to the project's website -->
    <url></url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <hadoop.version>2.6.0-cdh5.14.2</hadoop.version>
        <hive.version>1.1.0-cdh5.14.2</hive.version>
        <hbase.version>1.2.0-cdh5.14.2</hbase.version>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.4.4</spark.version>
    </properties>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <!-- spark-core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark-hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.4.4</version>
        </dependency>
        <!-- mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.31</version>
        </dependency>
        <!-- spark-graphx -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- hadoop -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
        </dependency>
        <!-- kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- bundle dependencies into the jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Create the Spark class
import org.apache.spark.{SparkConf, SparkContext}

object SPARK1 extends App {
  val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkTest")
  val sc: SparkContext = SparkContext.getOrCreate(conf)
  sc.textFile("D:/notes/kgcspark/data/spark1.txt")
    // sc.textFile("file:///hadooptmp/spark1.txt")
    .flatMap(x => x.split(" "))
    .map(x => (x, 1))
    .reduceByKey(_ + _)
    .collect
    .foreach(println)
  // release resources
  sc.stop()
}