流数据

#流数据#

已有0人关注此标签

内容分类

社区小助手

Spark 【问答合集】

如何使用spark将kafka主题中的writeStream数据写入hdfs?https://yq.aliyun.com/ask/493211当Spark在S3上读取大数据集时,在“停机时间”期间发生了什么?https://yq.aliyun.com/ask/493212从Redshift读入Spark Dataframe(Spark-Redshift模块)https://yq.aliyun.com/ask/493215在初始化spark上下文后,在运行时更改pyspark的hadoop配置中的aws凭据https://yq.aliyun.com/ask/493217Window.rowsBetween - 仅考虑满足特定条件的行(例如,不为null)https://yq.aliyun.com/ask/493220spark的RDD内容直接用saveAsTextFile保存到hdfs时会出现中文乱码现象,但在控制台用foreach打印该RDD数据显示是正常的,该怎么解决呢?https://yq.aliyun.com/ask/494418请问一下如何能查看spark struct streaming内存使用情况呢?https://yq.aliyun.com/ask/494417使用spark 2.3 structed streaming 时 checkpoint 频繁在HDFS写小文件,块数到达百万级别 ,这个怎么优化下?https://yq.aliyun.com/ask/494415请教大家一个问题,spark stream连kafka,在web页面的stream标签,显示好多batch处于queued状态,这些batch是已经把数据从kafka读取进rdd,等待处理,还是还没有从kafka读取数进rdd?https://yq.aliyun.com/ask/493702为什么我使用 dropDuplicates()函数报错Caused by: java.lang.NoSuchMethodError: org.codehaus.commons.compiler.Location.(Ljava/lang/String;II)V ?https://yq.aliyun.com/ask/493700请教一下,我hive中数据大小为16g,通过importtsv生成了hfile 文件,导入到hbase中了,数据变成130多g,还有什么更好的办法吗?https://yq.aliyun.com/ask/493698jdbc 连接spark thrift server 如何获取日志?https://yq.aliyun.com/ask/493582Spark如何从一行中仅提取Json数据?https://yq.aliyun.com/ask/493581pyspark - 在json流数据中找到max和min usign createDataFramehttps://yq.aliyun.com/ask/493234如何计算和获取Spark Dataframe中唯一ID的值总和?https://yq.aliyun.com/ask/493231如何将csv目录加载到hdfs作为parquet?https://yq.aliyun.com/ask/493224无法使用Spark在Datastax上初始化图形https://yq.aliyun.com/ask/493222使用PySpark计算每个窗口的用户数https://yq.aliyun.com/ask/493221sql语句不支持delete操作,如果我想执行delete操作该怎么办?https://yq.aliyun.com/ask/494420spark streaming 和 kafka ,打成jar包后((相关第三方依赖也在里面)),放到集群上总是报StringDecoder 找不到classhttps://yq.aliyun.com/ask/494421json字符串中有重名但大小写不同的key,使用play.api.libs.json.Json.parse解析json没有报错,但是spark-sql使用org.openx.data.jsonserde.JsonSerDe时,会自动将key转为小写,然后putOnce函数报错Duplicate keyhttps://yq.aliyun.com/ask/494423spark DataFrame写入HDFS怎么压缩?https://yq.aliyun.com/ask/495552使用Spark On Hive时,动态的将数据插入到Hive中,但是在Hive的数据表下会有很多文件,这个可以怎么设置一下呢?https://yq.aliyun.com/ask/495927 技术交流群 Apache Spark中国技术交流群 (钉钉扫码加入)

游客886

请问下大家,PG数据源目前支持kafka吗?或者有没有其他流数据源支持?

请问下大家,PG数据源目前支持kafka吗?或者有没有其他流数据源支持?本问题来自云栖社区【PostgreSQL技术进阶社群】。https://yq.aliyun.com/articles/690084 点击链接欢迎加入社区大社群。

赵慧

flink的时间窗口怎么写?

转自钉钉群21789141:flink的时间窗口怎么写.window(TumblingEventTimeWindows.of(Time.seconds(5)))算子直接这样写就行了么?里面的多道流数据处理写在哪啊?

社区小助手

pyspark - 在json流数据中找到max和min usign createDataFrame

我有一组由Kafka流式传输的json消息,每个消息都描述一个网站用户。使用pyspark,我需要计算每个国家/地区每个流媒体窗口的用户数,并返回具有最大和最小用户数的国家/地区。 以下是流式json消息的示例: {"id":1,"first_name":"Barthel","last_name":"Kittel","email":"bkittel0@printfriendly.com","gender":"Male","ip_address":"130.187.82.195","date":"06/05/2018","country":"France"}这是我的代码: from pyspark.sql.types import StructField, StructType, StringTypefrom pyspark.sql import Rowfrom pyspark import SparkContextfrom pyspark.sql import SQLContext fields = ['id', 'first_name', 'last_name', 'email', 'gender', 'ip_address', 'date', 'country']schema = StructType([ StructField(field, StringType(), True) for field in fields]) def parse(s, fields): try: d = json.loads(s[0]) return [tuple(d.get(field) for field in fields)] except: return [] array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema) rdd = sc.parallelize(array_of_users) group by country and then substitute the list of messages for each country by its length, resulting into a rdd of (country, length) tuplescountry_count = rdd.groupBy(lambda user: user['country']).mapValues(len) identify the min and max using as comparison key the second element of the (country, length) tuplecountry_min = country_count.min(key = lambda grp: grp[1])country_max = country_count.max(key = lambda grp: grp[1])当我运行它时,我收到消息 AttributeError Traceback (most recent call last) in () 16 return [] 17 ---> 18 array_of_users = parsed.SQLContext.createDataFrame(parsed.flatMap(lambda s: parse(s, fields)), schema) 19 20 rdd = sc.parallelize(array_of_users) AttributeError: 'TransformedDStream' object has no attribute 'SQLContext'我怎样才能解决这个问题?

社区小助手

生成“假”流数据。

我尝试生成流数据,以模拟我收到两个值的情况,Integer类型,在不同的时间范围内,带有时间戳,Kafka作为连接器。 我使用Flink环境作为消费者,但我不知道哪个是生产者的最佳解决方案。(如果可能,Java语法优于Scala) 我应该直接从Kafka生成数据吗?如果是,那么最好的方法是什么?或者如果我从Flink作为制作人生成数据,将其发送给Kafka并在Flink最后再次使用它可能会更好?我怎么能从flink那里做到这一点?或许还有另一种简单的方法来生成流数据并将其传递给Kafka。

flink小助手

Apche Flink - 活动时间

我想为Apache flink中的事件创建一个事件时钟。我是按照以下方式做的public class TimeStampAssigner implements AssignerWithPeriodicWatermarks> { private final long maxOutOfOrderness = 0; // 3.5 private long currentMaxTimestamp; @Override public long extractTimestamp(Tuple2<String, String> element, long previousElementTimestamp) { currentMaxTimestamp = new Date().getTime(); return currentMaxTimestamp; } @Override public Watermark getCurrentWatermark() { return new Watermark(currentMaxTimestamp - maxOutOfOrderness); } }请检查上面的代码,并告诉我是否正确执行。在事件时间和水印分配之后,我想要处理流程中的流,其中我将为不同的密钥收集流数据10分钟。

flink小助手

Flink批处理接收器

我试图以流式和批量方式使用flink,将大量数据添加到Accumulo(每分钟几百万)。我想在将记录发送到Accumulo之前批量记录。我从目录或通过kafka摄取数据,使用flatmap转换数据,然后传递给RichSinkFunction,RichSinkFunction将数据添加到集合中。 使用流数据,批处理似乎没问题,因为我可以将记录添加到固定大小的集合中,一旦达到批处理阈值就会将其发送到accumulo。但是对于有限的批处理数据,我很难找到一种好的批处理方法,因为如果在指定时间内没有其他数据需要刷新超时。似乎没有与弹性搜索或其他替代接收器不同的Accumulo连接器。 我想过使用带有触发器的批处理大小和时间间隔的过程函数,但这需要一个键控窗口。我不想沿着关键路线走下去,因为数据看起来非常偏斜,因为有些钥匙会有一吨记录而有些钥匙会很少。如果我不使用窗口方法,那么我理解操作符不会是并行的。我希望分批处理,所以每个接收器只关心数字或时间间隔。

flink小助手

如何在时间窗口内获取最新值

这就是我的流数据: time id group 1 a1 b1 2 a1 b2 3 a1 b3 4 a2 b3 在我们的窗口中考虑上面的所有示例 我的用例获取最新的独特ID。 我需要输出如下: time id group 3 a1 b3 4 a2 b3 我怎样才能在Flink中实现这一目标? 我知道窗口功能WindowFunction。但是,我无法绕过这样做。 我试过这只是为了获得不同的ID。如何将此功能扩展到我的用例? class DistinctGrid extends WindowFunction[UserMessage, String, Tuple, TimeWindow] { override def apply(key: Tuple, window: TimeWindow, input: Iterable[UserMessage], out: Collector[String]): Unit = { val distinctGeo = input.map(_.id).toSet for (i <- distinctGeo) { out.collect(i) } }}

flink小助手

Flink 1.6 bucketing sink HDFS文件卡在.in-progress中

我正在将Kafka数据流写入HDFS路径中的bucketing sink。卡夫卡发出了字符串数据。使用FlinkKafkaConsumer010从Kafka使用 -rw-r--r-- 3 ubuntu supergroup 4097694 2018-10-19 19:16 /streaming/2018-10-19--19/_part-0-1.in-progress-rw-r--r-- 3 ubuntu supergroup 3890083 2018-10-19 19:16 /streaming/2018-10-19--19/_part-1-1.in-progress-rw-r--r-- 3 ubuntu supergroup 3910767 2018-10-19 19:16 /streaming/2018-10-19--19/_part-2-1.in-progress-rw-r--r-- 3 ubuntu supergroup 4053052 2018-10-19 19:16 /streaming/2018-10-19--19/_part-3-1.in-progress只有当我使用一些映射函数动态操作流数据时才会发生这种情况。如果我直接将流写入HDFS,它的工作正常。知道为什么会这样吗?我使用的是Flink 1.6.1,Hadoop 3.1.1和Oracle JDK1.8

社区小助手

如何缓存spark streaming Dataset <Row>

我有一个sparkDataset,它流式传输csv文件的目录。所以我有这些问题: 如何缓存流数据集。如何在YARN中提交我的spark流媒体作业,我的流媒体作业应该永远运行,直到用户手动中断。

社区小助手

Spark Streaming Kafka Stream批处理执行

我目前正在实现一个从Kafka主题流式传输数据的应用程序。 是否常常使用应用程序仅运行一次批处理,例如,当天结束,收集主题中的所有数据,进行一些聚合和转换等等? 这意味着在使用spark-submit启动应用程序后,所有这些内容将在一个批处理中执行,然后应用程序将关闭。或者是spark流构建用于连续批量运行无限和永久流数据?

李博 bluemind

阿里云实时计算 Flink适用场景?

阿里云实时计算 Flink提供类标准的Flink SQL语义协助用您完成流式计算逻辑的处理。同时,受限于SQL代码功能无法满足某些特定场景的业务需求,阿里云实时计算 Flink为部分授信用户提供全功能的UDF函数,帮助授信用户完成业务定制化的数据处理逻辑。在流数据分析领域您可以直接使用Flink SQL+UDF即可完成大部分流式数据分析处理逻辑。

李博 bluemind

实时计算 Flink是什么?

转自实时计算:实时计算 Flink是什么?目前对信息高时效性、可操作性的需求不断增长,这要求软件系统在更少的时间内能处理更多的数据。传统的大数据处理模型将在线事务处理和离线分析从时序上将两者完全分割开来,但显然该架构目前已经越来越落后于人们对于大数据实时处理的需求。 实时计算的产生即来源于对于上述数据加工时效性的严苛需求: 数据的业务价值随着时间的流失而迅速降低,因此在数据发生后必须尽快对其进行计算和处理。而传统的大数据处理模式对于数据加工均遵循传统日清日毕模式,即以小时甚至以天为计算周期对当前数据进行累计并处理,显然这类处理方式无法满足数据实时计算的需求。在诸如实时大数据分析、风控预警、实时预测、金融交易等诸多业务场景领域,批量(或者说离线)处理对于上述对于数据处理时延要求苛刻的应用领域而言是完全无法胜任其业务需求的。而实时计算作为一类针对流数据的实时计算模型,可有效地缩短全链路数据流时延、实时化计算逻辑、平摊计算成本,最终有效满足实时处理大数据的业务需求。

xuzhengjun

如何利用内存技术构建大数据服务平台?

随着大数据、云计算等技术的应用和普及,创新的企业用大数据解决方案的开发成为很多开发者与创业者的关注方向。传统的数据库面临着挑战:效率下降、数据集成代价大、无法处理多样性的数据等等。 传统搭建大数据分析服务的方式非常繁琐,不仅需要大数据模型的开发,还需要选择合适的硬件,灵活高效的软件,费时费力费钱。如果有一个方便快捷的开发平台,直接上手,免费使用该有多好? 2009年,SAP HANA做了一个非常大胆的决定,就是以堆高大上硬件的方式,把几乎所有的数据都放在内存里。小伙伴们,想想内存的速度和硬盘能是一个量级的么?一个在Oracle或者DB2上需要一天才能完成的报表,换上HANA之后只需要3秒钟!!对,只需要3秒钟。不仅仅是速度,SAP HANA同时也具备了比较灵活的资源配置和高效的资源利用。 作为一款具有革命性意义的内存计算平台,HANA整合了符合原子性、一致性、隔离性、持久性(ACID)标准的数据库,高级数据处理功能,应用服务和灵活的数据集成服务。不局限于数据库,开发者可以在该平台上快速构建企业服务方案,目标企业可以快速部署使用。SAP HANA 平台适用于企业目前使用的所有应用,包括原有软件、第三方软件和SAP软件。SAP HANA非常欢迎开发者在该平台上构建更加创新卓越的企业服务。 辣么,说了这么多,我们的问题来了: 大家现在使用什么软件平台构建自己大数据服务? 与SAP HANA Plantform相比,优势和劣势分别是什么? 在使用SAP HANA Express时,大家有遇到什么困难吗? 大家对于SAP HANA Express有哪些满意或者期许的地方? 今后您希望看到哪些SAP HANA Express的信息或者文章? 未来是否会考虑用HANA Express+阿里云构建大数据服务?期望得到怎样的体验? 什么? SAP HANA有免费版本?!SAP HANA Express Edition是SAP HANA Platform的简化版本,适用于在笔记本电脑和云托管虚拟机等资源受限的环境中运行。精简版提供了一系列丰富的功能包括Streaming(流分析)、Fiori(产品前端UI开发框架)、增强的数据整合等,帮助你构建具备实时海量数据处理功能的应用。SAP HANA Express免费下载使用功能组件如下: 什么是SAP HANA? 什么是SAP HANA Express Edition? 获得更多中文SAP HANA Express下载使用方法,请关注SAP公众号:https://yq.aliyun.com/teams/154想要在阿里云栖大会得到SAP专家手把手教您SAP HANA Express的使用方法?欢迎扫码二维码提前注册,到达SAP展区,告知工作人员注册手机号即可参与SAP创新实验室活动。此外,我们将从扫码注册的童鞋中随机抽取2名,分别送出小米VR眼镜一副。