きっかけ
本当は、hadoop使ってビックデータ処理がどんなものか探る予定だったけど、環境構築で挫折。。。
あまりにも面倒だったので、sparkでアプローチしてみようと思い、記事を書くに至る。
環境準備
macで実験。
Homebrewをインストールした前提で話をする。
OS
$ sw_vers ProductName: Mac OS X ProductVersion: 10.14.4 BuildVersion: 18E226
Homebrew
$ brew -v Homebrew 2.1.1 Homebrew/homebrew-core (git revision 858af; last commit 2019-05-01) Homebrew/homebrew-cask (git revision 606f3; last commit 2019-04-30)
インストール
$ brew install apache-spark $ brew install scala $ brew install sbt
Sparkの設定
export Spark_HOME=Spark_HOME-/usr/local/Celler/apache-spark/x.y.z
x.y.zはインストールしたバージョンを指定。
自分は、2.12.8
だった。
$scala -version Scala code runner version 2.12.8 -- Copyright 2002-2018, LAMP/EPFL and Lightbend, Inc.
実験
参考サイトを参考に、環境を生成。
ディレクトリ構造を間違えて、かなり迷った。。。
ディレクトリ構成
WordCount/ build.sbt(ビルド方法を定義するsbtファイル) input.txt(ワードカウントする対象の入力ファイル) project/ (sbtの追加設定を入れるファイル) assembly.sbt src/ main/ scala/ jp/ excite/ news/ WordCountApp.scala
build.sbt
name := "WordCountApp" version := "1.0.0" scalaVersion := "2.11.8" resolvers += "Atilika Open Source repository" at "http://www.atilika.org/nexus/content/repositories/atilika" libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" libraryDependencies += "org.atilika.kuromoji" % "kuromoji" % "0.7.7" assemblyMergeStrategy in assembly := { case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".properties" => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".xml" => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".types" => MergeStrategy.first case PathList(ps @ _*) if ps.last endsWith ".class" => MergeStrategy.first case "application.conf" => MergeStrategy.concat case "unwanted.txt" => MergeStrategy.discard case x => val oldStrategy = (assemblyMergeStrategy in assembly).value oldStrategy(x) } mainClass in assembly := Some("WordCountApp")
assembly.sbt
resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns) addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.6")
WordCountApp.scala
package jp.excite.news import java.util.regex.{Matcher, Pattern} import scala.collection.convert.WrapAsScala._ import org.apache.spark.SparkConf import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext import org.atilika.kuromoji.Tokenizer import org.atilika.kuromoji.Token object WordCountApp{ def main(args: Array[String]) { //スパークの環境設定 val sparkConf = new SparkConf().setMaster("local[4]").setAppName("WordCount App") val sc = new SparkContext(sparkConf) //kuromojiのトークナイザ val tokenizer = Tokenizer.builder.mode(Tokenizer.Mode.NORMAL).build() //テキストファイルから1行ずつ読み込み。名詞を配列に分解する。 //テキストファイルからRDDオブジェクトを取得する。 val input = sc.textFile("input.txt").flatMap(line => { val tokens : java.util.List[Token] = Tokenizer.builder().build().tokenize(line) val output : scala.collection.mutable.ArrayBuffer[String] = new collection.mutable.ArrayBuffer[String]() tokens.foreach(token => { if(token.getAllFeatures().indexOf("名詞") != -1) { output += token.getSurfaceForm() }}) output// return }) //ワードカウントを行う。数える名詞をキーにし、キーを元に加算処理を行う。 val wordCounts = input.map(x => (x, 1L)).reduceByKey((x, y)=> x + y) val output = wordCounts.map( x => (x._2, x._1)).sortByKey(false).saveAsTextFile("output") } }
参考サイトだと、wordCountsが重複して定義してあったので、削除したのを上記にのっけた。
実行方法
sbt run
実行すると、WordCount/output に出力される。
part-0000xに出力される。
感想
hadoopでいろいろやろうとしたが、環境準備でダメだった。。。
その点、sparkは簡単に試せた。
scalaの構文は全然知らなかったけど、kuromojiは、知っていた。
何となくでも読めた。
ただ、sbtのscalaのビルドがどうなっているのかで、ちょっとハマったけど。
問題は、spark使うメリットが良くわからなかったことかな。。。
もうちょい、深く使ってみないと、わからないかもしれない。
もしかして、StreamAPIが入ったことで、並列処理がStreamAPIでもできるから、Javaでメリット薄いように感じるのだろうか?
ハマったこと
ディレクトリ構造を読みまちがて、java.lang.RuntimeException: No main class detected
が出てしまったこと。
最初、WordCountApp.scala
を、src/jp/excite/news/WordCountApp.scala
に配置してしまって、scalaのソースとして参照されていなかった。
src/scala/jp/excite/news/WordCountApp.scala
が正解。パッケージ名にscalaなんて入ってこないから、相当悩んだ。。。
scalaのディレクトリ構造は、srcの下にscalaがあるのが正解だって気づくまで、かなり悩んでしまった。
これは、Javaとscalaを共存させるために、こうなっているのだろうか?
これからの課題
- SparkとStreamAPIを比較してみる
- Scalaを学んでみる
- sbtを調べてみる