相关文章推荐

Am new to Hortonworks and trying to setup a Spark to kafka connectivity using scala which is given below as spark consumer.

The below given jar built it from scala IDE, and using spark-submit i tried to run the jar from ../spark2/bin path.

Help/Guide me to fix this issue.

HDP 3.1.0.0


Getting error as below -


 java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history


[root@wpst01 bin]# ./spark-submit --class org.com.st.com.st.Iot_kafka_Consumer --master local[*] /usr/local/src/softwares/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar
19/05/08 18:18:15 INFO SparkContext: Running Spark version 2.3.2.3.1.0.0-78
19/05/08 18:18:15 INFO SparkContext: Submitted application: kafkalab
19/05/08 18:18:15 INFO SecurityManager: Changing view acls to: root
19/05/08 18:18:15 INFO SecurityManager: Changing modify acls to: root
19/05/08 18:18:15 INFO SecurityManager: Changing view acls groups to:
19/05/08 18:18:15 INFO SecurityManager: Changing modify acls groups to:
19/05/08 18:18:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
19/05/08 18:18:15 INFO Utils: Successfully started service 'sparkDriver' on port 34204.
19/05/08 18:18:15 INFO SparkEnv: Registering MapOutputTracker
19/05/08 18:18:15 INFO SparkEnv: Registering BlockManagerMaster
19/05/08 18:18:15 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/05/08 18:18:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/05/08 18:18:15 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-12c9c574-f248-4ec0-9517-205d073614d3
19/05/08 18:18:15 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
19/05/08 18:18:15 INFO SparkEnv: Registering OutputCommitCoordinator
19/05/08 18:18:15 INFO log: Logging initialized @2147ms
19/05/08 18:18:15 INFO Server: jetty-9.3.z-SNAPSHOT, build timestamp: 2018-06-05T22:41:56+05:30, git hash: 84205aa28f11a4f31f2a3b86d1bba2cc8ab69827
19/05/08 18:18:15 INFO Server: Started @2241ms
19/05/08 18:18:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/05/08 18:18:15 INFO AbstractConnector: Started ServerConnector@52350abb{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
19/05/08 18:18:15 INFO Utils: Successfully started service 'SparkUI' on port 4041.
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5dda6f9{/jobs,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@287f94b1{/jobs/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@30b34287{/jobs/job,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@62f87c44{/jobs/job/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@48f5bde6{/stages,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@525d79f0{/stages/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5149f008{/stages/stage,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2ca65ce4{/stages/stage/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@327120c8{/stages/pool,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5707c1cb{/stages/pool/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2b5cb9b2{/storage,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@35038141{/storage/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@ecf9049{/storage/rdd,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@672f11c2{/storage/rdd/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2970a5bc{/environment,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@50305a{/environment/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@72efb5c1{/executors,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6d511b5f{/executors/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@41200e0c{/executors/threadDump,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40f33492{/executors/threadDump/json,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@4fbdc0f0{/static,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40bffbca{/,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2449cff7{/api,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@37d80fe7{/jobs/job/kill,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@384fc774{/stages/stage/kill,null,AVAILABLE,@Spark}
19/05/08 18:18:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://wpst01:4041
19/05/08 18:18:16 INFO SparkContext: Added JAR file:/usr/local/src/softwares/com.steris-0.0.1-SNAPSHOT-jar-with-dependencies.jar at spark://wpst01:34204/jars/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1557319696003
19/05/08 18:18:16 INFO Executor: Starting executor ID driver on host localhost
19/05/08 18:18:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43644.
19/05/08 18:18:16 INFO NettyBlockTransferService: Server created on wpst01:43644
19/05/08 18:18:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/05/08 18:18:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, wpst01, 43644, None)
19/05/08 18:18:16 INFO BlockManagerMasterEndpoint: Registering block manager wpst01:43644 with 366.3 MB RAM, BlockManagerId(driver, wpst01, 43644, None)
19/05/08 18:18:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, wpst01, 43644, None)
19/05/08 18:18:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, wpst01, 43644, None)
19/05/08 18:18:16 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@47b2e9e1{/metrics/json,null,AVAILABLE,@Spark}
19/05/08 18:18:16 INFO deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
19/05/08 18:18:17 ERROR SparkContext: Error initializing SparkContext.
java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history
        at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1587)
        at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1580)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1595)
        at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:100)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:522)
        at org.com.steris.com.steris.Iot_kafka_Consumer$.main(Iot_kafka_Consumer.scala:34)
        at org.com.steris.com.steris.Iot_kafka_Consumer.main(Iot_kafka_Consumer.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/05/08 18:18:17 INFO AbstractConnector: Stopped Spark@52350abb{HTTP/1.1,[http/1.1]}{0.0.0.0:4041}
19/05/08 18:18:17 INFO SparkUI: Stopped Spark web UI at http://wpst01:4041
19/05/08 18:18:17 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/05/08 18:18:17 INFO MemoryStore: MemoryStore cleared
19/05/08 18:18:17 INFO BlockManager: BlockManager stopped
19/05/08 18:18:17 INFO BlockManagerMaster: BlockManagerMaster stopped
19/05/08 18:18:17 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/05/08 18:18:17 INFO SparkContext: Successfully stopped SparkContext
Exception in thread "main" java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history
        at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1587)
        at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1580)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1595)
        at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:100)
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:522)
        at org.com.st.com.st.Iot_kafka_Consumer$.main(Iot_kafka_Consumer.scala:34)
        at org.com.st.com.st.Iot_kafka_Consumer.main(Iot_kafka_Consumer.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
19/05/08 18:18:17 INFO ShutdownHookManager: Shutdown hook called
19/05/08 18:18:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-b5f4edb6-cbcb-48d1-b741-76a83cf380d7
19/05/08 18:18:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-f2c592ca-4d7f-4342-9745-01b045baae47


Spark-Submit command

./spark-submit --class org.com.st.com.st.Iot_kafka_Consumer --master local[*] /usr/local/src/softwares/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar

My Kafka Consumer scala Code


package iotloganalytics



import org.apache.spark.SparkConf
import  org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
//import org.json4s.native.JsonFormats.parse
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
//import com.datastax.spark.connector._
//import com.datastax.bdp.spark.writer.BulkTableWriter._
//import com.datastax.spark.connector._
//import com.datastax.spark.connector.streaming._
import org.apache.spark.streaming.dstream.ConstantInputDStream
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.Row
import org.apache.spark.sql
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
object KafkaCons {  
 def main(args:Array[String])
   val sparkConf = new SparkConf()
                         .setAppName("IotAnalytics")
                         .setMaster("local[*]")
                         .set("spark.sql.crossJoin.enabled", "true")    
    val sparkcontext = new SparkContext(sparkConf)
    val sqlContext   = new SQLContext(sparkcontext)
    import sqlContext.implicits._
    sparkcontext.setLogLevel("ERROR")
        val ssc = new StreamingContext(sparkcontext, Seconds(5))
        ssc.checkpoint("file:///tmp/checkpointdir")
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "@172.25.122.140:6667",
           //"bootstrap.servers" -> "35.209.4.97:9092",
          //"bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "STE",
          "auto.offset.reset" -> "earliest"
        val topics = Array("STE-DF-OR")
        val stream = KafkaUtils.createDirectStream[String, String](
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        val kafkastream = stream.map(record => (record.key, record.value))
        kafkastream.print()
        println("Before kafka2")
        val inputStream = kafkastream.map(rec => rec._2);
        inputStream.print
        println("Before kafka3")
        inputStream.foreachRDD(rdd=>
                  //val jsonrdd = rdd.filter(_.contains("results"))
        if(!rdd.isEmpty)
              rdd.foreach(println)
              val df = sqlContext.read.text("rdd")
              df.show
    ssc.start()
    ssc.awaitTermination()
						

Hi @ashok.kumar,


The log is pointing to `java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history`, meaning that in your spark-defaults.conf file, you have specified this directory to be your Spark Events logging dir. In this HDFS path, Spark will try to write it's event logs - not to be confused with YARN application logs, or your application logs -, and it's failing to find it.


You might want to check your spark-defaults.conf file, and point `spark.eventLog.dir` to either a valid hdfs path, or a local path where your Spark Application has access to write. For example, assuming that your client is a Linux/MacOSX machine, you can simply create a /tmp/spark-events directory, grant appropriate write access to it, and then configure spark-defaults.conf to be like:


spark.eventLog.dir=file:///tmp/spark-events


This property can also be overriden, which will be easier for quick tests, i.e.:

/spark-submit --class org.com.st.com.st.Iot_kafka_Consumer --master local[*] --conf spark.eventLog.dir="file:///tmp/spark-events" /usr/local/src/softwares/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar


BR,

David Bompart

@dbompart - Tons of Thanks ... That Issue got resolved .... But getting new issues given below.

I have checked the spark-default.conf but couldn't find any relevance.


19/05/11 13:03:36 ERROR JobScheduler: Error running job streaming job 1557560015000 ms.2
org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://cch1wpsteris01:8020/user/root/rdd;
        at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:719)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:344)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:389)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
        at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:693)
        at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:666)
        at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:83)
        at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:75)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://cch1wpsteris01:8020/user/root/rdd;
        at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:719)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:344)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:389)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
        at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:693)
        at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:666)
        at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:83)
        at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:75)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            
            
            
            
            Apache Hadoop
             and associated open source project names are trademarks of the 
              Apache Software Foundation. For a complete list of trademarks, click here.
 
推荐文章