Save Twitter Stream as JSON files
Spark Streaming is used to collect tweets as the dataset. The tweets are written out in JSON format, one tweet per line. A file of tweets is written at every batch interval until at least the desired number of tweets has been collected.
To save the tweets in JSON format, you have to add the external library com.google.code.gson, which provides a useful API for working with JSON.
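As a rough illustration of what Gson does, the sketch below serializes a small object to a single JSON line. The class SimpleTweet and the object GsonSketch are hypothetical stand-ins introduced only for this example; the real stream yields twitter4j Status objects with many more fields. It assumes the gson library is on the classpath.

```scala
import com.google.gson.Gson

// Hypothetical stand-in for a tweet (not part of the Twitter API).
class SimpleTweet(val id: Long, val text: String)

object GsonSketch {
  // Gson reads the object's fields by reflection and emits one JSON string.
  def toJsonLine(tweet: SimpleTweet): String = new Gson().toJson(tweet)

  def main(args: Array[String]): Unit = {
    // Prints a single-line JSON object containing the id and text fields.
    println(toJsonLine(new SimpleTweet(1L, "hello spark")))
  }
}
```

Because each object becomes one string without line breaks, writing one string per line gives the "one tweet per line" layout described above.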
Dependencies
Check your POM file and add all needed dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>spark-streaming-twitter_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.2.4</version>
    </dependency>
</dependencies>
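If the project is built with sbt instead of Maven, the same four dependencies could be declared roughly as follows. This is a sketch under assumptions: the scalaVersion value is illustrative (any 2.11.x release should match the _2.11 artifacts), and the build file itself is not part of the original project.

```scala
// build.sbt (sketch): the same coordinates as the POM above.
// %% appends the Scala binary version (_2.11) to the artifact name.
scalaVersion := "2.11.8" // assumed; any 2.11.x release

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0",
  "com.google.code.gson" % "gson" % "2.2.4" // plain Java library, so a single %
)
```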
Save Twitter Stream in JSON files
The code calls TwitterUtils in the Spark Streaming Twitter library to get a DStream of tweets. Then, map is called to convert the tweets to JSON format. Finally, foreachRDD is called on the DStream to write out each batch. This example coalesces each RDD into a single partition before writing, so each batch produces one output file; change the partition count to control the number of output files.
import com.google.gson.Gson
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils

object TwitterStreamJSON {
  def main(args: Array[String]): Unit = {
    val outputDirectory = args(0)

    // On Windows, point Hadoop at the directory that contains bin\winutils.exe
    // to avoid the java.io.IOException described under Troubleshooting.
    // Set it before the SparkContext loads any Hadoop classes.
    System.setProperty("hadoop.home.dir", "C:\\HADOOP\\")

    val conf = new SparkConf()
    conf.setAppName("spark-streaming")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    // One batch of tweets is processed every 30 seconds
    val ssc = new StreamingContext(sc, Seconds(30))

    // Configure your Twitter credentials (do not publish real values)
    val apiKey = ""
    val apiSecret = ""
    val accessToken = ""
    val accessTokenSecret = ""
    System.setProperty("twitter4j.oauth.consumerKey", apiKey)
    System.setProperty("twitter4j.oauth.consumerSecret", apiSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // Create the Twitter stream and convert each tweet to a JSON string
    val tweets = TwitterUtils
      .createStream(ssc, None)
      .map(new Gson().toJson(_))

    val numTweetsCollect = 10000L
    var numTweetsCollected = 0L

    // Save each non-empty batch as a single file
    tweets.foreachRDD((rdd, time) => {
      val count = rdd.count()
      if (count > 0) {
        val outputRDD = rdd.coalesce(1)
        // saveAsTextFile fails if the target directory already exists,
        // so every batch is written to its own subdirectory
        outputRDD.saveAsTextFile(outputDirectory + "/tweets_" + time.milliseconds.toString)
        numTweetsCollected += count
        // Stop once at least the desired number of tweets is collected
        if (numTweetsCollected >= numTweetsCollect) {
          System.exit(0)
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
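The collection logic can be traced without Spark or Twitter. Spark's saveAsTextFile refuses to write into a directory that already exists, so each batch needs its own output path, and collection stops once the running total reaches the target. The sketch below illustrates both points; the object CollectSketch and its helpers batchPath and batchesNeeded are hypothetical names introduced for this example, and the batch sizes are made up.

```scala
object CollectSketch {
  // Builds a distinct output path per batch from the batch time in milliseconds.
  def batchPath(outputDirectory: String, batchTimeMs: Long): String =
    s"$outputDirectory/tweets_$batchTimeMs"

  // Counts how many batches are consumed until at least `target` tweets
  // are collected (or the input runs out).
  def batchesNeeded(batchCounts: Seq[Long], target: Long): Int = {
    var collected = 0L
    var batches = 0
    val it = batchCounts.iterator
    while (collected < target && it.hasNext) {
      collected += it.next()
      batches += 1
    }
    batches
  }

  def main(args: Array[String]): Unit = {
    // Made-up batch sizes; the running total passes 10000 in the fourth batch.
    val counts = Seq(4000L, 0L, 3500L, 3000L, 5000L)
    println(batchesNeeded(counts, 10000L))
    println(batchPath("out", 1500L))
  }
}
```

Since a whole batch is always written before the check, the final total can exceed the target by up to one batch.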
Run your application and check the output directory; you should see the saved files there.
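To check the output programmatically, the saved files can be read back with Spark. The following is a minimal sketch and not part of the original application: the object ReadTweetsSketch and the helper tweetText are hypothetical names, and it assumes each saved line is a JSON object with a top-level text field. The input path is passed as the first argument and may be a directory or a glob pattern, depending on how the files were laid out.

```scala
import com.google.gson.JsonParser
import org.apache.spark.{SparkConf, SparkContext}

object ReadTweetsSketch {
  // Extracts the "text" field from one JSON line, if present (assumed field name).
  def tweetText(line: String): Option[String] = {
    val obj = new JsonParser().parse(line).getAsJsonObject
    if (obj.has("text")) Some(obj.get("text").getAsString) else None
  }

  def main(args: Array[String]): Unit = {
    val inputPath = args(0) // directory or glob that matches the saved part files
    val conf = new SparkConf().setAppName("read-tweets").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val lines = sc.textFile(inputPath)
    println(s"Tweets on disk: ${lines.count()}")
    // Print the text of a few tweets as a sanity check
    lines.take(5).flatMap(tweetText).foreach(println)

    sc.stop()
  }
}
```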
Troubleshooting
If you run into an exception such as:
ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
do the following:
1) Copy the downloaded winutils.exe to ANY_DIRECTORY/bin/winutils.exe
2) Set hadoop.home.dir to that directory in your application:
System.setProperty("hadoop.home.dir", "ANY_DIRECTORY");
For example, if winutils.exe is copied to C:\winutil\bin\:
System.setProperty("hadoop.home.dir", "C:\\winutil\\");
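Because this workaround only matters on Windows, the property can be set conditionally so the same code runs unchanged on other systems. This is a sketch under assumptions: the object WinutilsSketch and its helpers are hypothetical, and the directory passed in is whatever location you copied winutils.exe to.

```scala
import java.io.File

object WinutilsSketch {
  // True when the given OS name (as reported by the JVM) is a Windows system.
  def isWindows(osName: String): Boolean = osName.toLowerCase.contains("windows")

  // Sets hadoop.home.dir only on Windows and only if bin\winutils.exe exists there.
  // Returns true if the property was set.
  def configureHadoopHome(hadoopHome: String): Boolean = {
    val winutils = new File(new File(hadoopHome, "bin"), "winutils.exe")
    if (isWindows(System.getProperty("os.name", "")) && winutils.exists()) {
      System.setProperty("hadoop.home.dir", hadoopHome)
      true
    } else {
      false
    }
  }
}
```

Calling WinutilsSketch.configureHadoopHome("C:\\winutil\\") at the start of main, before the SparkContext is created, would cover the example above.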