
Changing the Metastore URI in Spark

At my job we have different environments (development, pre-production, and production), and each environment keeps certain tables in its own Hive Metastore. My user has permission to access and query all of these metastores directly, but I would like to access them from a spark-shell session through sqlContext (or HiveContext).

For example, when I ssh into the pre-production environment and start a spark-shell session, a sqlContext variable is created automatically and I can use it to run queries against the Preproduction Metastore.

I can also run queries against the Production Metastore from the Preproduction environment using beeline, so I tried changing some of the Hive configuration (as in "How to connect to Hive Metastore programmatically in SparkSQL?"). I changed the following properties:

hive.metastore.uris and hive.server2.authentication.kerberos.principal, setting them to their counterparts in the production environment.

Code in spark-shell:

System.setProperty("hive.server2.authentication.kerberos.principal","hive/URL@URL2")

System.setProperty("hive.metastore.uris","thrift://URLTOPRODUCTION:9083")
import org.apache.spark.sql.hive.HiveContext
val hive=new HiveContext(sc)
val df=hive.sql("SELECT * FROM DB.Table limit 10")

But when I execute the last statement of the previous code block, I get the following error.

java.lang.IllegalArgumentException: java.net.UnknownHostException: nameservice1
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:406)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:762)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:693)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:449)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$1.apply(interfaces.scala:447)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:108)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489)
at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487)
at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:398)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:202)
at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636)
at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:39)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:504)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$$anonfun$12.apply(HiveMetastoreCatalog.scala:503)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.hive.HiveMetastoreCatalog.org$apache$spark$sql$hive$HiveMetastoreCatalog$$convertToParquetRelation(HiveMetastoreCatalog.scala:503)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:565)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:545)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:335)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:334)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:332)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:281)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:321)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:332)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:545)
at org.apache.spark.sql.hive.HiveMetastoreCatalog$ParquetConversions$.apply(HiveMetastoreCatalog.scala:539)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:83)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:80)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:72)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:72)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:37)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:35)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:829)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
at $iwC$$iwC$$iwC.<init>(<console>:43)
at $iwC$$iwC.<init>(<console>:45)
at $iwC.<init>(<console>:47)
at <init>(<console>:49)
at .<init>(<console>:53)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
at org.apache.spark.repl.Main$.main(Main.scala:35)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.UnknownHostException: nameservice1
... 141 more

I am using the Cloudera distribution of Spark 1.6.0 with Scala 2.10.5.
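
For completeness, the same two properties can also be set on the HiveContext itself (or passed when launching spark-shell) instead of through System.setProperty. The following is only a sketch that reuses the placeholder values from above; I have not verified that an already-running session can be re-pointed this way:

import org.apache.spark.sql.hive.HiveContext

// Sketch only: set the metastore properties on the HiveContext itself,
// before touching any table. URLs and principal are the placeholders used above.
val hiveProd = new HiveContext(sc)
hiveProd.setConf("hive.metastore.uris", "thrift://URLTOPRODUCTION:9083")
hiveProd.setConf("hive.server2.authentication.kerberos.principal", "hive/URL@URL2")
val dfProd = hiveProd.sql("SELECT * FROM DB.Table LIMIT 10")

// Alternatively (also unverified here), the values can be supplied at launch time:
// spark-shell --conf spark.hadoop.hive.metastore.uris=thrift://URLTOPRODUCTION:9083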

社区小助手 2018-12-21 13:56:01
1 Answer

    In the end, after I looked at the configuration of the sqlContext variable that spark-shell creates automatically on the server, I saw that it contains many URLs and configuration variables for HDFS and other servers where I have no permissions, yet I still needed to run queries against the PROD Metastore.

    I knew that querying the PROD Metastore worked from beeline, and that I could query that Metastore via JDBC, so I took the JDBC URL that beeline uses for that connection.

    Then I used this JDBC URL and plain Java (called from Scala) methods and utilities to connect to the DB via JDBC:

    /* We will need hive-jdbc-0.14.0.jar to connect to a Hive metastore via JDBC */
    import java.sql.ResultSetMetaData
    import java.sql.{DriverManager, Connection, Statement, ResultSet}
    import org.apache.spark.sql.types.StringType
    import org.apache.spark.sql.types.StructField
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.Row

    /* In the following lines I connect to the Prod Metastore via JDBC and execute the query
       as if I were connecting to a simple DB. Notice that, using this method, you are not
       using distributed computing. */
    val url = "jdbc:hive2://URL_TO_PROD_METASTORE/BD;CREDENTIALS OR URL TO KERBEROS"
    val query = "SELECT * FROM BD.TABLE LIMIT 100"
    val driver = "org.apache.hive.jdbc.HiveDriver"
    Class.forName(driver).newInstance
    val conn: Connection = DriverManager.getConnection(url)
    val r: ResultSet = conn.createStatement.executeQuery(query)
    val list = scala.collection.mutable.MutableList[Row]()

    /* Now we want to get the values of all the columns. Notice that I create a Row object
       for each row of the results and then add each Row to a MutableList. */
    while (r.next()) {
      val value: Array[String] = new Array[String](r.getMetaData.getColumnCount)
      for (i <- 1 to r.getMetaData.getColumnCount) {
        value(i - 1) = r.getString(i)
      }
      list += Row.fromSeq(value)
    }

    /* Now we have the results of the query against the PROD metastore and we want to turn
       them into a DataFrame, so we create a StructType with the column names to pair with
       the list of Rows built above. */
    val array: Array[StructField] = new Array[StructField](r.getMetaData.getColumnCount)
    for (i <- 1 to r.getMetaData.getColumnCount) {
      array(i - 1) = StructField(r.getMetaData.getColumnName(i), StringType)
    }
    val struct = StructType(array)
    val rdd = sc.parallelize(list)
    val df = sqlContext.createDataFrame(rdd, struct)
    r.close
    conn.close
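
    As a quick check that the round trip worked, the df built above can be inspected with ordinary DataFrame calls:

    // Inspect the data fetched over JDBC; every column was read back as a string.
    df.printSchema()
    df.show(10)
    println(s"Rows fetched from the PROD Metastore via JDBC: ${df.count()}")
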
    Note that this question is related to another answer of mine, because the best way to export the results of a Hive query to CSV is to use Spark (see "How to export a Hive table as a CSV file?"). I wanted to query the Prod Metastore from a Spark session on the PRE server.
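
    For the CSV export mentioned above, one option is the spark-csv data source. This is only a sketch and assumes the com.databricks:spark-csv package is available in this Spark 1.6 session; the output path is a placeholder:

    // e.g. start the shell with: spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
    df.write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save("/tmp/prod_table_export_csv") // placeholder output path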

    2019-07-17 23:23:26