stream

#stream#

已有1人关注此标签

内容分类

社区小助手

Spark 【问答合集】

如何使用spark将kafka主题中的writeStream数据写入hdfs?https://yq.aliyun.com/ask/493211当Spark在S3上读取大数据集时,在“停机时间”期间发生了什么?https://yq.aliyun.com/ask/493212从Redshift读入Spark Dataframe(Spark-Redshift模块)https://yq.aliyun.com/ask/493215在初始化spark上下文后,在运行时更改pyspark的hadoop配置中的aws凭据https://yq.aliyun.com/ask/493217Window.rowsBetween - 仅考虑满足特定条件的行(例如,不为null)https://yq.aliyun.com/ask/493220spark的RDD内容直接用saveAsTextFile保存到hdfs时会出现中文乱码现象,但在控制台用foreach打印该RDD数据显示是正常的,该怎么解决呢?https://yq.aliyun.com/ask/494418请问一下如何能查看spark struct streaming内存使用情况呢?https://yq.aliyun.com/ask/494417使用spark 2.3 structed streaming 时 checkpoint 频繁在HDFS写小文件,块数到达百万级别 ,这个怎么优化下?https://yq.aliyun.com/ask/494415请教大家一个问题,spark stream连kafka,在web页面的stream标签,显示好多batch处于queued状态,这些batch是已经把数据从kafka读取进rdd,等待处理,还是还没有从kafka读取数进rdd?https://yq.aliyun.com/ask/493702为什么我使用 dropDuplicates()函数报错Caused by: java.lang.NoSuchMethodError: org.codehaus.commons.compiler.Location.(Ljava/lang/String;II)V ?https://yq.aliyun.com/ask/493700请教一下,我hive中数据大小为16g,通过importtsv生成了hfile 文件,导入到hbase中了,数据变成130多g,还有什么更好的办法吗?https://yq.aliyun.com/ask/493698jdbc 连接spark thrift server 如何获取日志?https://yq.aliyun.com/ask/493582Spark如何从一行中仅提取Json数据?https://yq.aliyun.com/ask/493581pyspark - 在json流数据中找到max和min usign createDataFramehttps://yq.aliyun.com/ask/493234如何计算和获取Spark Dataframe中唯一ID的值总和?https://yq.aliyun.com/ask/493231如何将csv目录加载到hdfs作为parquet?https://yq.aliyun.com/ask/493224无法使用Spark在Datastax上初始化图形https://yq.aliyun.com/ask/493222使用PySpark计算每个窗口的用户数https://yq.aliyun.com/ask/493221sql语句不支持delete操作,如果我想执行delete操作该怎么办?https://yq.aliyun.com/ask/494420spark streaming 和 kafka ,打成jar包后((相关第三方依赖也在里面)),放到集群上总是报StringDecoder 找不到classhttps://yq.aliyun.com/ask/494421json字符串中有重名但大小写不同的key,使用play.api.libs.json.Json.parse解析json没有报错,但是spark-sql使用org.openx.data.jsonserde.JsonSerDe时,会自动将key转为小写,然后putOnce函数报错Duplicate keyhttps://yq.aliyun.com/ask/494423spark DataFrame写入HDFS怎么压缩?https://yq.aliyun.com/ask/495552使用Spark On Hive时,动态的将数据插入到Hive中,但是在Hive的数据表下会有很多文件,这个可以怎么设置一下呢?https://yq.aliyun.com/ask/495927 技术交流群 Apache Spark中国技术交流群 (钉钉扫码加入)

李博 bluemind

stream如何实现分组后,多个字段分别求和?

比如user对象,我用性别分组,怎么即算出年龄总和,又算出身高总和 本问题来自云栖社区【阿里Java技术进阶2群】。https://yq.aliyun.com/articles/690084 点击链接欢迎加入社区大社群。

dataworks团队

互联网大数据时代的实时计算需求,你打算用什么方式来应对?

现在互联网大数据背景下,由于数据得到广泛应用之后,数据的价值随着时间的流逝而降低,所以事件出现后必须尽快对它们进行处理,最好事件出现时便立刻对其进行处理,发生一个事件进行一次处理,而不是缓存起来成一批处理。实时计算引擎近些也随之崛起,比如业界广泛使用的就是Flink引擎,同时这就要求开发人员在计算引擎上建立和开发对应的实时计算任务。 传统实时任务开发痛点 无论是SQL还是Java、python,开发任务必须写代码,而且代码形式不利于后期业务逻辑分析和维护 开发过程中无法调试,只能等全部任务完成,才能上线调试,有问题修改后在上线,反复验证 UDF开发需要单独打包编译上传,跟任务本身属于分割开发状态,容易造成流程繁琐,版本混淆 运维门槛非常高,如果不是对实时计算引擎了解精深,很难做到有效运维、排查问题 为了解决上述问题,DataWorks重磅推出的全新实时计算产品——Stream Studio,是个一站式流计算开发平台,基于阿里巴巴实时计算引擎Flink构建。同时在易用性上大大降低了开发者的使用门槛。 StreamStudio集DAG和SQL两种开发模式,支持DAG与SQL互相转换,通过可视化拖拽就可以轻松实现流计算作业开发,适用于实时ETL、实时报表、实时大屏、监控预警以及各类实时在线系统等应用场景。 >>5月15日 15:00 DataWorks Stream Studio发布会 直播间入口>>更多了解Stream Studio 话题1.之前你对大数据实时计算有了解吗?知道哪些应用场景?2.是否开发过实时任务,有的话,是用于什么场景的?用过什么工具?3.对于大数据实时计算的平台,你有什么好的建议吗?

云攻略小攻

千万QPS的缓存!Redis 5.0最新的Stream数据结构,你还在等什么?

讲到Redis大家都很熟悉,Redis是意大利西西里岛帅哥Salvatore Sanfilippo的开源作品~,2009年诞生,走到2019年已经十周年了~Redis最大的特点就是快,访问延时在1ms内,同时单实例性能就能达到10w+ QPS,中国互联网典型的场景就是用户多、高并发,所以各大互联网公司都对Redis有深度的使用。作者最新推出的Redis 5.0增加了Stream的数据结构, 它是一个新的强大的支持多播的可持久化的消息队列。本次阿里云将推出全新版本的Redis 5.0,在提供社区功能的基础上提供了更多的企业级的特性~ (1)审计日志:提供操作日志、敏感日志、慢日志的审计能力~对于重要的数据变更能做到可以追溯(2)大key分析能力: Redis使用过程中经常因为业务使用不当引起大key的问题,为此阿里云Redis在服务端提供了大Key的分析能力,当前分析能力覆盖了Redis 4.0、5.0的版本~,可以为客户分析后端数据的内存使用情况。同时在本次发布会我们还将对阿里云Redis的性能增强版本和混合云解决方案进行介绍~对于性能增强版本我们通过多线程突破了开源Redis单线程的瓶颈,整体的QPS能力能达到社区的300%,对于混合云的解决方案客户可以通过redis-shake搭建同步通道,通过redis-full-check搭建数据比对通道,客户可以在业务上灵活配置访问流量,保证企业客户的上云流程~云数据库Redis 5.0重磅发布会,直播入口 支持全新Stream 数据类型,给您不一样的缓存体验,快速查看! 你优秀,你来说:1、您在什么样的业务场景下会使用Redis?一般是单独使用还是组合其他数据库共同使用?2、您对于Redis Stream的应用场景有什么希望了解的,对于消息队列的选型有什么历史经验?3、您希望阿里云Redis提供哪些功能帮助您解决日常使用问题?

李博 bluemind

Stream函数式操作流元素是否比for性能高?

Stream函数式操作流元素是否比for性能高?

游客irm3nny572smk

post请求返回302是为什么

使用一句话识别RESTful API以POST方式请求返回的是 HTTP/1.1 302 FoundServer: Tengine Date: Thu, 28 Mar 2019 02:52:56 GMTContent-Type: text/htmlContent-Length: 258Connection: closeLocation: http://err.taobao.com/error1.htmlVia: tongyijieru-aliyun-com1119311638.em14[web,302] 请求报名:POST /stream/v1/asr?appkey=SY*VZvAc&format=pcm&sample_rate=8000&enable_punctuation_prediction=true&enable_inverse_text_normalization=true HTTP/1.1X-NLS-Token: 0112c68**7002b6cContent-Type: application/octet-streamcache-control: no-cache Accept: /Host: nls-gateway.cn-shanghai.aliyuncs.comaccept-encoding: gzip, deflatecontent-length: 43408Connection: keep-alive

游客irm3nny572smk

post请求返回302是什么问题

302 Found The requested resource resides temporarily under a different URI. Powered by Tengine

社区小助手

Spark Streaming Kafka Stream批处理执行

Spark Streaming Kafka Stream批处理执行

krystal.xu

Flink 如何支持动态sql ?

flnk 新手一枚。。我理解flink stream = 动态table, 但是sql里的 where语句条件能否使用stream/source的值。 example: DataStream<String> input = env.socketTextStream("localhost", 9000, "\n"); DataStream clickStream = input.flatMap(new ClickHbaseMapFunc()); tEnv.registerDataStream("clickTable", clickStream, "id, studentid, url, itemtype, quantity"); Table result = tEnv.sqlQuery("SELECT * FROM clickTable WHERE studentid = '"+ ????+"'" ); 当中的 where语句条件 如何 能 从 input stream中 获取 ?

社区小助手

请教大家一个问题,spark stream连kafka,在web页面的stream标签,显示好多batch处于queued状态,这些batch是已经把数据从kafka读取进rdd,等待处理,还是还没有从kafka读取数进rdd?

请教大家一个问题,spark stream连kafka,在web页面的stream标签,显示好多batch处于queued状态,这些batch是已经把数据从kafka读取进rdd,等待处理,还是还没有从kafka读取数进rdd?

stevenchennet

flink如何计算设备离网

场景描述设备工作期间每隔固定周期(比如30s)就往kafka发送采集到的数据,当设备超过15分钟没有上报数据,则认为设备离网了。如何每隔10分钟计算出设备在该10分钟时间段内的离网时长? 按我的理解(可能有错,请多指正),stream不设置window的时候会使用全量窗口,当新数据来的时候,只是将全量窗口的该设备的最新上报时间更新,每隔10分钟获取globalwindow数据,然后filter最新上报时间到现在时间的差值大于15分钟即可,但是我没有找到flink里如何获取globalwindow的数据。 btw:如果是spark,是可以做的,filter>15min,然后output的model设置为complete即可获取globalwindow的数据。请问flink下该方案该如何做?

乌野

RichAllWindowFunction的open方法在什么时候被触发

如题 还有, 自定义的RichAllWindowFunction应该怎样使用才能触发open方法. var filteredstream = inputStream .flatMap(buildHbaseMsg(_)) .filter(b => b.actType match { case 0|1|2|3 => true case _ => false }) .countWindowAll(2L).apply(new CustomWindowFunction ())' 这样吗? 尝试这样调用, open并没有执行'stream.apply((window, input, out: Collector[Unit]) => new CustomWindowFunction ().apply(window, input, out))'

晓生寒

[@小川游鱼][¥20]kafka的消费者stream iterator block,读不出消息?

rt,能够向kafka produce数据,kafka里也看得到,但是consumer却读不出数据,offset是0,程序在stream的iterator被阻塞了。demo代码如下: import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties; import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import kafka.serializer.StringDecoder;import kafka.utils.VerifiableProperties; /** Created by Administrator on 2015/10/10.*/ public class KafkaConsumer { private final ConsumerConnector consumer; private KafkaConsumer() { Properties props = new Properties(); props.put("auto.offset.reset", "smallest"); //zookeeper 配置 props.put("zookeeper.connect", "Master:2181"); //group 代表一个消费组 props.put("group.id", "consumer3"); // 连接zk的session超时时间 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间 props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率 props.put("auto.offset.reset", "smallest");//如果offset出了返回,则 smallest: 自动设置reset到最小的offset. largest : 自动设置offset到最大的offset. 其它值不允许,会抛出异常 //序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("consumertest", new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get("consumertest").get(0); ConsumerIterator<String, String> it = stream.iterator(); System.out.println("consumer starting..."); while (it.hasNext()) System.out.println(it.next().message()); System.out.println("consumer over"); } public static void main(String[] args) { new KafkaConsumer().consume(); } } 谢谢~

flink小助手

Apache Flink:使用filter()或split()来分割流?

我有一个来自Kafka的DataStream,它对MyModel中的一个字段有两个可能的值。MyModel是一个pojo,其中包含来自Kafka的消息解析的特定于域的字段。 DataStream stream = env.addSource(myKafkaConsumer);我想分别在每个键a1,a2上应用窗口和运算符。将它们分开的好方法是什么?我有2个选项过滤器并选择,但不知道哪个更快。 过滤方式 stream .filter(<MyModel.a == a1>) .keyBy() .window() .apply() .addSink() stream .filter(<MyModel.a == a2>) .keyBy() .window() .apply() .addSink() 拆分并选择方法 SplitStream split = stream.split(…) split .select(<MyModel.a == a1>) … .addSink() split .select<MyModel.a == a2>() … .addSink() 如果split和select更好,如果我想根据MyModel中字段的值进行拆分,如何实现它们?

社区小助手

在Spark Stream中保存PairRdd时出错[重复]

我试图在spark流中保存我的Pair Rdd但在最后一步保存时出错。 这是我的示例代码 def main(args: Array[String]) { val inputPath = args(0) val output = args(1) val noOfHashPartitioner = args(2).toInt println("IN Streaming ") val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]") val sc = new SparkContext(conf) val hadoopConf = sc.hadoopConfiguration; //hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") val ssc = new org.apache.spark.streaming.StreamingContext(sc, Seconds(60)) val input = ssc.textFileStream(inputPath) val pairedRDD = input.map(row => { val split = row.split("\\|") val fileName = split(0) val fileContent = split(1) (fileName, fileContent) }) import org.apache.hadoop.io.NullWritable import org.apache.spark.HashPartitioner import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } //print(pairedRDD) pairedRDD.partitionBy(new HashPartitioner(noOfHashPartitioner)).saveAsHadoopFile(output, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat], classOf[GzipCodec]) ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate }我在保存时已经到了最后一步。一定在这里遗漏了什么,得到的错误就像 value partitionBy不是org.apache.spark.streaming.dstream.DStream [(String,String)]的数据

社区小助手

在追加模式下激活水印和窗口

下面结构化的流媒体代码水印和窗口数据,24小时间隔,15分钟幻灯片。代码在附加模式下仅生成空的批处理0。在更新模式下,结果会正确显示。需要附加模式,因为S3接收器仅在附加模式下工作。 String windowDuration = "24 hours";String slideDuration = "15 minutes";Dataset sliding24h = rowData .withWatermark(eventTimeCol, slideDuration) .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration), col(nameCol)).count(); sliding24h .writeStream() .format("console") .option("truncate", false) .option("numRows", 1000) .outputMode(OutputMode.Append()) //.outputMode(OutputMode.Complete()) .start() .awaitTermination(); 以下是完整的测试代码: public static void main(String [] args) throws StreamingQueryException { SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate(); ArrayList<String> rl = new ArrayList<>(); for (int i = 0; i < 200; ++i) { long t = 1512164314L + i * 5 * 60; rl.add(t + ",qwer"); } String nameCol = "name"; String eventTimeCol = "eventTime"; String eventTimestampCol = "eventTimestamp"; MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING()); input.addData(JavaConversions.asScalaBuffer(rl).toSeq()); Dataset<Row> stream = input.toDF().selectExpr( "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol, "cast(split(value,'[,]')[1] as String) as " + nameCol); System.out.println("isStreaming: " + stream.isStreaming()); Column eventTime = functions.to_timestamp(col(eventTimestampCol)); Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime); String windowDuration = "24 hours"; String slideDuration = "15 minutes"; Dataset<Row> sliding24h = rowData .withWatermark(eventTimeCol, slideDuration) .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration), col(nameCol)).count(); sliding24h .writeStream() .format("console") .option("truncate", false) .option("numRows", 1000) .outputMode(OutputMode.Append()) //.outputMode(OutputMode.Complete()) .start() .awaitTermination(); }

flink小助手

使用AvroParquetWriter将flink接收到parquet文件不会将数据写入文件

我正在尝试使用AvroParquetWriter将parquet文件写为接收器。文件已创建但长度为0(未写入数据)。无法清楚问题在哪里? import io.eels.component.parquet.ParquetWriterConfigimport org.apache.avro.Schemaimport org.apache.avro.generic.{GenericData, GenericRecord}import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.hadoop.fs.Pathimport org.apache.parquet.avro.AvroParquetWriterimport org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}import org.apache.parquet.hadoop.metadata.CompressionCodecName import scala.io.Sourceimport org.apache.flink.streaming.api.scala._ object Tester extends App {val env = StreamExecutionEnvironment.getExecutionEnvironment def now = System.currentTimeMillis() val path = new Path(s"/tmp/test-$now.parquet") val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString val schema: Schema = new Schema.Parser().parse(schemaString) val compressionCodecName = CompressionCodecName.SNAPPY val config = ParquetWriterConfig() val genericReocrd: GenericRecord = new GenericData.Record(schema) genericReocrd.put("name", "test_b") genericReocrd.put("code", "NoError") genericReocrd.put("ts", 100L) val stream = env.fromElements(genericReocrd) val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builderGenericRecord .withSchema(schema) .withCompressionCodec(compressionCodecName) .withPageSize(config.pageSize) .withRowGroupSize(config.blockSize) .withDictionaryEncoding(config.enableDictionary) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withValidation(config.validating) .build() writer.write(genericReocrd) stream.addSink{r => writer.write(r) } env.execute()

社区小助手

如何使用writeFileAsText(path)写入文件时传递动态路径?

假设我有一个包含类型元素的Stream String。我想将流中的每个元素写入某个文件夹中的单独文件。我正在使用以下设置。 stream.writeAsText(path).setParallelism(1);如何使这条路径动态化?我甚至尝试添加System.nanotime()路径以使其动态化。但它似乎仍然不起作用,所有内容都写入一个文件。

社区小助手

Flink窗口:聚合和输出到接收器

我们有一个数据流,其中每个元素都是这种类型: id: Stringtype: Typeamount: Integer我们希望聚合此流并输出amount每周一次的总和。 当前解决方案 示例flink管道将如下所示: stream.keyBy(type) .window(TumblingProcessingTimeWindows.of(Time.days(7))) .reduce(sumAmount()) .addSink(someOutput()) 用于输入 | id | type | amount || 1 | CAT | 10 || 2 | DOG | 20 || 3 | CAT | 5 || 4 | DOG | 15 || 5 | DOG | 50 |如果窗口在记录3和4我们的输出之间结束将是: | TYPE | sumAmount || CAT | 15 | (id 1 and id 3 added together)| DOG | 20 | (only id 2 as been 'summed')标识4和5仍然是弗林克管道内,下周将被输出。 因此,下周我们的总产量将是: | TYPE | sumAmount || CAT | 15 | (of last week)| DOG | 20 | (of last week)| DOG | 65 | (id 4 and id 5 added together)新要求: 我们现在还想知道每条记录在哪一周处理了每条记录。换句话说,我们的新输出应该是: | TYPE | sumAmount | weekNumber || CAT | 15 | 1 || DOG | 20 | 1 || DOG | 65 | 2 |但我们还想要一个像这样的额外输出: | id | weekNumber || 1 | 1 || 2 | 1 || 3 | 1 || 4 | 2 || 5 | 2 |怎么办呢? flink有没有办法实现这个目标?我会想象我们会有一个汇总函数的汇总函数,但也会输出每个记录和当前周数,但我没有找到在文档中执行此操作的方法。 (注意:我们每周处理大约1亿条记录,所以理想情况下我们只希望在一周内将聚合保持在flink状态,而不是所有单个记录) 编辑: 我去了Anton描述的解决方案: DataStream elements = stream.keyBy(type) .process(myKeyedProcessFunction()); elements.addSink(outputElements());elements.getSideOutput(outputTag) .addSink(outputAggregates()) 而KeyedProcessFunction看起来像: class MyKeyedProcessFunction extends KeyedProcessFunction private ValueState<ZonedDateTime> state; private ValueState<Integer> sum; public void processElement(Element e, Context c, Collector<Element> out) { if (state.value() == null) { state.update(ZonedDateTime.now()); sum.update(0); c.timerService().registerProcessingTimeTimer(nowPlus7Days); } element.addAggregationId(state.value()); sum.update(sum.value() + element.getAmount()); } public void onTimer(long timestamp, OnTimerContext c, Collector<Element> out) { state.update(null); c.output(outputTag, sum.value()); } }

flink小助手

Flink中复杂的优化(多输入)集成测试

我需要为flink流式优化编写单元测试。它基本上是一个CoFlatMapFunction,它有2个输入。 我尝试从这个页面获得一些灵感:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html 输入的顺序对我的优化很重要,所以当我测试时,我不能StreamExecutionEnvironment#fromCollection用于每个输入,因为我不会控制在每个输入中注入数据点的顺序。 我试图创建一个单独的输入使用StreamExecutionEnvironment#fromCollection并CoFlatMapFunction根据其类型将每个元素分配给我的实际输入,但是在此操作中元素的顺序会丢失。 还有其他方法可以编写此测试吗?