Save Twitter Stream as JSON files

This example uses Spark Streaming to collect a dataset of tweets. The tweets are written out in JSON format, one tweet per line. A file of tweets is written every batch interval until at least the desired number of tweets has been collected.
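The stopping rule can be modelled without Spark as a loop over per-batch tweet counts. This is a simplified sketch, assuming each element of batchCounts is the number of tweets received in one batch interval; BatchPlan and plan are hypothetical names used only for illustration.

```scala
// Hypothetical, Spark-free model of the collection loop: empty batches are
// skipped, non-empty batches are written, and collection stops once the
// running total reaches the target.
object BatchPlan {
  /** Returns (number of batches written, total tweets collected). */
  def plan(batchCounts: Seq[Long], target: Long): (Int, Long) = {
    var written = 0
    var total = 0L
    val it = batchCounts.iterator
    while (it.hasNext && total < target) {
      val count = it.next()
      if (count > 0) { // mirrors the `if (count > 0)` check in the streaming job
        written += 1
        total += count
      }
    }
    (written, total)
  }
}
```

With batch counts of 0, 4000, 3000, 5000 and 2000 and a target of 10000, three batches are written and 12000 tweets are collected: the last batch overshoots the target, which is why the job collects "at least" the desired number rather than exactly that number.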

To save the tweets in JSON format, add the external library Gson (groupId com.google.code.gson), which provides a simple API for converting Java objects to JSON.

Dependencies

Check your POM file and add all the required dependencies:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>spark-streaming-twitter_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.2.4</version>
    </dependency>
</dependencies>

Save Twitter Stream in JSON files

The code calls TwitterUtils from the Spark Streaming Twitter library (Apache Bahir) to get a DStream of tweets. Then map is called to convert each tweet to JSON. Finally, foreachRDD is called on the DStream to save each batch. This example coalesces each RDD to a single partition before writing it out, so that you can control the number of output files.
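saveAsTextFile refuses to write into a directory that already exists, so each batch needs its own output path, and a single partition after coalesce(1) yields a single part-00000 file. The following Spark-free sketch imitates that layout with java.nio.file so the effect is easy to inspect; the tweets_<batch time in ms> naming and the LocalBatchWriter helper are assumptions for illustration, not part of the Spark API.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical local imitation of one batch written by
// rdd.coalesce(1).saveAsTextFile(dir): one directory per batch,
// a single part file, one JSON document per line.
object LocalBatchWriter {
  // Assumed naming scheme: one directory per batch, named after the batch time.
  def batchDir(base: Path, batchTimeMs: Long): Path =
    base.resolve(s"tweets_$batchTimeMs")

  def writeBatch(base: Path, batchTimeMs: Long, jsonLines: Seq[String]): Path = {
    Files.createDirectories(base)
    // createDirectory throws FileAlreadyExistsException if the directory exists,
    // similar to saveAsTextFile refusing to overwrite an existing output path.
    val dir = Files.createDirectory(batchDir(base, batchTimeMs))
    // One partition corresponds to one part file; each tweet goes on its own line.
    val content = jsonLines.map(_ + "\n").mkString
    Files.write(dir.resolve("part-00000"), content.getBytes(StandardCharsets.UTF_8))
    // Empty marker file, like the one Hadoop writes when a job succeeds.
    Files.createFile(dir.resolve("_SUCCESS"))
    dir
  }
}
```

Writing the same batch time twice fails, which is the same reason a fixed output directory breaks on the second batch of a streaming job.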

import com.google.gson.Gson
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils


object TwitterStreamJSON {

  def main(args: Array[String]): Unit = {

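    // The first program argument is the base output directory
    if (args.isEmpty) {
      System.err.println("Usage: TwitterStreamJSON <outputDirectory>")
      sys.exit(1)
    }
    val outputDirectory = args(0)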

    val conf = new SparkConf()
    conf.setAppName("spark-streaming")
    conf.setMaster("local[*]")

    val sc = new SparkContext(conf)

    val ssc = new StreamingContext(sc, Seconds(30))

    // Configure your Twitter credentials (never commit real keys to source control)
    val apiKey = ""
    val apiSecret = ""
    val accessToken = ""
    val accessTokenSecret = ""

    System.setProperty("twitter4j.oauth.consumerKey", apiKey)
    System.setProperty("twitter4j.oauth.consumerSecret", apiSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

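    // Windows only: hadoop.home.dir must point to a directory containing
    // bin\winutils.exe, otherwise Hadoop throws an IOException (see Troubleshooting)
    System.setProperty("hadoop.home.dir", "C:\\HADOOP\\")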

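    // Receive tweets as twitter4j Status objects (None = read OAuth credentials
    // from the twitter4j.oauth.* system properties) and convert each one to JSON.
    // Gson is not serializable, so a Gson instance is created inside the map function.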
    val tweets = TwitterUtils
      .createStream(ssc, None)
      .map(new Gson().toJson(_))

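    val numTweetsCollect = 10000L
    var numTweetsCollected = 0L

    // Save tweets to files. The foreachRDD function runs on the driver once per batch (every 30 seconds)
    tweets.foreachRDD((rdd, time) => {
      val count = rdd.count()
      if (count > 0) {
        // coalesce(1) leaves a single partition, so each batch is written as one part file
        val outputRDD = rdd.coalesce(1)
        // saveAsTextFile fails if the target directory exists, so write each batch to its own directory
        outputRDD.saveAsTextFile(s"$outputDirectory/tweets_${time.milliseconds}")
        numTweetsCollected += count
        // Stop the application once at least numTweetsCollect tweets have been saved
        if (numTweetsCollected >= numTweetsCollect) {
          System.exit(0)
        }
      }
    })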

    ssc.start()
    ssc.awaitTermination()
  }
}
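Hardcoding credentials in the source, as above, makes them easy to leak. A minimal alternative is to read them from environment variables and copy them into the same twitter4j.oauth.* system properties that the example sets. The environment variable names (TWITTER_API_KEY and so on) and the TwitterCredentials helper are hypothetical; only the property names come from the code above.

```scala
// Hypothetical helper: maps environment variables to the twitter4j.oauth.*
// system properties used in TwitterStreamJSON, failing fast if any is missing.
object TwitterCredentials {
  val mapping: Seq[(String, String)] = Seq(
    "TWITTER_API_KEY"             -> "twitter4j.oauth.consumerKey",
    "TWITTER_API_SECRET"          -> "twitter4j.oauth.consumerSecret",
    "TWITTER_ACCESS_TOKEN"        -> "twitter4j.oauth.accessToken",
    "TWITTER_ACCESS_TOKEN_SECRET" -> "twitter4j.oauth.accessTokenSecret"
  )

  /** Resolves every property from env, or returns the names of missing or blank variables. */
  def resolve(env: Map[String, String]): Either[Seq[String], Map[String, String]] = {
    val missing = mapping.collect {
      case (envVar, _) if env.get(envVar).forall(_.trim.isEmpty) => envVar
    }
    if (missing.nonEmpty) Left(missing)
    else Right(mapping.map { case (envVar, prop) => prop -> env(envVar) }.toMap)
  }

  /** Sets the system properties; call this before TwitterUtils.createStream. */
  def install(env: Map[String, String] = sys.env): Unit = resolve(env) match {
    case Right(props) => props.foreach { case (k, v) => System.setProperty(k, v) }
    case Left(missing) =>
      throw new IllegalStateException(s"Missing environment variables: ${missing.mkString(", ")}")
  }
}
```

In main, TwitterCredentials.install() would replace the four hardcoded values and the System.setProperty calls.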

Run your application and check the output directory; you should see the saved files.
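To confirm that each line holds one tweet, the saved part files can be counted line by line. This Spark-free sketch assumes the layout that saveAsTextFile produces (part-* files, possibly inside per-batch subdirectories) and skips the _SUCCESS marker and hidden .crc checksum files; OutputCheck is a hypothetical helper.

```scala
import java.nio.file.{Files, Path}
import scala.io.Source

// Hypothetical checker: counts non-empty lines (one JSON tweet each) in every
// part-* file under the output directory, at any depth. Hidden checksum files
// such as .part-00000.crc do not start with "part-" and are ignored.
object OutputCheck {
  def countTweets(outputDir: Path): Long = {
    val stream = Files.walk(outputDir)
    try {
      val paths = stream.iterator()
      var total = 0L
      while (paths.hasNext) {
        val p = paths.next()
        if (Files.isRegularFile(p) && p.getFileName.toString.startsWith("part-")) {
          val src = Source.fromFile(p.toFile, "UTF-8")
          try total += src.getLines().count(_.trim.nonEmpty)
          finally src.close()
        }
      }
      total
    } finally stream.close()
  }
}
```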

Troubleshooting

If you encounter an exception such as:

ERROR Shell: Failed to locate the winutils binary in the hadoop binary path

java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

Do the following:
1) Copy the downloaded winutils.exe to ANY_DIRECTORY/bin/winutils.exe

2) Point hadoop.home.dir at that directory in your code: System.setProperty("hadoop.home.dir", "ANY_DIRECTORY")

    // For example, if winutils.exe is copied to C:\winutil\bin\
    System.setProperty("hadoop.home.dir", "C:\\winutil\\")
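Hadoop resolves the binary as <hadoop.home.dir>\bin\winutils.exe; the "null\bin\winutils.exe" in the error above is that same path built from an unset home directory. A quick existence check before setting the property gives a clearer failure. A minimal sketch; WinutilsSetup is a hypothetical helper.

```scala
import java.nio.file.{Files, Path, Paths}

// Hypothetical helper: checks for <hadoopHome>/bin/winutils.exe before
// pointing hadoop.home.dir at hadoopHome.
object WinutilsSetup {
  def winutilsPath(hadoopHome: String): Path =
    Paths.get(hadoopHome, "bin", "winutils.exe")

  /** Sets hadoop.home.dir only if winutils.exe is present; returns whether it did. */
  def configure(hadoopHome: String): Boolean = {
    val exe = winutilsPath(hadoopHome)
    if (Files.isRegularFile(exe)) {
      System.setProperty("hadoop.home.dir", hadoopHome)
      true
    } else {
      System.err.println(s"winutils.exe not found at $exe")
      false
    }
  }
}
```

For example, WinutilsSetup.configure("C:\\winutil\\") returns true only when C:\winutil\bin\winutils.exe exists.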
