stream

#stream#

已有1人关注此标签

内容分类

社区小助手

Spark 【问答合集】

如何使用spark将kafka主题中的writeStream数据写入hdfs?https://yq.aliyun.com/ask/493211当Spark在S3上读取大数据集时,在“停机时间”期间发生了什么?https://yq.aliyun.com/ask/493212从Redshift读入Spark Dataframe(Spark-Redshift模块)https://yq.aliyun.com/ask/493215在初始化spark上下文后,在运行时更改pyspark的hadoop配置中的aws凭据https://yq.aliyun.com/ask/493217Window.rowsBetween - 仅考虑满足特定条件的行(例如,不为null)https://yq.aliyun.com/ask/493220spark的RDD内容直接用saveAsTextFile保存到hdfs时会出现中文乱码现象,但在控制台用foreach打印该RDD数据显示是正常的,该怎么解决呢?https://yq.aliyun.com/ask/494418请问一下如何能查看spark struct streaming内存使用情况呢?https://yq.aliyun.com/ask/494417使用spark 2.3 structed streaming 时 checkpoint 频繁在HDFS写小文件,块数到达百万级别 ,这个怎么优化下?https://yq.aliyun.com/ask/494415请教大家一个问题,spark stream连kafka,在web页面的stream标签,显示好多batch处于queued状态,这些batch是已经把数据从kafka读取进rdd,等待处理,还是还没有从kafka读取数进rdd?https://yq.aliyun.com/ask/493702为什么我使用 dropDuplicates()函数报错Caused by: java.lang.NoSuchMethodError: org.codehaus.commons.compiler.Location.(Ljava/lang/String;II)V ?https://yq.aliyun.com/ask/493700请教一下,我hive中数据大小为16g,通过importtsv生成了hfile 文件,导入到hbase中了,数据变成130多g,还有什么更好的办法吗?https://yq.aliyun.com/ask/493698jdbc 连接spark thrift server 如何获取日志?https://yq.aliyun.com/ask/493582Spark如何从一行中仅提取Json数据?https://yq.aliyun.com/ask/493581pyspark - 在json流数据中找到max和min usign createDataFramehttps://yq.aliyun.com/ask/493234如何计算和获取Spark Dataframe中唯一ID的值总和?https://yq.aliyun.com/ask/493231如何将csv目录加载到hdfs作为parquet?https://yq.aliyun.com/ask/493224无法使用Spark在Datastax上初始化图形https://yq.aliyun.com/ask/493222使用PySpark计算每个窗口的用户数https://yq.aliyun.com/ask/493221sql语句不支持delete操作,如果我想执行delete操作该怎么办?https://yq.aliyun.com/ask/494420spark streaming 和 kafka ,打成jar包后((相关第三方依赖也在里面)),放到集群上总是报StringDecoder 找不到classhttps://yq.aliyun.com/ask/494421json字符串中有重名但大小写不同的key,使用play.api.libs.json.Json.parse解析json没有报错,但是spark-sql使用org.openx.data.jsonserde.JsonSerDe时,会自动将key转为小写,然后putOnce函数报错Duplicate keyhttps://yq.aliyun.com/ask/494423spark DataFrame写入HDFS怎么压缩?https://yq.aliyun.com/ask/495552使用Spark On Hive时,动态的将数据插入到Hive中,但是在Hive的数据表下会有很多文件,这个可以怎么设置一下呢?https://yq.aliyun.com/ask/495927 技术交流群 Apache Spark中国技术交流群 (钉钉扫码加入)

李博 bluemind

stream如何实现分组后,多个字段分别求和?

比如user对象,我用性别分组,怎么即算出年龄总和,又算出身高总和 本问题来自云栖社区【阿里Java技术进阶2群】。https://yq.aliyun.com/articles/690084 点击链接欢迎加入社区大社群。

云攻略小攻

千万QPS的缓存!Redis 5.0最新的Stream数据结构,你还在等什么?

讲到Redis大家都很熟悉,Redis是意大利西西里岛帅哥Salvatore Sanfilippo的开源作品~,2009年诞生,走到2019年已经十周年了~Redis最大的特点就是快,访问延时在1ms内,同时单实例性能就能达到10w+ QPS,中国互联网典型的场景就是用户多、高并发,所以各大互联网公司都对Redis有深度的使用。作者最新推出的Redis 5.0增加了Stream的数据结构, 它是一个新的强大的支持多播的可持久化的消息队列。本次阿里云将推出全新版本的Redis 5.0,在提供社区功能的基础上提供了更多的企业级的特性~ (1)审计日志:提供操作日志、敏感日志、慢日志的审计能力~对于重要的数据变更能做到可以追溯(2)大key分析能力: Redis使用过程中经常因为业务使用不当引起大key的问题,为此阿里云Redis在服务端提供了大Key的分析能力,当前分析能力覆盖了Redis 4.0、5.0的版本~,可以为客户分析后端数据的内存使用情况。同时在本次发布会我们还将对阿里云Redis的性能增强版本和混合云解决方案进行介绍~对于性能增强版本我们通过多线程突破了开源Redis单线程的瓶颈,整体的QPS能力能达到社区的300%,对于混合云的解决方案客户可以通过redis-shake搭建同步通道,通过redis-full-check搭建数据比对通道,客户可以在业务上灵活配置访问流量,保证企业客户的上云流程~云数据库Redis 5.0重磅发布会,直播入口 支持全新Stream 数据类型,给您不一样的缓存体验,快速查看! 你优秀,你来说:1、您在什么样的业务场景下会使用Redis?一般是单独使用还是组合其他数据库共同使用?2、您对于Redis Stream的应用场景有什么希望了解的,对于消息队列的选型有什么历史经验?3、您希望阿里云Redis提供哪些功能帮助您解决日常使用问题?

李博 bluemind

Stream函数式操作流元素是否比for性能高?

Stream函数式操作流元素是否比for性能高?

游客irm3nny572smk

post请求返回302是为什么

使用一句话识别RESTful API以POST方式请求返回的是 HTTP/1.1 302 FoundServer: Tengine Date: Thu, 28 Mar 2019 02:52:56 GMTContent-Type: text/htmlContent-Length: 258Connection: closeLocation: http://err.taobao.com/error1.htmlVia: tongyijieru-aliyun-com1119311638.em14[web,302] 请求报名:POST /stream/v1/asr?appkey=SY*VZvAc&format=pcm&sample_rate=8000&enable_punctuation_prediction=true&enable_inverse_text_normalization=true HTTP/1.1X-NLS-Token: 0112c68**7002b6cContent-Type: application/octet-streamcache-control: no-cache Accept: /Host: nls-gateway.cn-shanghai.aliyuncs.comaccept-encoding: gzip, deflatecontent-length: 43408Connection: keep-alive

游客irm3nny572smk

post请求返回302是什么问题

302 Found The requested resource resides temporarily under a different URI. Powered by Tengine

社区小助手

Spark Streaming Kafka Stream批处理执行

Spark Streaming Kafka Stream批处理执行

krystal.xu

Flink 如何支持动态sql ?

flnk 新手一枚。。我理解flink stream = 动态table, 但是sql里的 where语句条件能否使用stream/source的值。 example: DataStream<String> input = env.socketTextStream("localhost", 9000, "\n"); DataStream clickStream = input.flatMap(new ClickHbaseMapFunc()); tEnv.registerDataStream("clickTable", clickStream, "id, studentid, url, itemtype, quantity"); Table result = tEnv.sqlQuery("SELECT * FROM clickTable WHERE studentid = '"+ ????+"'" ); 当中的 where语句条件 如何 能 从 input stream中 获取 ?

社区小助手

请教大家一个问题,spark stream连kafka,在web页面的stream标签,显示好多batch处于queued状态,这些batch是已经把数据从kafka读取进rdd,等待处理,还是还没有从kafka读取数进rdd?

请教大家一个问题,spark stream连kafka,在web页面的stream标签,显示好多batch处于queued状态,这些batch是已经把数据从kafka读取进rdd,等待处理,还是还没有从kafka读取数进rdd?

stevenchennet

flink如何计算设备离网

场景描述设备工作期间每隔固定周期(比如30s)就往kafka发送采集到的数据,当设备超过15分钟没有上报数据,则认为设备离网了。如何每隔10分钟计算出设备在该10分钟时间段内的离网时长? 按我的理解(可能有错,请多指正),stream不设置window的时候会使用全量窗口,当新数据来的时候,只是将全量窗口的该设备的最新上报时间更新,每隔10分钟获取globalwindow数据,然后filter最新上报时间到现在时间的差值大于15分钟即可,但是我没有找到flink里如何获取globalwindow的数据。 btw:如果是spark,是可以做的,filter>15min,然后output的model设置为complete即可获取globalwindow的数据。请问flink下该方案该如何做?

乌野

RichAllWindowFunction的open方法在什么时候被触发

如题 还有, 自定义的RichAllWindowFunction应该怎样使用才能触发open方法. var filteredstream = inputStream .flatMap(buildHbaseMsg(_)) .filter(b => b.actType match { case 0|1|2|3 => true case _ => false }) .countWindowAll(2L).apply(new CustomWindowFunction ())' 这样吗? 尝试这样调用, open并没有执行'stream.apply((window, input, out: Collector[Unit]) => new CustomWindowFunction ().apply(window, input, out))'

晓生寒

[@小川游鱼][¥20]kafka的消费者stream iterator block,读不出消息?

rt,能够向kafka produce数据,kafka里也看得到,但是consumer却读不出数据,offset是0,程序在stream的iterator被阻塞了。demo代码如下: import java.util.HashMap;import java.util.List;import java.util.Map;import java.util.Properties; import kafka.consumer.ConsumerConfig;import kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import kafka.javaapi.consumer.ConsumerConnector;import kafka.serializer.StringDecoder;import kafka.utils.VerifiableProperties; /** Created by Administrator on 2015/10/10.*/ public class KafkaConsumer { private final ConsumerConnector consumer; private KafkaConsumer() { Properties props = new Properties(); props.put("auto.offset.reset", "smallest"); //zookeeper 配置 props.put("zookeeper.connect", "Master:2181"); //group 代表一个消费组 props.put("group.id", "consumer3"); // 连接zk的session超时时间 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200");//zk follower落后于zk leader的最长时间 props.put("auto.commit.interval.ms", "1000");//往zookeeper上写offset的频率 props.put("auto.offset.reset", "smallest");//如果offset出了返回,则 smallest: 自动设置reset到最小的offset. largest : 自动设置offset到最大的offset. 其它值不允许,会抛出异常 //序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("auto.commit.interval.ms", "1000"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put("consumertest", new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get("consumertest").get(0); ConsumerIterator<String, String> it = stream.iterator(); System.out.println("consumer starting..."); while (it.hasNext()) System.out.println(it.next().message()); System.out.println("consumer over"); } public static void main(String[] args) { new KafkaConsumer().consume(); } } 谢谢~

flink小助手

Apache Flink:使用filter()或split()来分割流?

我有一个来自Kafka的DataStream,它对MyModel中的一个字段有两个可能的值。MyModel是一个pojo,其中包含来自Kafka的消息解析的特定于域的字段。 DataStream stream = env.addSource(myKafkaConsumer);我想分别在每个键a1,a2上应用窗口和运算符。将它们分开的好方法是什么?我有2个选项过滤器并选择,但不知道哪个更快。 过滤方式 stream .filter(<MyModel.a == a1>) .keyBy() .window() .apply() .addSink() stream .filter(<MyModel.a == a2>) .keyBy() .window() .apply() .addSink() 拆分并选择方法 SplitStream split = stream.split(…) split .select(<MyModel.a == a1>) … .addSink() split .select<MyModel.a == a2>() … .addSink() 如果split和select更好,如果我想根据MyModel中字段的值进行拆分,如何实现它们?

社区小助手

在Spark Stream中保存PairRdd时出错[重复]

我试图在spark流中保存我的Pair Rdd但在最后一步保存时出错。 这是我的示例代码 def main(args: Array[String]) { val inputPath = args(0) val output = args(1) val noOfHashPartitioner = args(2).toInt println("IN Streaming ") val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]") val sc = new SparkContext(conf) val hadoopConf = sc.hadoopConfiguration; //hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") val ssc = new org.apache.spark.streaming.StreamingContext(sc, Seconds(60)) val input = ssc.textFileStream(inputPath) val pairedRDD = input.map(row => { val split = row.split("\\|") val fileName = split(0) val fileContent = split(1) (fileName, fileContent) }) import org.apache.hadoop.io.NullWritable import org.apache.spark.HashPartitioner import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { override def generateActualKey(key: Any, value: Any): Any = NullWritable.get() override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String] } //print(pairedRDD) pairedRDD.partitionBy(new HashPartitioner(noOfHashPartitioner)).saveAsHadoopFile(output, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat], classOf[GzipCodec]) ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate }我在保存时已经到了最后一步。一定在这里遗漏了什么,得到的错误就像 value partitionBy不是org.apache.spark.streaming.dstream.DStream [(String,String)]的数据

社区小助手

在追加模式下激活水印和窗口

下面结构化的流媒体代码水印和窗口数据,24小时间隔,15分钟幻灯片。代码在附加模式下仅生成空的批处理0。在更新模式下,结果会正确显示。需要附加模式,因为S3接收器仅在附加模式下工作。 String windowDuration = "24 hours";String slideDuration = "15 minutes";Dataset sliding24h = rowData .withWatermark(eventTimeCol, slideDuration) .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration), col(nameCol)).count(); sliding24h .writeStream() .format("console") .option("truncate", false) .option("numRows", 1000) .outputMode(OutputMode.Append()) //.outputMode(OutputMode.Complete()) .start() .awaitTermination(); 以下是完整的测试代码: public static void main(String [] args) throws StreamingQueryException { SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate(); ArrayList<String> rl = new ArrayList<>(); for (int i = 0; i < 200; ++i) { long t = 1512164314L + i * 5 * 60; rl.add(t + ",qwer"); } String nameCol = "name"; String eventTimeCol = "eventTime"; String eventTimestampCol = "eventTimestamp"; MemoryStream<String> input = new MemoryStream<>(42, spark.sqlContext(), Encoders.STRING()); input.addData(JavaConversions.asScalaBuffer(rl).toSeq()); Dataset<Row> stream = input.toDF().selectExpr( "cast(split(value,'[,]')[0] as long) as " + eventTimestampCol, "cast(split(value,'[,]')[1] as String) as " + nameCol); System.out.println("isStreaming: " + stream.isStreaming()); Column eventTime = functions.to_timestamp(col(eventTimestampCol)); Dataset<Row> rowData = stream.withColumn(eventTimeCol, eventTime); String windowDuration = "24 hours"; String slideDuration = "15 minutes"; Dataset<Row> sliding24h = rowData .withWatermark(eventTimeCol, slideDuration) .groupBy(functions.window(col(eventTimeCol), windowDuration, slideDuration), col(nameCol)).count(); sliding24h .writeStream() .format("console") .option("truncate", false) .option("numRows", 1000) .outputMode(OutputMode.Append()) //.outputMode(OutputMode.Complete()) .start() .awaitTermination(); }

flink小助手

使用AvroParquetWriter将flink接收到parquet文件不会将数据写入文件

我正在尝试使用AvroParquetWriter将parquet文件写为接收器。文件已创建但长度为0(未写入数据)。无法清楚问题在哪里? import io.eels.component.parquet.ParquetWriterConfigimport org.apache.avro.Schemaimport org.apache.avro.generic.{GenericData, GenericRecord}import org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.hadoop.fs.Pathimport org.apache.parquet.avro.AvroParquetWriterimport org.apache.parquet.hadoop.{ParquetFileWriter, ParquetWriter}import org.apache.parquet.hadoop.metadata.CompressionCodecName import scala.io.Sourceimport org.apache.flink.streaming.api.scala._ object Tester extends App {val env = StreamExecutionEnvironment.getExecutionEnvironment def now = System.currentTimeMillis() val path = new Path(s"/tmp/test-$now.parquet") val schemaString = Source.fromURL(getClass.getResource("/request_schema.avsc")).mkString val schema: Schema = new Schema.Parser().parse(schemaString) val compressionCodecName = CompressionCodecName.SNAPPY val config = ParquetWriterConfig() val genericReocrd: GenericRecord = new GenericData.Record(schema) genericReocrd.put("name", "test_b") genericReocrd.put("code", "NoError") genericReocrd.put("ts", 100L) val stream = env.fromElements(genericReocrd) val writer: ParquetWriter[GenericRecord] = AvroParquetWriter.builderGenericRecord .withSchema(schema) .withCompressionCodec(compressionCodecName) .withPageSize(config.pageSize) .withRowGroupSize(config.blockSize) .withDictionaryEncoding(config.enableDictionary) .withWriteMode(ParquetFileWriter.Mode.OVERWRITE) .withValidation(config.validating) .build() writer.write(genericReocrd) stream.addSink{r => writer.write(r) } env.execute()

社区小助手

如何使用writeFileAsText(path)写入文件时传递动态路径?

假设我有一个包含类型元素的Stream String。我想将流中的每个元素写入某个文件夹中的单独文件。我正在使用以下设置。 stream.writeAsText(path).setParallelism(1);如何使这条路径动态化?我甚至尝试添加System.nanotime()路径以使其动态化。但它似乎仍然不起作用,所有内容都写入一个文件。

社区小助手

Flink窗口:聚合和输出到接收器

我们有一个数据流,其中每个元素都是这种类型: id: Stringtype: Typeamount: Integer我们希望聚合此流并输出amount每周一次的总和。 当前解决方案 示例flink管道将如下所示: stream.keyBy(type) .window(TumblingProcessingTimeWindows.of(Time.days(7))) .reduce(sumAmount()) .addSink(someOutput()) 用于输入 | id | type | amount || 1 | CAT | 10 || 2 | DOG | 20 || 3 | CAT | 5 || 4 | DOG | 15 || 5 | DOG | 50 |如果窗口在记录3和4我们的输出之间结束将是: | TYPE | sumAmount || CAT | 15 | (id 1 and id 3 added together)| DOG | 20 | (only id 2 as been 'summed')标识4和5仍然是弗林克管道内,下周将被输出。 因此,下周我们的总产量将是: | TYPE | sumAmount || CAT | 15 | (of last week)| DOG | 20 | (of last week)| DOG | 65 | (id 4 and id 5 added together)新要求: 我们现在还想知道每条记录在哪一周处理了每条记录。换句话说,我们的新输出应该是: | TYPE | sumAmount | weekNumber || CAT | 15 | 1 || DOG | 20 | 1 || DOG | 65 | 2 |但我们还想要一个像这样的额外输出: | id | weekNumber || 1 | 1 || 2 | 1 || 3 | 1 || 4 | 2 || 5 | 2 |怎么办呢? flink有没有办法实现这个目标?我会想象我们会有一个汇总函数的汇总函数,但也会输出每个记录和当前周数,但我没有找到在文档中执行此操作的方法。 (注意:我们每周处理大约1亿条记录,所以理想情况下我们只希望在一周内将聚合保持在flink状态,而不是所有单个记录) 编辑: 我去了Anton描述的解决方案: DataStream elements = stream.keyBy(type) .process(myKeyedProcessFunction()); elements.addSink(outputElements());elements.getSideOutput(outputTag) .addSink(outputAggregates()) 而KeyedProcessFunction看起来像: class MyKeyedProcessFunction extends KeyedProcessFunction private ValueState<ZonedDateTime> state; private ValueState<Integer> sum; public void processElement(Element e, Context c, Collector<Element> out) { if (state.value() == null) { state.update(ZonedDateTime.now()); sum.update(0); c.timerService().registerProcessingTimeTimer(nowPlus7Days); } element.addAggregationId(state.value()); sum.update(sum.value() + element.getAmount()); } public void onTimer(long timestamp, OnTimerContext c, Collector<Element> out) { state.update(null); c.output(outputTag, sum.value()); } }

flink小助手

Flink中复杂的优化(多输入)集成测试

我需要为flink流式优化编写单元测试。它基本上是一个CoFlatMapFunction,它有2个输入。 我尝试从这个页面获得一些灵感:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html 输入的顺序对我的优化很重要,所以当我测试时,我不能StreamExecutionEnvironment#fromCollection用于每个输入,因为我不会控制在每个输入中注入数据点的顺序。 我试图创建一个单独的输入使用StreamExecutionEnvironment#fromCollection并CoFlatMapFunction根据其类型将每个元素分配给我的实际输入,但是在此操作中元素的顺序会丢失。 还有其他方法可以编写此测试吗?

flink小助手

Flink和Beam SDK如何处理窗口 - 哪个更有效?

我将Apache Beam SDK与用于流处理的Flink SDK进行比较,以确定使用Beam作为附加框架的成本/优势。 我有一个非常简单的设置,其中从Kafka源读取数据流并由运行Flink的节点集群并行处理。 根据我对这些SDK如何工作的理解,按窗口处理数据窗口流的最简单方法是: 使用Apache Beam(在Flink上运行): 1.1。创建管道对象。 1.2。创建一个Kafka记录的PCollection。 1.3。应用窗口函数。 1.4。按窗口将管道转换为键。 1.5。按键(窗口)分组记录。 1.6。应用窗口记录所需的任何功能。 使用Flink SDK 2.1。从Kafka源创建数据流。 2.2。通过提供关键功能将其转换为键控流。 2.3。应用窗口函数。 2.4。应用窗口记录所需的任何功能。 虽然Flink解决方案在程序上看起来更简洁,但根据我的经验,它在大量数据方面的效率较低。我只能想象密钥提取功能引入了开销,因为Beam不需要这一步骤。 我的问题是:这些过程不相同吗?有什么可以解释Beam方式更有效率,因为它使用Flink作为运行者(并且所有其他条件都相同)? 这是使用Beam SDK的代码 PipelineOptions options = PipelineOptionsFactory.create(); //Run with Flink FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class); flinkPipelineOptions.setRunner(FlinkRunner.class); flinkPipelineOptions.setStreaming(true); flinkPipelineOptions.setParallelism(-1); //Pick this up from the user interface at runtime // Create the Pipeline object with the options we defined above. Pipeline p = Pipeline.create(flinkPipelineOptions); // Create a PCollection of Kafka records PCollection<KafkaRecord<byte[], byte[]>> kafkaCollection = p.apply(KafkaIO.<Long, String>readBytes() .withBootstrapServers(KAFKA_IP + ":" + KAFKA_PORT) .withTopics(ImmutableList.of(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC)) .updateConsumerProperties(ImmutableMap.of("group.id", CONSUMER_GROUP))); //Apply Windowing Function PCollection<KafkaRecord<byte[], byte[]>> windowedKafkaCollection = kafkaCollection.apply(Window.into(SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1)))); //Transform the pipeline to key by window PCollection<KV<IntervalWindow, KafkaRecord<byte[], byte[]>>> keyedByWindow = windowedKafkaCollection.apply( ParDo.of( new DoFn<KafkaRecord<byte[], byte[]>, KV<IntervalWindow, KafkaRecord<byte[], byte[]>>>() { @ProcessElement public void processElement(ProcessContext context, IntervalWindow window) { context.output(KV.of(window, context.element())); } })); //Group records by key (window) PCollection<KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>>> groupedByWindow = keyedByWindow .apply(GroupByKey.<IntervalWindow, KafkaRecord<byte[], byte[]>>create()); //Process windowed data PCollection<KV<IIntervalWindowResult, IPueResult>> processed = groupedByWindow .apply("filterAndProcess", ParDo.of(new PueCalculatorFn())); // Run the pipeline. p.run().waitUntilFinish(); 这是使用Flink SDK的代码 // Create a Streaming Execution Environmentfinal StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.setParallelism(6); //Connect to KafkaProperties properties = new Properties(); properties.setProperty("bootstrap.servers", KAFKA_IP + ":" + KAFKA_PORT);properties.setProperty("group.id", CONSUMER_GROUP); DataStream stream = env .addSource(new FlinkKafkaConsumer010<>(Arrays.asList(REAL_ENERGY_TOPIC, IT_ENERGY_TOPIC), new JSONDeserializationSchema(), properties)); //Key by idstream.keyBy((KeySelector) jsonNode -> jsonNode.get("id").asInt()) //Set the windowing function. .timeWindow(Time.seconds(5L), Time.seconds(1L)) //Process Windowed Data .process(new PueCalculatorFn(), TypeInformation.of(ImmutablePair.class)); // execute programenv.execute("Using Flink SDK"); 我想我应该添加一些可能相关的指标。 网络接收字节Flink SDKtaskmanager.22644786446taskmanager.32645765232taskmanager.12827676598taskmanager.62422309148taskmanager.42428570491taskmanager.52431368644beamtaskmanager.24092154160taskmanager.34435132862taskmanager.14766399314taskmanager.64425190393taskmanager.44096576110taskmanager.54092849114CPU利用率(最大值)Flink SDKtaskmanager.293.00%taskmanager.392.00%taskmanager.191.00%taskmanager.690.00%taskmanager.490.00%taskmanager.592.00%beamtaskmanager.252.0%taskmanager.371.0%taskmanager.172.0%taskmanager.640.0%taskmanager.456.0%taskmanager.526.0%Beam似乎使用了更多的网络,而Flink使用了更多的CPU。这是否表明Beam以更有效的方式并行处理? 编辑No2我很确定PueCalculatorFn类是等价的,但我会在这里分享代码,看看两个进程之间是否有明显的差异。 public class PueCalculatorFn extends DoFn>>, KV> implements Serializable {private transient List realEnergyRecords;private transient List itEnergyRecords; @ProcessElementpublic void procesElement(DoFn>>, KV>.ProcessContext c, BoundedWindow w) { KV<IntervalWindow, Iterable<KafkaRecord<byte[], byte[]>>> element = c.element(); Instant windowStart = Instant.ofEpochMilli(element.getKey().start().getMillis()); Instant windowEnd = Instant.ofEpochMilli(element.getKey().end().getMillis()); Iterable<KafkaRecord<byte[], byte[]>> records = element.getValue(); //Calculate Pue IPueResult result = calculatePue(element.getKey(), records); //Create IntervalWindowResult object to return DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC")); IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart), formatter.format(windowEnd), realEnergyRecords, itEnergyRecords); //Return Pue keyed by Window c.output(KV.of(intervalWindowResult, result)); } private PueResult calculatePue(IntervalWindow window, Iterable> records) { //Define accumulators to gather readings final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0); final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0); //Declare variable to store the result BigDecimal pue = BigDecimal.ZERO; //Initialise transient lists realEnergyRecords = new ArrayList<>(); itEnergyRecords = new ArrayList<>(); //Transform the results into a stream Stream<KafkaRecord<byte[], byte[]>> streamOfRecords = StreamSupport.stream(records.spliterator(), false); //Iterate through each reading and add to the increment count streamOfRecords .map(record -> { byte[] valueBytes = record.getKV().getValue(); assert valueBytes != null; String valueString = new String(valueBytes); assert !valueString.isEmpty(); return KV.of(record, valueString); }).map(kv -> { Gson gson = new GsonBuilder().registerTypeAdapter(KafkaConsumption.class, new KafkaConsumptionDeserialiser()).create(); KafkaConsumption consumption = gson.fromJson(kv.getValue(), KafkaConsumption.class); return KV.of(kv.getKey(), consumption); }).forEach(consumptionRecord -> { switch (consumptionRecord.getKey().getTopic()) { case REAL_ENERGY_TOPIC: totalRealIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed()); realEnergyRecords.add(consumptionRecord.getValue()); break; case IT_ENERGY_TOPIC: totalItIncrement.accumulate(consumptionRecord.getValue().getEnergyConsumed()); itEnergyRecords.add(consumptionRecord.getValue()); break; } } ); assert totalRealIncrement.doubleValue() > 0.0; assert totalItIncrement.doubleValue() > 0.0; //Beware of division by zero if (totalItIncrement.doubleValue() != 0.0) { //Calculate PUE pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP); } //Create a PueResult object to return IWindow intervalWindow = new Window(window.start().getMillis(), window.end().getMillis()); return new PueResult(intervalWindow, pue.stripTrailingZeros()); } @Overrideprotected void finalize() throws Throwable { super.finalize(); RecordSenderFactory.closeSender(); WindowSenderFactory.closeSender(); }} flinkpublic class PueCalculatorFn extends ProcessWindowFunction {private transient List realEnergyRecords;private transient List itEnergyRecords; @Overridepublic void process(Integer integer, Context context, Iterable iterable, Collector collector) throws Exception { Instant windowStart = Instant.ofEpochMilli(context.window().getStart()); Instant windowEnd = Instant.ofEpochMilli(context.window().getEnd()); BigDecimal pue = calculatePue(iterable); //Create IntervalWindowResult object to return DateTimeFormatter formatter = DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC")); IIntervalWindowResult intervalWindowResult = new IntervalWindowResult(formatter.format(windowStart), formatter.format(windowEnd), realEnergyRecords .stream() .map(e -> (IKafkaConsumption) e) .collect(Collectors.toList()), itEnergyRecords .stream() .map(e -> (IKafkaConsumption) e) .collect(Collectors.toList())); //Create PueResult object to return IPueResult pueResult = new PueResult(new Window(windowStart.toEpochMilli(), windowEnd.toEpochMilli()), pue.stripTrailingZeros()); //Collect result collector.collect(new ImmutablePair<>(intervalWindowResult, pueResult)); } protected BigDecimal calculatePue(Iterable iterable) { //Define accumulators to gather readings final DoubleAccumulator totalRealIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0); final DoubleAccumulator totalItIncrement = new DoubleAccumulator((x, y) -> x + y, 0.0); //Declare variable to store the result BigDecimal pue = BigDecimal.ZERO; //Initialise transient lists realEnergyRecords = new ArrayList<>(); itEnergyRecords = new ArrayList<>(); //Iterate through each reading and add to the increment count StreamSupport.stream(iterable.spliterator(), false) .forEach(object -> { switch (object.get("topic").textValue()) { case REAL_ENERGY_TOPIC: totalRealIncrement.accumulate(object.get("energyConsumed").asDouble()); realEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object)); break; case IT_ENERGY_TOPIC: totalItIncrement.accumulate(object.get("energyConsumed").asDouble()); itEnergyRecords.add(KafkaConsumptionDeserialiser.deserialize(object)); break; } }); assert totalRealIncrement.doubleValue() > 0.0; assert totalItIncrement.doubleValue() > 0.0; //Beware of division by zero if (totalItIncrement.doubleValue() != 0.0) { //Calculate PUE pue = BigDecimal.valueOf(totalRealIncrement.getThenReset()).divide(BigDecimal.valueOf(totalItIncrement.getThenReset()), 9, BigDecimal.ROUND_HALF_UP); } return pue; } }这是我在Beam示例中使用的自定义反序列化器。 KafkaConsumptionDeserialiserpublic class KafkaConsumptionDeserialiser implements JsonDeserializer { public KafkaConsumption deserialize(JsonElement jsonElement, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException { if(jsonElement == null) { return null; } else { JsonObject jsonObject = jsonElement.getAsJsonObject(); JsonElement id = jsonObject.get("id"); JsonElement energyConsumed = jsonObject.get("energyConsumed"); Gson gson = (new GsonBuilder()).registerTypeAdapter(Duration.class, new DurationDeserialiser()).registerTypeAdapter(ZonedDateTime.class, new ZonedDateTimeDeserialiser()).create(); Duration duration = (Duration)gson.fromJson(jsonObject.get("duration"), Duration.class); JsonElement topic = jsonObject.get("topic"); Instant eventTime = (Instant)gson.fromJson(jsonObject.get("eventTime"), Instant.class); return new KafkaConsumption(Integer.valueOf(id != null?id.getAsInt():0), Double.valueOf(energyConsumed != null?energyConsumed.getAsDouble():0.0D), duration, topic != null?topic.getAsString():"", eventTime); } } }