
How do I execute a Hive query inside sparkContext.parallelize(...).map()?

I am unable to execute the code below. It tries to run a Hive query against a Hive table using a SparkSession created inside SparkContext's runJob() method.

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.sql.{DataFrame, SparkSession}

val lines = sparkSession.sparkContext.parallelize(Seq("hello world"), 1)

// Run a job over the RDD; inside each task, try to build a Hive-enabled
// SparkSession and query the Hive table "employee".
sparkSession.sparkContext.runJob(lines, (t: TaskContext, it: Iterator[String]) => {

  val conf = new SparkConf().setAppName("Testing")
  val session = SparkSession.builder().master("local[*]").config(conf).enableHiveSupport().getOrCreate()

  val df: DataFrame = session.sql("select count(*) from employee")
  println("Task attempt id => " + t.taskAttemptId() + " and count: " + df.first)

})

I am able to get the result locally in IntelliJ. However, when I run the JAR in "yarn cluster" mode, I get the exception below:

18/12/17 12:18:27 ERROR Executor: Exception in task 0.3 in stage 0.0 (TID 3)
java.io.StreamCorruptedException: invalid type code: 00
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:298)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.util.NoSuchElementException: key not found: 0
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:206)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:147)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:198)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
    at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:84)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:84)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
    at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:91)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$.prepareShuffleDependency(ShuffleExchange.scala:261)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:84)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 38 more
Caused by: java.util.NoSuchElementException: key not found: 0
    at scala.collection.MapLike$class.default(MapLike.scala:228)
    at scala.collection.AbstractMap.default(Map.scala:59)
    at scala.collection.mutable.HashMap.apply(HashMap.scala:65)
    at org.apache.spark.storage.BlockInfoManager.lockForReading(BlockInfoManager.scala:196)
    at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:450)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:210)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269)

How can I fix this?

社区小助手 2018-12-19 17:08:05
1 answer
  • 社区小助手 is the administrator of the Spark China community. I regularly post live-stream recaps and other useful articles, and I also compile the Spark questions and answers raised in the DingTalk group.

    This is technically impossible; Spark SQL / Spark Core does not support it.

    A Hive query in Spark SQL is translated into RDDs, and an RDD is by itself only a description of a distributed computation.

    SparkContext.runJob, on the other hand, triggers that distributed computation as a Spark job (which is itself a set of tasks).

    See the difference? The former is a description, the latter is an execution. They are different things and do not work together; the Hive query has to run on the driver, as in the sketch below.
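    A minimal sketch of that workaround, assuming a Hive table named employee as in the question: the query runs once on the driver, where the Hive-enabled SparkSession lives, and only the materialized count is captured by the task closure.

    import org.apache.spark.TaskContext

    // Driver side: the Hive query is planned and executed here, and its result
    // is materialized into a plain Long before any task is launched.
    val employeeCount: Long = sparkSession.sql("select count(*) from employee").first().getLong(0)

    val lines = sparkSession.sparkContext.parallelize(Seq("hello world"), 1)

    // Task side: the closure captures only the already-computed Long;
    // no SparkSession, DataFrame, or SQL query is created inside the tasks.
    sparkSession.sparkContext.runJob(lines, (t: TaskContext, it: Iterator[String]) => {
      println("Task attempt id => " + t.taskAttemptId() + " and count: " + employeeCount)
    })

    For larger results, the same pattern works with sparkSession.sparkContext.broadcast(...) around the collected rows instead of capturing them directly in the closure.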

    2019-07-17 23:23:04