エンターテイメント!!

遊戯王好きのJavaエンジニアのブログ。バーニングソウルを会得する特訓中。

Spark試し実装

きっかけ

本当は、hadoop使ってビックデータ処理がどんなものか探る予定だったけど、環境構築で挫折。。。
あまりにも面倒だったので、sparkでアプローチしてみようと思い、記事を書くに至る。

環境準備

macで実験。
Homebrewをインストールした前提で話をする。

OS

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.14.4
BuildVersion:   18E226

Homebrew

$ brew -v
Homebrew 2.1.1
Homebrew/homebrew-core (git revision 858af; last commit 2019-05-01)
Homebrew/homebrew-cask (git revision 606f3; last commit 2019-04-30)

インストール

$ brew install apache-spark
$ brew install scala
$ brew install sbt

Sparkの設定

export Spark_HOME=Spark_HOME-/usr/local/Celler/apache-spark/x.y.z

x.y.zはインストールしたバージョンを指定。
自分は、2.12.8だった。

$scala -version
Scala code runner version 2.12.8 -- Copyright 2002-2018, LAMP/EPFL and Lightbend, Inc.

実験

参考サイトを参考に、環境を生成。
ディレクトリ構造を間違えて、かなり迷った。。。

ディレクトリ構成

WordCount/
  build.sbt(ビルド方法を定義するsbtファイル)
  input.txt(ワードカウントする対象の入力ファイル)
  project/ (sbtの追加設定を入れるファイル)
    assembly.sbt
  src/
    main/
      scala/
        jp/
          excite/
            news/
              WordCountApp.scala

build.sbt

name := "WordCountApp"
version := "1.0.0"
scalaVersion := "2.11.8"
resolvers += "Atilika Open Source repository" at "http://www.atilika.org/nexus/content/repositories/atilika"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.atilika.kuromoji" % "kuromoji" % "0.7.7"
assemblyMergeStrategy in assembly := {
  case PathList("javax", "servlet", xs @ _*)         => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first
  case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first
  case "application.conf"                            => MergeStrategy.concat
  case "unwanted.txt"                                => MergeStrategy.discard
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
mainClass in assembly := Some("WordCountApp")

assembly.sbt

resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")

WordCountApp.scala

package jp.excite.news

import java.util.regex.{Matcher, Pattern}
import scala.collection.convert.WrapAsScala._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import org.atilika.kuromoji.Tokenizer
import org.atilika.kuromoji.Token
 
object WordCountApp{
def main(args: Array[String]) {
    //スパークの環境設定
    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("WordCount App")
    val sc = new SparkContext(sparkConf)
    //kuromojiのトークナイザ
    val tokenizer = Tokenizer.builder.mode(Tokenizer.Mode.NORMAL).build()
    //テキストファイルから1行ずつ読み込み。名詞を配列に分解する。
    //テキストファイルからRDDオブジェクトを取得する。
        val input = sc.textFile("input.txt").flatMap(line => {
        val tokens : java.util.List[Token] = Tokenizer.builder().build().tokenize(line)
        val output : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]()
        tokens.foreach(token => {
            if(token.getAllFeatures().indexOf("名詞") != -1) {
            output += token.getSurfaceForm()
        }})
        output// return
    })
    //ワードカウントを行う。数える名詞をキーにし、キーを元に加算処理を行う。
    val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y)
    val output = wordCounts.map( x => (x._2, x._1)).sortByKey(false).saveAsTextFile("output")
    }
}

参考サイトだと、wordCountsが重複して定義してあったので、削除したのを上記にのっけた。

実行方法

sbt run

実行すると、WordCount/output に出力される。
part-0000xに出力される。

感想

hadoopでいろいろやろうとしたが、環境準備でダメだった。。。
その点、sparkは簡単に試せた。

scalaの構文は全然知らなかったけど、kuromojiは、知っていた。
何となくでも読めた。
ただ、sbtのscalaのビルドがどうなっているのかで、ちょっとハマったけど。
問題は、spark使うメリットが良くわからなかったことかな。。。
もうちょい、深く使ってみないと、わからないかもしれない。
もしかして、StreamAPIが入ったことで、並列処理がStreamAPIでもできるから、Javaでメリット薄いように感じるのだろうか?

ハマったこと

ディレクトリ構造を読みまちがて、java.lang.RuntimeException: No main class detected が出てしまったこと。
最初、WordCountApp.scalaを、src/jp/excite/news/WordCountApp.scala に配置してしまって、scalaのソースとして参照されていなかった。
src/scala/jp/excite/news/WordCountApp.scala が正解。パッケージ名にscalaなんて入ってこないから、相当悩んだ。。。
scalaディレクトリ構造は、srcの下にscalaがあるのが正解だって気づくまで、かなり悩んでしまった。
これは、Javascalaを共存させるために、こうなっているのだろうか?

これからの課題

  • SparkとStreamAPIを比較してみる
  • Scalaを学んでみる
  • sbtを調べてみる

参考サイト

「Apache Spark」×「Scala」で分散処理入門 : エキサイト公式 エンジニアブログ