etl

#etl#

已有0人关注此标签

内容分类

社区小助手

使用Apache Spark时如何处理数据库的背压?

我们使用Apache Spark每2个小时执行一次ETL。 有时,Spark在执行读/写操作时会对数据库施加很大压力。 对于Spark Streaming,我可以backpressure在kafka上看到配置。 有没有办法在批处理中处理这个问题?

社区小助手

在build.sbt中,父项目中的依赖项未反映在子模块中

我在intellij idea 2017.1.6 ide中使用SBT 1.8.0作为我的spark scala项目。我想创建一个父项目及其子项目模块。到目前为止,这是我在build.sbt中的内容: lazy val parent = Project("spark-etl-parent",file(".")).settings(name := "spark-etl-parent_1.0",scalaVersion := "2.11.1",libraryDependencies ++= Seq("org.apache.spark" %% "spark-streaming" % sparkVersion % "provided" "org.apache.spark" %% "spark-hive" % sparkVersion % "provided")) lazy val etl = Project("spark-etl-etl",file("etl")).dependsOn(parent).settings(name := "spark-etl-etl_1.0",version := "1.0",scalaVersion := "2.11.1") lazy val redshiftBasin = Project("spark-etl- redshiftBasin",file("redshiftBasin")).dependsOn(parent).settings(name := "spark-etl-redshiftBasin_1.0",version := "1.0",scalaVersion := "2.11.1" ) lazy val s3Basin = Project("spark-etl-s3Basin",file("s3Basin")).dependsOn(parent).settings(name := "spark-etl-s3Basin_1.0",version := "1.0",scalaVersion := "2.11.1")现在,我可以从父模块中的spark-streaming或spark-hive库依赖项中导入任何类,但无法在任何子模块中导入和使用它们。只有当我在任何子模块中明确地将它们指定为库依赖项时,我才能使用它们。 我正在使用Maven构建在pom.xml中寻找类似于依赖项标记的东西。如果我为每个子模块使用单独的build.sbt会有所不同吗?此外,如果我在父配置中执行.aggregate(etl),则会显示错误,因为稍后会声明etl。但是,如果我在父母之前定义etl,我无法在etl config中执行.dependsOn(parent)。

社区小助手

Spark批处理从多列DataFrame写入Kafka主题

在批处理之后,Spark ETL需要向Kafka主题写入包含多个不同列的结果DataFrame。 根据以下Spark文档https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html,写入Kafka的Dataframe在模式中应具有以下必需列: value(必需)字符串或二进制 正如我之前提到的,我有更多列的值,所以我有一个问题 - 如何将整个DataFrame行作为单个消息正确地从我的Spark应用程序发送到Kafka主题?我是否需要将所有列中的所有值连接到具有单个值列(将包含连接值)的新DataFrame中,或者还有更合适的方法来实现它?

原地

使用datax做postgresql到postgresql的数据迁移,报错连接数据库失败

很奇怪的现象是:始终说Reader的数据库是 Available jdbcUrl,然而Writer就会报错:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).]. - 具体错误信息为:java.sql.SQLException: No suitable driver found for ["jdbc:postgresql://127.0.0.1:5432/XXXXXX"]。 即使我将配置文件中的writer和reader连接信息互换,依然会报writer连接失败,reader连接正常。 分别将两个数据库作为reader 写到streamwriter 也没有问题 能print出结果 说明两个数据库本身没有问题 配置的连接信息也没有问题 那问题出在哪里呢???-----2017/10/26----今天又用mysqlwriter测试了,问题依然存在? 配置内容如下{ "job": { "content": [ { "reader": { "name": "postgresqlreader", "parameter": { "column": ["name","disp","data"], "connection": [ { "jdbcUrl": ["jdbc:postgresql://139.XXX.XXX.XX:5432/XXXXX"], "table": ["sys_lic"] } ], "password": "XXXXXXX", "username": "XXXXXX" } }, "writer": { "name": "postgresqlwriter", "parameter": { "column": ["name","disp","data"], "connection": [ { "jdbcUrl": ["jdbc:postgresql://127.0.0.1:5432/XXXXXX"], "table": ["sys_lic"] } ], "password": "XXXXXX", "username": "XXXXXXX" } } } ], "setting": { "speed": { "channel": "1" } } } } 报错信息如下2017-10-25 12:45:15.502 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null2017-10-25 12:45:15.505 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=02017-10-25 12:45:15.506 [main] INFO JobContainer - DataX jobContainer starts job.2017-10-25 12:45:15.516 [main] INFO JobContainer - Set jobId = 02017-10-25 12:45:16.049 [job-0] INFO OriginalConfPretreatmentUtil - Available jdbcUrl:jdbc:postgresql://139.XXX.XXX.XX:5432/XXXXXX.2017-10-25 12:45:16.168 [job-0] INFO OriginalConfPretreatmentUtil - table:[sys_lic] has columns:[name,disp,data].2017-10-25 12:45:16.301 [job-0] ERROR RetryUtil - Exception when calling callable, 异常Msg:Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).]. - 具体错误信息为:java.sql.SQLException: No suitable driver found for ["jdbc:postgresql://127.0.0.1:5432/XXXXXX"]com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-10], Description:[连接数据库失败. 请检查您的 账号、密码、数据库名称、IP、Port或者向 DBA 寻求帮助(注意网络环境).]. - 具体错误信息为:java.sql.SQLException: No suitable driver found for ["jdbc:postgresql://127.0.0.1:5432/XXXXXX"] at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26) ~[datax-common-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.util.RdbmsException.asConnException(RdbmsException.java:36) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.util.DBUtil.connect(DBUtil.java:394) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.util.DBUtil.connect(DBUtil.java:384) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.util.DBUtil.access$000(DBUtil.java:22) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.util.DBUtil$3.call(DBUtil.java:322) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.util.DBUtil$3.call(DBUtil.java:319) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.common.util.RetryUtil$Retry.call(RetryUtil.java:164) ~[datax-common-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.common.util.RetryUtil$Retry.doRetry(RetryUtil.java:111) ~[datax-common-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.common.util.RetryUtil.executeWithRetry(RetryUtil.java:30) [datax-common-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.util.DBUtil.getConnection(DBUtil.java:319) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.util.DBUtil.getConnection(DBUtil.java:303) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.util.JdbcConnectionFactory.getConnecttion(JdbcConnectionFactory.java:27) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.dealColumnConf(OriginalConfPretreatmentUtil.java:105) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.dealColumnConf(OriginalConfPretreatmentUtil.java:140) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.writer.util.OriginalConfPretreatmentUtil.doPretreatment(OriginalConfPretreatmentUtil.java:35) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter$Job.init(CommonRdbmsWriter.java:41) [plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.plugin.writer.postgresqlwriter.PostgresqlWriter$Job.init(PostgresqlWriter.java:33) [postgresqlwriter-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.core.job.JobContainer.initJobWriter(JobContainer.java:704) [datax-core-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.core.job.JobContainer.init(JobContainer.java:304) [datax-core-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:113) [datax-core-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.core.Engine.start(Engine.java:92) [datax-core-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.core.Engine.entry(Engine.java:171) [datax-core-0.0.1-SNAPSHOT.jar:na] at com.alibaba.datax.core.Engine.main(Engine.java:204) [datax-core-0.0.1-SNAPSHOT.jar:na]

祁同伟

大数据开发套件替代传统etl 工具,那么问题来了 MaxCompute貌似不支持delete 和update 动作,这样的话如果我数据出现问题,重跑数据 我应该用什么解决方案取做呢?

大数据开发套件替代传统etl 工具,那么问题来了 MaxCompute貌似不支持delete 和update 动作,这样的话如果我数据出现问题,重跑数据 我应该用什么解决方案取做呢?

知与谁同

1. 我们是一家金融公司,之前人少钱少,现在随着人多了,碰到了数据管理上的难点,希望有解决方案 2. 多个数据源的ETL + 多种数据类似(时间序列,复杂对象,基本面数据等)的存储(规模上T) 3. 希望在数据获取,清洗,存储,获取一整套系统上,都有所提高。

1. 我们是一家金融公司,之前人少钱少,现在随着人多了,碰到了数据管理上的难点,希望有解决方案2. 多个数据源的ETL + 多种数据类似(时间序列,复杂对象,基本面数据等)的存储(规模上T)3. 希望在数据获取,清洗,存储,获取一整套系统上,都有所提高。

卓刀

目前HybridDB支持哪些ETL工具?

目前HybridDB支持哪些ETL工具,望解答。

sjtlqy

如何把redis的数据实时的同步到hdfs或者hbase上?

如何把redis的数据实时的同步到hdfs或者hbase上?使用订阅的方式实时同步还是有其他方案?