使用Spark Streaming进行情感分析

AnjaBrother 9年前
   <p>这里将使用推ter流式数据,它符合所有所需:持续而且无止境的数据源。</p>    <h2>Spark Streaming</h2>    <p>Spark Streaming在电子书 <a href="/misc/goto?guid=4959673672958189294" rel="nofollow,noindex">《手把手教你学习Spark》</a> 第六章有详细介绍,这里略过Streaming API的详细介绍,直接进行程序开发 。</p>    <h2>程序开发设置部分</h2>    <p>程序开发起始部分需要做好准备工作。</p>    <pre>  <code class="language-python">val config = new SparkConf().setAppName("推ter-stream-sentiment")  val sc = new SparkContext(config)  sc.setLogLevel("WARN")    val ssc = new StreamingContext(sc, Seconds(5))    System.setProperty("推ter4j.oauth.consumerKey", "consumerKey")  System.setProperty("推ter4j.oauth.consumerSecret", "consumerSecret")  System.setProperty("推ter4j.oauth.accessToken", accessToken)  System.setProperty("推ter4j.oauth.accessTokenSecret", "accessTokenSecret")    val stream = 推terUtils.createStream(ssc, None)</code></pre>    <p>这里创建一个Spark Context <strong>sc</strong> ,设置日志级别为WARN来消除Spark生成的日志。使用 <strong>sc</strong> 创建Streaming Context <strong>ssc</strong> ,然后设置 推ter证书来获得 推ter网站数据。</p>    <h2>推ter上现在的趋势是什么?</h2>    <p>很容易的能够找到任意给定时刻的推ter趋势,仅仅需要计算数据流每个标签的数目。让我们看下Spark如何实现这个操作的。</p>    <pre>  <code class="language-python">val tags = stream.flatMap { status =>     status.getHashtagEntities.map(_.getText)  }  tags.countByValue()     .foreachRDD { rdd =>         val now = org.joda.time.DateTime.now()         rdd           .sortBy(_._2)           .map(x => (x, now))           .saveAsTextFile(s"~/推ter/$now")       }</code></pre>    <p>首先从Tweets获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。</p>    <h2>分析Tweets</h2>    <p>现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对Tweets的大数据和食物两个不相关的主题进行情感分析。</p>    <p>有几种API可以在Tweets上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。</p>    <p>在 <strong>build.sbt</strong> 文件中增加相对应的依赖。</p>    <pre>  <code class="language-python">libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"  libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"</code></pre>    <p>现在,我们通过Streaming过滤一定的哈希标签,只选择感兴趣的Tweets,如下所示:</p>    <pre>  <code class="language-python">val tweets = stream.filter {t =>       val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)       tags.contains("#bigdata") && tags.contains("#food")     }</code></pre>    <p>得到Tweets上所有标签,然后标记出#bigdata和 #food两个标签。接下来定一个函数从Tweets抽取相关的情感:</p>    <pre>  <code class="language-python">def detectSentiment(message: String): SENTIMENT_TYPE</code></pre>    <p>然后对detectSentiment进行测试以确保其可以工作:</p>    <pre>  <code class="language-python">it("should detect not understood sentiment") {       detectSentiment("") should equal (NOT_UNDERSTOOD)  }    it("should detect a negative sentiment") {       detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)  }    it("should detect a neutral sentiment") {       detectSentiment("I'm watching a movie") should equal (NEUTRAL)  }    it("should detect a positive sentiment") {       detectSentiment("It was a nice experience.") should equal (POSITIVE)  }    it("should detect a very positive sentiment") {       detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)  }</code></pre>    <p>完整列子如下:</p>    <pre>  <code class="language-python">val data = tweets.map { status =>     val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)     val tags = status.getHashtagEntities.map(_.getText.toLowerCase)       (status.getText, sentiment.toString, tags)  }</code></pre>    <p>data中包含相关的情感。</p>    <h2>和SQL协同进行分析</h2>    <p>现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用SQL查询。具体操作如下:</p>    <pre>  <code class="language-python">val sqlContext = new SQLContext(sc)    import sqlContext.implicits._    data.foreachRDD { rdd =>     rdd.toDF().registerTempTable("sentiments")  }</code></pre>    <p>将Dstream转换成DataFrame,然后注册成一个临时表,其他喜欢使用SQL的同事就可以使用不同的数据源啦。</p>    <p>sentiment表可以被任意查询,也可以使用Spark SQL和其他数据源(比如,Cassandra数据等)进行交叉查询。查询DataFrame的列子:</p>    <pre>  <code class="language-python">sqlContext.sql("select * from sentiments").show()</code></pre>    <h2>窗口操作</h2>    <p>Spark Streaming的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。</p>    <p>为了使用窗口函数,你需要checkpoint流数据,具体详情见 <a href="/misc/goto?guid=4959673673052866348" rel="nofollow,noindex">http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing</a> 。</p>    <p>简单的一个窗口操作:</p>    <pre>  <code class="language-python">tags     .window(Minutes(1))     . (...)</code></pre>    <h2>结论</h2>    <p>此列子虽然简单,但是其可以使用Spark解决实际问题。我们可以计算推ter上主题趋势。</p>    <p> </p>    <p>来自: <a href="/misc/goto?guid=4959673673136096051" rel="nofollow">http://www.infoq.com/cn/articles/emotional-analysis-using-streaming-spark</a></p>    <p> </p>