おいしいブログ

Apache Spark Play Ground

2018-09-01

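I use Apache Spark at work every now and then. It tends to come up again just when I have forgotten how it works, so I created a playground repository to touch it regularly and keep my memory fresh.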

mylde/spark-play-ground-scala - GitHub

So, let's set up an environment where Spark can be run quickly.

build.sbt

name := "spark-play-ground"

version := "0.1"
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // Scala runtime, XML and logging
  "org.scala-lang" % "scala-library" % "2.11.12",
  "org.scala-lang" % "scala-reflect" % "2.11.12",
  "org.scala-lang.modules" %% "scala-xml" % "1.1.0",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
  "ch.qos.logback" % "logback-classic" % "1.2.3",

  // Spark modules, all pinned to the same version
  "org.apache.spark" %% "spark-core" % "2.3.1" % Provided,
  "org.apache.spark" %% "spark-sql" % "2.3.1" % Provided,
  "org.apache.spark" %% "spark-streaming" % "2.3.1" % Provided,
  "org.apache.spark" %% "spark-mllib" % "2.3.1" % Provided,
  // The old "spark-streaming-kafka" 1.6.3 artifact targets Spark 1.x; use the 2.3.1 integration instead
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1" % Provided,

  // Test libraries
  "org.scalatest" %% "scalatest" % "3.2.0-SNAP10" % Test,
  "org.scalacheck" %% "scalacheck" % "1.14.0" % Test
)

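// Run the application in a separate JVM instead of inside sbt
fork in run := true

// Provided dependencies are normally left off the run classpath;
// these overrides put Spark back on it for `run` and `runMain`
run in Compile := Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run)).evaluated
runMain in Compile := Defaults.runMainTask(fullClasspath in Compile, runner in (Compile, run)).evaluated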

//wartremoverErrors ++= Warts.allBut(Wart.NonUnitStatements)

Roughly, the environment currently looks like this:

  • scala-lang 2.11.12
  • spark-core 2.3.1
  • spark-sql 2.3.1

Beyond that, it includes logging libraries and a few libraries that are not needed yet.

## Implementing Word Count

First things first: let's just get something running, and the natural place to start is Word Count 🤔 So let's write it with an RDD.
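The actual source lives in the repository, so what follows is only a minimal sketch of what an RDD-based word count might look like. The object name `WordCountRddSketch`, the `countWords` helper, and the input lines are all hypothetical; the input is chosen so that the counts come out as a=4, b=2, c=4, d=2, e=3, f=3.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCountRddSketch {
  // Hypothetical input: whitespace-separated letters (not the repository's data).
  val lines: Seq[String] = Seq("a b c d e f", "a c e f", "a b c d e f", "a c")

  // Classic word count: split into words, pair each with 1, sum per key.
  def countWords(sc: SparkContext, input: Seq[String]): Map[String, Int] =
    sc.parallelize(input)
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
      .toMap

  def main(args: Array[String]): Unit = {
    // local[*] runs Spark inside this JVM, using all available cores.
    val conf = new SparkConf().setMaster("local[*]").setAppName("word-count-rdd-sketch")
    val sc = new SparkContext(conf)
    try countWords(sc, lines).foreach(println) // prints tuples such as (a,4); order is not guaranteed
    finally sc.stop()
  }
}
```

Keeping the counting logic in a function that takes the `SparkContext` makes it easy to call from a test or from the Spark shell.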

Let's run it.

sbt:spark-play-ground> run
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list

Multiple main classes detected, select one to run:

 [1] com.mylde.pg.spark.WordCountDF
 [2] com.mylde.pg.spark.WordCountDS
 [3] com.mylde.pg.spark.WordCountRdd

Enter number: 3

[info] Running (fork) com.mylde.pg.spark.WordCountRdd 
[error] SLF4J: Class path contains multiple SLF4J bindings.
[error] SLF4J: Found binding in [jar:file:~/.ivy2/cache/ch.qos.logback/logback-classic/jars/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[error] SLF4J: Found binding in [jar:file:~/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[error] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
[error] SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
[info] (e,3)
[info] (c,4)
[info] (a,4)
[info] (d,2)
[info] (b,2)
[info] (f,3)
[success] Total time: 9 s, completed Sep 1, 2018 5:11:39 PM
sbt:spark-play-ground> 

Nice and simple 😊😊

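Next, let's write it with a DataFrame. When creating the DataFrame from the SQLContext, we use a StructType to supply the column names and types. After that, we simply count the occurrences of each letter, but in three different ways. The last approach is handy because it allows flexible aggregation, such as applying sum or count per column and attaching an alias.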
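As a rough illustration of the three approaches, here is a minimal sketch rather than the repository's code. The object name `WordCountDFSketch`, the `toDF` helper, and the input letters are hypothetical; the column name `alpha` and the alias `c` are taken from the output tables in this post.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object WordCountDFSketch {
  // Hypothetical input letters (not the repository's data).
  val letters: Seq[String] = "a b c d e f a c e f a b c d e f a c".split(" ").toSeq

  // Column name and type are supplied explicitly through a StructType.
  val schema: StructType = StructType(Seq(StructField("alpha", StringType, nullable = false)))

  def toDF(spark: SparkSession, input: Seq[String]): DataFrame = {
    val rows = spark.sparkContext.parallelize(input.map(Row(_)))
    spark.sqlContext.createDataFrame(rows, schema)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("word-count-df-sketch").getOrCreate()
    try {
      val df = toDF(spark, letters)
      // 1) Built-in count: result columns are "alpha" and "count".
      df.groupBy("alpha").count().show()
      // 2) Map-based aggregation: result columns are "alpha" and "count(alpha)".
      df.groupBy("alpha").agg(Map("alpha" -> "count")).show()
      // 3) Column expression with an alias: result columns are "alpha" and "c".
      df.groupBy("alpha").agg(count("alpha").as("c")).show()
    } finally spark.stop()
  }
}
```

The third form scales best: `agg` accepts several column expressions at once, so further aggregates with their own aliases can be added to the same call.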

Let's run it.

sbt:spark-play-ground> run
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list

Multiple main classes detected, select one to run:

 [1] com.mylde.pg.spark.WordCountDF
 [2] com.mylde.pg.spark.WordCountDS
 [3] com.mylde.pg.spark.WordCountRdd

Enter number: 1

[info] Running (fork) com.mylde.pg.spark.WordCountDF 
[error] SLF4J: Class path contains multiple SLF4J bindings.
[error] SLF4J: Found binding in [jar:file:~/.ivy2/cache/ch.qos.logback/logback-classic/jars/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[error] SLF4J: Found binding in [jar:file:~/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[error] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
[error] SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
[info] +-----+-----+
[info] |alpha|count|
[info] +-----+-----+
[info] |    f|    3|
[info] |    e|    3|
[info] |    d|    2|
[info] |    c|    4|
[info] |    b|    2|
[info] |    a|    4|
[info] +-----+-----+
[info] +-----+------------+
[info] |alpha|count(alpha)|
[info] +-----+------------+
[info] |    f|           3|
[info] |    e|           3|
[info] |    d|           2|
[info] |    c|           4|
[info] |    b|           2|
[info] |    a|           4|
[info] +-----+------------+
[info] +-----+---+
[info] |alpha|  c|
[info] +-----+---+
[info] |    f|  3|
[info] |    e|  3|
[info] |    d|  2|
[info] |    c|  4|
[info] |    b|  2|
[info] |    a|  4|
[info] +-----+---+
[success] Total time: 17 s, completed Sep 1, 2018 5:48:39 PM
sbt:spark-play-ground> 

Finally, let's do it with a Dataset. Instead of the StructType used for the DataFrame, we use a case class to do the same thing.
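A minimal sketch of the Dataset variant, again an assumption rather than the repository's code: the case class `Letter`, the object `WordCountDSSketch`, the `toDS` helper, and the input letters are hypothetical. The field name `alpha` is chosen to match the column name in the output tables.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.count

// The case class replaces the StructType: its field names and types become the schema.
// It is defined at the top level so that Spark can derive an encoder for it.
final case class Letter(alpha: String)

object WordCountDSSketch {
  // Hypothetical input letters (not the repository's data).
  val letters: Seq[String] = "a b c d e f a c e f a b c d e f a c".split(" ").toSeq

  def toDS(spark: SparkSession, input: Seq[String]): Dataset[Letter] = {
    import spark.implicits._ // brings the encoders and toDS() into scope
    input.map(Letter(_)).toDS()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("word-count-ds-sketch").getOrCreate()
    try {
      val ds = toDS(spark, letters)
      // The same three aggregations as in the DataFrame version.
      ds.groupBy("alpha").count().show()
      ds.groupBy("alpha").agg(Map("alpha" -> "count")).show()
      ds.groupBy("alpha").agg(count("alpha").as("c")).show()
    } finally spark.stop()
  }
}
```

Note that `groupBy` is the untyped API, so each aggregation returns a DataFrame; only the construction step benefits from the case class here.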

Let's run it.

sbt:spark-play-ground> run
[warn] Multiple main classes detected.  Run 'show discoveredMainClasses' to see the list

Multiple main classes detected, select one to run:

 [1] com.mylde.pg.spark.WordCountDF
 [2] com.mylde.pg.spark.WordCountDS
 [3] com.mylde.pg.spark.WordCountRdd

Enter number: 2

[info] Running (fork) com.mylde.pg.spark.WordCountDS 
[error] SLF4J: Class path contains multiple SLF4J bindings.
[error] SLF4J: Found binding in [jar:file:~/.ivy2/cache/ch.qos.logback/logback-classic/jars/logback-classic-1.2.3.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[error] SLF4J: Found binding in [jar:file:~/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[error] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
[error] SLF4J: Actual binding is of type [ch.qos.logback.classic.util.ContextSelectorStaticBinder]
[info] +-----+-----+
[info] |alpha|count|
[info] +-----+-----+
[info] |    f|    3|
[info] |    e|    3|
[info] |    d|    2|
[info] |    c|    4|
[info] |    b|    2|
[info] |    a|    4|
[info] +-----+-----+
[info] +-----+------------+
[info] |alpha|count(alpha)|
[info] +-----+------------+
[info] |    f|           3|
[info] |    e|           3|
[info] |    d|           2|
[info] |    c|           4|
[info] |    b|           2|
[info] |    a|           4|
[info] +-----+------------+
[info] +-----+---+
[info] |alpha|  c|
[info] +-----+---+
[info] |    f|  3|
[info] |    e|  3|
[info] |    d|  2|
[info] |    c|  4|
[info] |    b|  2|
[info] |    a|  4|
[info] +-----+---+
[success] Total time: 13 s, completed Sep 1, 2018 5:51:35 PM
sbt:spark-play-ground> 

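So that's my Spark playground up and running. Reading files from HDFS or data from HBase, aggregating it, and then writing the results back to HDFS or HBase is something I do often, so I plan to use this repository to build up code snippets for those tasks and to try out small experiments.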