曾深爱过的电池
3 月前 |
Am new to Hortonworks and trying to setup a Spark to kafka connectivity using scala which is given below as spark consumer.
The below given jar built it from scala IDE, and using spark-submit i tried to run the jar from ../spark2/bin path.
Help/Guide me to fix this issue.
HDP 3.1.0.0
Getting error as below -
java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history
[root@wpst01 bin]# ./spark-submit --class org.com.st.com.st.Iot_kafka_Consumer --master local[*] /usr/local/src/softwares/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar 19/05/08 18:18:15 INFO SparkContext: Running Spark version 2.3.2.3.1.0.0-78 19/05/08 18:18:15 INFO SparkContext: Submitted application: kafkalab 19/05/08 18:18:15 INFO SecurityManager: Changing view acls to: root 19/05/08 18:18:15 INFO SecurityManager: Changing modify acls to: root 19/05/08 18:18:15 INFO SecurityManager: Changing view acls groups to: 19/05/08 18:18:15 INFO SecurityManager: Changing modify acls groups to: 19/05/08 18:18:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set() 19/05/08 18:18:15 INFO Utils: Successfully started service 'sparkDriver' on port 34204. 19/05/08 18:18:15 INFO SparkEnv: Registering MapOutputTracker 19/05/08 18:18:15 INFO SparkEnv: Registering BlockManagerMaster 19/05/08 18:18:15 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 19/05/08 18:18:15 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 19/05/08 18:18:15 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-12c9c574-f248-4ec0-9517-205d073614d3 19/05/08 18:18:15 INFO MemoryStore: MemoryStore started with capacity 366.3 MB 19/05/08 18:18:15 INFO SparkEnv: Registering OutputCommitCoordinator 19/05/08 18:18:15 INFO log: Logging initialized @2147ms 19/05/08 18:18:15 INFO Server: jetty-9.3.z-SNAPSHOT, build timestamp: 2018-06-05T22:41:56+05:30, git hash: 84205aa28f11a4f31f2a3b86d1bba2cc8ab69827 19/05/08 18:18:15 INFO Server: Started @2241ms 19/05/08 18:18:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 19/05/08 18:18:15 INFO AbstractConnector: Started ServerConnector@52350abb{HTTP/1.1,[http/1.1]}{0.0.0.0:4041} 19/05/08 18:18:15 INFO Utils: Successfully started service 'SparkUI' on port 4041. 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5dda6f9{/jobs,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@287f94b1{/jobs/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@30b34287{/jobs/job,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@62f87c44{/jobs/job/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@48f5bde6{/stages,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@525d79f0{/stages/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5149f008{/stages/stage,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2ca65ce4{/stages/stage/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@327120c8{/stages/pool,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@5707c1cb{/stages/pool/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2b5cb9b2{/storage,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@35038141{/storage/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@ecf9049{/storage/rdd,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@672f11c2{/storage/rdd/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2970a5bc{/environment,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@50305a{/environment/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@72efb5c1{/executors,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@6d511b5f{/executors/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@41200e0c{/executors/threadDump,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40f33492{/executors/threadDump/json,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@4fbdc0f0{/static,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@40bffbca{/,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@2449cff7{/api,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@37d80fe7{/jobs/job/kill,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@384fc774{/stages/stage/kill,null,AVAILABLE,@Spark} 19/05/08 18:18:15 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://wpst01:4041 19/05/08 18:18:16 INFO SparkContext: Added JAR file:/usr/local/src/softwares/com.steris-0.0.1-SNAPSHOT-jar-with-dependencies.jar at spark://wpst01:34204/jars/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1557319696003 19/05/08 18:18:16 INFO Executor: Starting executor ID driver on host localhost 19/05/08 18:18:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 43644. 19/05/08 18:18:16 INFO NettyBlockTransferService: Server created on wpst01:43644 19/05/08 18:18:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 19/05/08 18:18:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, wpst01, 43644, None) 19/05/08 18:18:16 INFO BlockManagerMasterEndpoint: Registering block manager wpst01:43644 with 366.3 MB RAM, BlockManagerId(driver, wpst01, 43644, None) 19/05/08 18:18:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, wpst01, 43644, None) 19/05/08 18:18:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, wpst01, 43644, None) 19/05/08 18:18:16 INFO ContextHandler: Started o.s.j.s.ServletContextHandler@47b2e9e1{/metrics/json,null,AVAILABLE,@Spark} 19/05/08 18:18:16 INFO deprecation: No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS 19/05/08 18:18:17 ERROR SparkContext: Error initializing SparkContext. java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1587) at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1580) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1595) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:100) at org.apache.spark.SparkContext.<init>(SparkContext.scala:522) at org.com.steris.com.steris.Iot_kafka_Consumer$.main(Iot_kafka_Consumer.scala:34) at org.com.steris.com.steris.Iot_kafka_Consumer.main(Iot_kafka_Consumer.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 19/05/08 18:18:17 INFO AbstractConnector: Stopped Spark@52350abb{HTTP/1.1,[http/1.1]}{0.0.0.0:4041} 19/05/08 18:18:17 INFO SparkUI: Stopped Spark web UI at http://wpst01:4041 19/05/08 18:18:17 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 19/05/08 18:18:17 INFO MemoryStore: MemoryStore cleared 19/05/08 18:18:17 INFO BlockManager: BlockManager stopped 19/05/08 18:18:17 INFO BlockManagerMaster: BlockManagerMaster stopped 19/05/08 18:18:17 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 19/05/08 18:18:17 INFO SparkContext: Successfully stopped SparkContext Exception in thread "main" java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1587) at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1580) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1595) at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:100) at org.apache.spark.SparkContext.<init>(SparkContext.scala:522) at org.com.st.com.st.Iot_kafka_Consumer$.main(Iot_kafka_Consumer.scala:34) at org.com.st.com.st.Iot_kafka_Consumer.main(Iot_kafka_Consumer.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:904) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 19/05/08 18:18:17 INFO ShutdownHookManager: Shutdown hook called 19/05/08 18:18:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-b5f4edb6-cbcb-48d1-b741-76a83cf380d7 19/05/08 18:18:17 INFO ShutdownHookManager: Deleting directory /tmp/spark-f2c592ca-4d7f-4342-9745-01b045baae47
Spark-Submit command
./spark-submit --class org.com.st.com.st.Iot_kafka_Consumer --master local[*] /usr/local/src/softwares/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar
My Kafka Consumer scala Code
package iotloganalytics
import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.streaming.{Seconds, StreamingContext} import StreamingContext._ //import org.json4s.native.JsonFormats.parse import java.util.Properties import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe //import com.datastax.spark.connector._ //import com.datastax.bdp.spark.writer.BulkTableWriter._ //import com.datastax.spark.connector._ //import com.datastax.spark.connector.streaming._ import org.apache.spark.streaming.dstream.ConstantInputDStream import org.apache.spark.sql.functions.explode import org.apache.spark.sql.Row import org.apache.spark.sql import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; object KafkaCons { def main(args:Array[String]) val sparkConf = new SparkConf() .setAppName("IotAnalytics") .setMaster("local[*]") .set("spark.sql.crossJoin.enabled", "true") val sparkcontext = new SparkContext(sparkConf) val sqlContext = new SQLContext(sparkcontext) import sqlContext.implicits._ sparkcontext.setLogLevel("ERROR") val ssc = new StreamingContext(sparkcontext, Seconds(5)) ssc.checkpoint("file:///tmp/checkpointdir") val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "@172.25.122.140:6667", //"bootstrap.servers" -> "35.209.4.97:9092", //"bootstrap.servers" -> "localhost:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "STE", "auto.offset.reset" -> "earliest" val topics = Array("STE-DF-OR") val stream = KafkaUtils.createDirectStream[String, String]( PreferConsistent, Subscribe[String, String](topics, kafkaParams) val kafkastream = stream.map(record => (record.key, record.value)) kafkastream.print() println("Before kafka2") val inputStream = kafkastream.map(rec => rec._2); inputStream.print println("Before kafka3") inputStream.foreachRDD(rdd=> //val jsonrdd = rdd.filter(_.contains("results")) if(!rdd.isEmpty) rdd.foreach(println) val df = sqlContext.read.text("rdd") df.show ssc.start() ssc.awaitTermination()Hi @ashok.kumar,
The log is pointing to `java.io.FileNotFoundException: File does not exist: hdfs:/spark2-history`, meaning that in your spark-defaults.conf file, you have specified this directory to be your Spark Events logging dir. In this HDFS path, Spark will try to write it's event logs - not to be confused with YARN application logs, or your application logs -, and it's failing to find it.
You might want to check your spark-defaults.conf file, and point `spark.eventLog.dir` to either a valid hdfs path, or a local path where your Spark Application has access to write. For example, assuming that your client is a Linux/MacOSX machine, you can simply create a /tmp/spark-events directory, grant appropriate write access to it, and then configure spark-defaults.conf to be like:
spark.eventLog.dir=file:///tmp/spark-events
This property can also be overriden, which will be easier for quick tests, i.e.:
/spark-submit --class org.com.st.com.st.Iot_kafka_Consumer --master local[*] --conf spark.eventLog.dir="file:///tmp/spark-events" /usr/local/src/softwares/com.st-0.0.1-SNAPSHOT-jar-with-dependencies.jar
BR,
David Bompart
@dbompart - Tons of Thanks ... That Issue got resolved .... But getting new issues given below.
I have checked the spark-default.conf but couldn't find any relevance.
19/05/11 13:03:36 ERROR JobScheduler: Error running job streaming job 1557560015000 ms.2 org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://cch1wpsteris01:8020/user/root/rdd; at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:719) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:344) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:389) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:693) at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:666) at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:83) at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:75) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://cch1wpsteris01:8020/user/root/rdd; at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:719) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$15.apply(DataSource.scala:390) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:344) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:389) at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227) at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:693) at org.apache.spark.sql.DataFrameReader.text(DataFrameReader.scala:666) at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:83) at iotloganalytics.KafkaCons$$anonfun$main$1.apply(KafkaCons.scala:75) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50) at scala.util.Try$.apply(Try.scala:192) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)