Spark On MaxCompute如何访问Phonix数据

简介: 如何使用Spark On MaxCompute连接Phonix,将Hbase的数据写入到MaxCompute的对应表中,目前没有对应的案例,为了满足用户的需求。本文主要讲解使用Spark连接Phonix访问Hbase的数据再写入到MaxCompute方案实践。该方案的验证是使用hbase1.1对应Phonix为4.12.0。本文从阿里云Hbase版本的选择、确认VPC、vswitchID、设置白名单和访问方式,Phonix4.12.0的客户端安装,在客户端实现Phonix表的创建和写入,Spark代码在本地IDEA的编写以及pom文件以及vpcList的配置,打包上传jar包并进行冒烟测试。

一、购买Hbase1.1并设置对应资源

1.1购买hbase

hbase主要版本为2.0与1.1,这边选择对应hbase对应的版本为1.1
Hbase与Hbase2.0版本的区别
HBase1.1版本
1.1版本基于HBase社区1.1.2版本开发。
HBase2.0版本
2.0版本是基于社区2018年发布的HBase2.0.0版本开发的全新版本。同样,在此基础上,做了大量的改进和优化,吸收了众多阿里内部成功经验,比社区HBase版本具有更好的稳定性和性能。
1600152836102-e55b53ac-380f-468f-8abb-d858bd02f6d8.png

1.2确认VPC,vsWitchID

确保测试联通性的可以方便可行,该hbase的VPCId,vsWitchID尽量与购买的独享集成资源组的为一致的,独享集成资源的文档可以参考https://help.aliyun.com/document_detail/137838.html
1600152882272-d5f5849e-b0b8-4485-bcb3-c74fe4ff3169.png

1.3设置hbase白名单,其中DataWorks白名单如下,个人ECS也可添加

image.png
根据文档链接选择对应的DataWorks的region下的白名单进行添加https://help.aliyun.com/document_detail/137792.html
1600153040149-a1350bfb-febc-43cd-bd8b-5c1a7b0a2e03.png

1.4查看hbase对应的版本和访问地址

打开数据库链接的按钮,可以查看到Hbase的主版本以及Hbase的专有网络访问地址,以及是否开通公网访问的方式进行连接。

1600152748791-1a0c3c5d-80a8-4296-9210-90d459d80480.png

二、安装Phonix客户端,并创建表和插入数据

2.1安装客户端

根据hbase的版本为1.1选择Phonix的版本为4.12.0根据文档https://help.aliyun.com/document_detail/53600.html 下载对应的客户端文件ali-phoenix-4.12.0-AliHBase-1.1-0.9.tar.gz
登陆客户端执行命令

./bin/sqlline.py 172.16.0.13,172.16.0.15,172.16.0.12:2181

1600077441347-fde10ae4-91d0-4ca3-99d5-5fd89a75be6b.png
创建表:

CREATE TABLE IF NOT EXISTS users_phonix
(
    id       INT   ,
    username STRING,
    password STRING
) ;

插入数据:

UPSERT INTO users (id, username, password) VALUES (1, 'admin', 'Letmein');

2.2查看是否创建和插入成功

在客户端执行命令,查看当前表与数据是否上传成功

select * from users;

1600078146386-cbadedf0-702a-4e3b-9ff1-2176cb85bfcd.png

三、编写对应代码逻辑

3.1编写代码逻辑

在IDEA按照对应得Pom文件进行配置本地得开发环境,将代码涉及到得配置信息填写完整,进行编写测试,这里可以先使用Hbase得公网访问链接进行测试,代码逻辑验证成功后可调整配置参数,具体代码如下


package com.git.phonix
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.phoenix.spark._
/**
  * 本实例适用于Phoenix 4.x版本
  */
object SparkOnPhoenix4xSparkSession {
  def main(args: Array[String]): Unit = {
    //HBase集群的ZK链接地址。
    //格式为:xxx-002.hbase.rds.aliyuncs.com,xxx-001.hbase.rds.aliyuncs.com,xxx-003.hbase.rds.aliyuncs.com:2181
    val zkAddress = args(0)
    //Phoenix侧的表名,需要在Phoenix侧提前创建。Phoenix表创建可以参考:https://help.aliyun.com/document_detail/53716.html?spm=a2c4g.11186623.4.2.4e961ff0lRqHUW
    val phoenixTableName = args(1)
    //Spark侧的表名。
    val ODPSTableName = args(2)
    val sparkSession = SparkSession
      .builder()
      .appName("SparkSQL-on-MaxCompute")
      .config("spark.sql.broadcastTimeout", 20 * 60)
      .config("spark.sql.crossJoin.enabled", true)
      .config("odps.exec.dynamic.partition.mode", "nonstrict")
      //.config("spark.master", "local[4]") // 需设置spark.master为local[N]才能直接运行,N为并发数
      .config("spark.hadoop.odps.project.name", "***")
      .config("spark.hadoop.odps.access.id", "***")
      .config("spark.hadoop.odps.access.key", "***")
      //.config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.end.point", "http://service.cn-beijing.maxcompute.aliyun-inc.com/api")
      .config("spark.sql.catalogImplementation", "odps")
      .getOrCreate()
    //第一种插入方式
    var df = sparkSession.read.format("org.apache.phoenix.spark").option("table", phoenixTableName).option("zkUrl",zkAddress).load()
    df.show()
    df.write.mode("overwrite").insertInto(ODPSTableName)
  }
}

3.2对应Pom文件

pom文件中分为Spark依赖,与ali-phoenix-spark相关的依赖,由于涉及到ODPS的jar包,会在集群中引起jar冲突,所以要将ODPS的包排除掉

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <properties>
        <spark.version>2.3.0</spark.version>
        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <phoenix.version>4.12.0-HBase-1.1</phoenix.version>
    </properties>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>Spark-Phonix</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-model</artifactId>
            <version>1.3.8</version>
        </dependency>
        <dependency>
            <groupId>org.jpmml</groupId>
            <artifactId>pmml-evaluator</artifactId>
            <version>1.3.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scalap</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>cupid-sdk</artifactId>
            <version>${cupid.sdk.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-core</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-mapred</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.aliyun.odps</groupId>
                    <artifactId>odps-sdk-commons</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.aliyun.phoenix</groupId>
            <artifactId>ali-phoenix-spark</artifactId>
            <version>4.12.0-AliHBase-1.1-0.8</version>
            <exclusions>
                <exclusion>
                    <groupId>com.aliyun.phoenix</groupId>
                    <artifactId>ali-phoenix-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you
                                        want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>**/log4j.properties</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

四、打包上传到DataWorks进行冒烟测试

4.1创建要传入的MaxCompute表

CREATE TABLE IF NOT EXISTS users_phonix
(
    id       INT   ,
    username STRING,
    password STRING
) ;

4.2打包上传到MaxCompute

在IDEA打包要打成shaded包,将所有的依赖包,打入jar包中,由于DatadWork界面方式上传jar包有50M的限制,因此采用MaxCompute客户端进行jar包
1600073394094-678da37c-b504-432b-a430-a022d302bfec.png

4.3选择对应的project环境,查看上传资源,并点击添加到数据开发

进入DataWorks界面选择左侧资源图标,选择对应的环境位开发换进,输入删除文件时的文件名称进行搜索,列表中展示该资源已经上传成,点击提交到数据开发

1600073680606-96fb9057-f274-4afa-a98c-b2c4d94c2c33.png点击提交按钮

1600073719127-33ab00de-0a81-4b3c-a76f-24cadc8e0176.png

4.4配置对应的vpcList参数并提交任务测试

其中的配置vpcList文件的配置信息如下,可具体根据个人hbase的链接,进行配置

{
    "regionId":"cn-beijing",
    "vpcs":[
        {
            "vpcId":"vpc-2ze7cqx2bqodp9ri1vvvk",
            "zones":[
                {
                    "urls":[
                        {
                            "domain":"172.16.0.12",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.13",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.15",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.14",
                            "port":2181
                        },
                        {
                            "domain":"172.16.0.12",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.13",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.15",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.14",
                            "port":16000
                        },
                        {
                            "domain":"172.16.0.12",
                            "port":16020
                        },
                        {
                            "domain":"172.16.0.13",
                            "port":16020
                        },
                        {
                            "domain":"172.16.0.15",
                            "port":16020
                        },
                        {
                            "domain":"172.16.0.14",
                            "port":16020
                        }
                    ]
                }
            ]
        }
    ]
}

Spark任务提交任务的配置参数,主类,以及对应的参数
该参数主要为3个参数第一个为Phonix的链接,第二个为Phonix的表名称,第三个为传入的MaxCompute表
name.png

点击冒烟测试按钮,可以看到任务执行成功
1600071547065-f1b1ff1e-8460-4bfc-9e42-7dc1e30f9d0f.png
在临时查询节点中执行查询语句,可以得到数据已经写入MaxCompute的表中
1600756783637-33baf55a-e314-4370-83f8-a97180679bfa.png

总结:

使用Spark on MaxCompute访问Phonix的数据,并将数据写入到MaxCompute的表中经过实践,该方案时可行的。但在实践的时有几点注意事项:
1.结合实际使用情况选择对应的Hbase以及Phonix版本,对应的版本一致,并且所使用的客户端,以及代码依赖都会有所改变。
2.使用公网在IEAD进行本地测试,要注意Hbase白名单,不仅要设置DataWorks的白名单,还需将自己本地的地址加入到白名单中。
3.代码打包时需要将pom中的依赖关系进行梳理,避免ODPS所存在的包在对应的依赖中,进而引起jar包冲突,并且打包时打成shaded包,避免缺失遗漏对应的依赖。

欢迎加入“MaxCompute开发者社区2群”,点击链接申请加入或扫描二维码
https://h5.dingtalk.com/invite-page/index.html?bizSource=____source____&corpId=dingb682fb31ec15e09f35c2f4657eb6378f&inviterUid=E3F28CD2308408A8&encodeDeptId=0054DC2B53AFE745
image.png

相关实践学习
简单用户画像分析
本场景主要介绍基于海量日志数据进行简单用户画像分析为背景,如何通过使用DataWorks完成数据采集 、加工数据、配置数据质量监控和数据可视化展现等任务。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
2月前
|
分布式计算 DataWorks IDE
MaxCompute数据问题之忽略脏数据如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
47 0
|
2月前
|
SQL 存储 分布式计算
MaxCompute问题之下载数据如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
38 0
|
2月前
|
分布式计算 关系型数据库 MySQL
MaxCompute问题之数据归属分区如何解决
MaxCompute数据包含存储在MaxCompute服务中的表、分区以及其他数据结构;本合集将提供MaxCompute数据的管理和优化指南,以及数据操作中的常见问题和解决策略。
35 0
|
13天前
|
分布式计算 Hadoop 大数据
大数据技术与Python:结合Spark和Hadoop进行分布式计算
【4月更文挑战第12天】本文介绍了大数据技术及其4V特性,阐述了Hadoop和Spark在大数据处理中的作用。Hadoop提供分布式文件系统和MapReduce,Spark则为内存计算提供快速处理能力。通过Python结合Spark和Hadoop,可在分布式环境中进行数据处理和分析。文章详细讲解了如何配置Python环境、安装Spark和Hadoop,以及使用Python编写和提交代码到集群进行计算。掌握这些技能有助于应对大数据挑战。
|
1天前
|
分布式计算 大数据 数据处理
[AIGC大数据基础] Spark 入门
[AIGC大数据基础] Spark 入门
|
10天前
|
数据采集 搜索推荐 大数据
大数据中的人为数据
【4月更文挑战第11天】人为数据,源于人类活动,如在线行为和社交互动,是大数据的关键部分,用于理解人类行为、预测趋势和策略制定。数据具多样性、实时性和动态性,广泛应用于市场营销和社交媒体分析。然而,数据真实性、用户隐私和处理复杂性构成挑战。解决策略包括数据质量控制、采用先进技术、强化数据安全和培养专业人才,以充分发挥其潜力。
14 3
|
13天前
|
运维 供应链 大数据
数据之势丨从“看数”到“用数”,百年制造企业用大数据实现“降本增效”
目前,松下中国旗下的64家法人公司已经有21家加入了新的IT架构中,为松下集团在中国及东北亚地区节约了超过30%的总成本,减少了近50%的交付时间,同时,大幅降低了系统的故障率。
|
1月前
|
SQL 分布式计算 Java
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
Spark学习---SparkSQL(概述、编程、数据的加载和保存、自定义UDFA、项目实战)
98 1
|
1月前
|
分布式计算 DataWorks 关系型数据库
DataWorks报错问题之dataworks同步rds数据到maxcompute时报错如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
1月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
107 2

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute