본문 바로가기

Dev/Data

Spark 테스트 (Windows, Scala, Self-Contained Applications)

# Scala 를 이용한 Spark Self-Contained Applications 테스트 


# 설치 

- Scala 다운로드 : http://www.scala-lang.org/download/all.html (2.10.6 버전)

- SBT(Scala Build Tool) 다운로드 : http://www.scala-sbt.org/download.html

- 두개의 프로그램 모두 공백이 없는 경로에 설치 하거나 mklink를 이용하여 공백이 없는 경로에 접근 가능하게 작업

- 각각의 "scala\bin" "sbt\bin" 디렉토리를 PATH 설정 


# 빌드 준비 

- 아래와 같은 Spark 빌드 환경 설정을 위해 .sbt 파일을 작성

- Scala 버전 2.10.6, Spark 버전 1.5.2 

name := "App"
version := "1.0"
scalaVersion := "2.10.6"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2"

- sbt run 을 실행하여 빌드 및 실행 (처음에는 의존성 파일 설정을 위해 오래 걸림


- sublime text 3 사용시 Scala 빌드 설정 파일

- scala.sublime-build

{

    "cmd": ["sbt_bin_path\\sbt.bat", "run"],

    "working_dir": "${project_path:${folder}}",

    "selector": "source.scala"

}




# Scala 코드

http://cdecl.tistory.com/306 의  spark_movelens.py 와 같은 기능을 하는 Scala 코드

- Scala의 Self-Contained Applications 환경에서는 sc.stop() 을 하지 않으면 "ERROR Utils: uncaught error in thread SparkListenerBus, stopping SparkContext" 이란 에러 발생함 

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.log4j.Logger
import org.apache.log4j.Level


object App {
	def main(args: Array[String]) {
		Logger.getLogger("org").setLevel(Level.WARN)
		Logger.getLogger("akka").setLevel(Level.WARN)

		val sc = new SparkContext("local[*]", "MyApp")
		RDDRun(sc)
		sc.stop()
	}


	def RDDRun(sc: SparkContext) {
		val ratings = "D:/hadoop/data/ml-20m/ratings.csv"
		
		val rddMovie = (sc: SparkContext) => {
			val movies =  "D:/hadoop/data/ml-20m/movies.csv"
			val rdd = sc.textFile(movies)
			val header = rdd.first()

			rdd.filter(_ != header).map(_.split(",")).map(x => (x(0), x(1)))
		}

		val rdd = sc.textFile(ratings)
		val header = rdd.first()

		val rddR = rdd.filter(_ != header).map(_.split(",")).map(x => (x(1), x(2).toFloat))
			.groupByKey().mapValues(x=> x.sum / x.size)
			.join(rddMovie(sc)).sortBy(_._2._1)
			.map(x=> (x._1, x._2._1, x._2._2))		

		for (t <- rddR.collect()) {
			println("%s, %f, %s".format(t._1, t._2, t._3))
		}

	}
}


# 비교

- Python 코드 : [Finished in 175.3s] http://cdecl.tistory.com/306 

- Scala 코드 : [Finished in 70.5s]

- Scala 쪽이 월등하게 빠르므로, 프로토타입 같은 작업시 Python이 간단해 보이지만 Staging 서비스의 경우 Scala로 작성하는 것이 성능상 이점이 있음


태그