开发者社区> 问答> 正文

Spark:从 2.1.0 升级到 2.2.1 后,DataFrame 的 action 操作变得非常慢

我刚刚把 Spark 从 2.1.0 升级到 2.2.1。有没有人遇到过 dataframe.filter(…).collect() 极其缓慢的情况?具体来说,是在 collect 之前带有 filter 的操作。单独执行 dataframe.collect 似乎运行正常,但 dataframe.filter(…).collect() 却一直无法完成。该 DataFrame 只包含 2 条记录,而且是在单元测试中运行的。当我退回到 Spark 2.1.0 时,速度就恢复正常了。

我看过线程转储,找不到明显的原因。我已经努力确保我使用的所有库也使用Spark 2.2.1。

它似乎卡在了下面这个堆栈跟踪中:

scala.collection.mutable.FlatHashTable$class.addEntry(FlatHashTable.scala:151)
scala.collection.mutable.HashSet.addEntry(HashSet.scala:40)
scala.collection.mutable.FlatHashTable$class.addElem(FlatHashTable.scala:142)
scala.collection.mutable.HashSet.addElem(HashSet.scala:40)
scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:59)
scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:40)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59)
scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
scala.collection.mutable.AbstractSet.$plus$plus$eq(Set.scala:46)
scala.collection.mutable.HashSet.clone(HashSet.scala:83)
scala.collection.mutable.HashSet.clone(HashSet.scala:40)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:65)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:50)
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:316)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:972)
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
scala.collection.TraversableOnce$class.$div$colon(TraversableOnce.scala:151)
scala.collection.AbstractTraversable.$div$colon(Traversable.scala:104)
scala.collection.SetLike$class.$plus$plus(SetLike.scala:141)
org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus$plus(ExpressionSet.scala:50)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$getAliasedConstraints$1.apply(LogicalPlan.scala:323)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode$$anonfun$getAliasedConstraints$1.apply(LogicalPlan.scala:320)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.spark.sql.catalyst.plans.logical.UnaryNode.getAliasedConstraints(LogicalPlan.scala:320)
org.apache.spark.sql.catalyst.plans.logical.Project.validConstraints(basicLogicalOperators.scala:65)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints$lzycompute(QueryPlan.scala:188) => holding Monitor(org.apache.spark.sql.catalyst.plans.logical.Aggregate@1129881457})
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints(QueryPlan.scala:188)
org.apache.spark.sql.catalyst.plans.logical.Aggregate.validConstraints(basicLogicalOperators.scala:555)
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints$lzycompute(QueryPlan.scala:188) => holding Monitor(org.apache.spark.sql.catalyst.plans.logical.Aggregate@1129881457})
org.apache.spark.sql.catalyst.plans.QueryPlan.constraints(QueryPlan.scala:188)
org.apache.spark.sql.catalyst.plans.QueryPlan.getConstraints(QueryPlan.scala:196)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16$$anonfun$25.apply(Optimizer.scala:717)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16$$anonfun$25.apply(Optimizer.scala:716)
scala.collection.TraversableLike$$anonfun$partition$1.apply(TraversableLike.scala:314)
scala.collection.TraversableLike$$anonfun$partition$1.apply(TraversableLike.scala:314)
scala.collection.immutable.List.foreach(List.scala:392)
scala.collection.TraversableLike$class.partition(TraversableLike.scala:314)
scala.collection.AbstractTraversable.partition(Traversable.scala:104)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16.applyOrElse(Optimizer.scala:716)
org.apache.spark.sql.catalyst.optimizer.PruneFilters$$anonfun$apply$16.applyOrElse(Optimizer.scala:705)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
org.apache.spark.sql.catalyst.optimizer.PruneFilters.apply(Optimizer.scala:705)
org.apache.spark.sql.catalyst.optimizer.PruneFilters.apply(Optimizer.scala:704)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
scala.collection.immutable.List.foldLeft(List.scala:84)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
scala.collection.immutable.List.foreach(List.scala:392)
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1193326176})
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1193326176})
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89) => holding Monitor(org.apache.spark.sql.execution.QueryExecution@1193326176})
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:2837)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2387)

展开
收起
社区小助手 2019-01-02 14:53:19 2563 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    问题已解决。原因出在 spark.sql.constraintPropagation.enabled 这个属性上:它在 Spark 2.2.1 中的默认值为 true。堆栈跟踪表明程序卡在了查询计划的生成过程中。

    简而言之:将上述属性设置为 false,例如: spark.conf.set(SQLConf.CONSTRAINT_PROPAGATION_ENABLED.key, false)

    2019-07-17 23:24:24
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Hybrid Cloud and Apache Spark 立即下载
Scalable Deep Learning on Spark 立即下载
Comparison of Spark SQL with Hive 立即下载