package com.bigdata.spark.sparksql

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming._

object twitterdata {
  // Streams live tweets matching a set of track keywords via the Apache Bahir
  // Twitter receiver and, for every micro-batch, prints a small DataFrame of
  // (createdUser, geolocation, text, createddate) to the console.
  //
  // References:
  // https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-twitter_2.11/2.3.0
  // https://developer.twitter.com/en/apps
  // http://twitter4j.org/javadoc/twitter4j/Status.html
  // https://mvnrepository.com/artifact/org.twitter4j/twitter4j-stream/4.0.4
  // https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core/4.0.4
  def main(args: Array[String]): Unit = {
    // The receiver lives in the Bahir spark-streaming-twitter artifact; the
    // original code referenced TwitterUtils without importing it.
    import org.apache.spark.streaming.twitter.TwitterUtils

    val spark = SparkSession.builder.master("local[2]").appName("twitter_stream").getOrCreate()

    // Micro-batch interval in seconds (was declared but unused in the original;
    // the literal 10 was hard-coded into Seconds(...) instead).
    val intervalSeconds = 10
    val ssc = new StreamingContext(spark.sparkContext, Seconds(intervalSeconds))

    // SECURITY: OAuth credentials must never be committed to source control.
    // Load them from environment variables or a secrets store instead; the
    // literals are kept here only to preserve this sample's behavior, and the
    // exposed keys should be revoked and rotated.
    val apiKey            = "VhsiY76RM3KBvPkBENvUrucUD"
    val apiSecretKey      = "YO1olvRkfLa34fjiSB34egt0RWyIwlaBVAWSnKhbpZdboiylFi"
    val accessToken       = "181460431-OoGqcsZFAHlgNPR0zZUIEH9KVXkhydKS6YZYW5Sl"
    val accessTokenSecret = "pjlCSkdkH9KNiElMhcqnxZtl0epkgWvP4yGLhmyUvN38z"

    // twitter4j picks up its OAuth credentials from JVM system properties.
    System.setProperty("twitter4j.oauth.consumerKey", apiKey)
    System.setProperty("twitter4j.oauth.consumerSecret", apiSecretKey)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // Track keywords. Each element of the Seq is a separate "track" phrase for
    // the Twitter streaming API, so split the comma-separated list rather than
    // passing one comma-joined string as a single phrase.
    val searchFilter = "ysrcp,tdp,janasena"
    val filters = searchFilter.split(",").map(_.trim).toSeq

    // DStream of twitter4j.Status values matching the filters.
    val tweetStream = TwitterUtils.createStream(ssc, None, filters)

    tweetStream.foreachRDD { statuses =>
      // Re-acquire the session on the driver for this batch and bring the
      // toDF conversion into scope (spark.implicits._ was missing originally).
      val batchSpark = SparkSession.builder.config(statuses.sparkContext.getConf).getOrCreate()
      import batchSpark.implicits._

      val df1 = statuses
        .map { s =>
          (s.getUser().getName().toUpperCase(),
           // getGeoLocation is null for most tweets; default latitude to 0.0.
           Option(s.getGeoLocation).map(_.getLatitude).getOrElse(0.0),
           s.getText(),
           s.getCreatedAt().getTime)
        }
        .toDF("createdUser", "geolocation", "text", "createddate")

      df1.printSchema()
      // truncate = false so long tweet text is shown in full.
      df1.show(8, false)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Twitter data analysis with Spark Streaming

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.