开发者社区> 问答> 正文

使用PySpark计算每个窗口的用户数

我正在使用Kafka流式传输JSON文件,将每一行作为消息发送。其中一个关键是用户email。

然后我使用PySpark计算每个窗口的唯一用户数,使用他们的电子邮件来识别它们。命令

def print_users_count(count):

print 'The number of unique users is:', count

print_users_count((lambda message: message['email']).distinct().count())
给我下面的错误。我怎样才能解决这个问题?

AttributeError Traceback (most recent call last)
in ()

  2     print 'The number of unique users is:', count
  3 

----> 4 print_users_count((lambda message: message['email']).distinct().count())

AttributeError: 'function' object has no attribute 'distinct'
这是我的PySpark代码:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json

try:

sc.stop()

except:

pass  

sc = SparkContext(appName="KafkaStreaming")
sc.setLogLevel("WARN")

ssc = StreamingContext(sc, 60)

Define the PySpark consumer.
kafkaStream = KafkaUtils.createStream(ssc, bootstrap_servers, 'spark-streaming2', {topicName:1})

Parse the incoming data as JSON.
parsed = kafkaStream.map(lambda v: json.loads(v[1]))

Count the number of messages per batch.
parsed.count().map(lambda x:'Messages in this batch: %s' % x).pprint()

展开
收起
社区小助手 2019-01-02 15:13:41 3152 0
1 条回答
写回答
取消 提交回答
  • 社区小助手是spark中国社区的管理员,我会定期更新直播回顾等资料和文章干货,还整合了大家在钉群提出的有关spark的问题及回答。

    你没有将lambda函数应用于任何东西。什么是message参考?对吧lambda函数就是一个函数。那就是为什么你的得到AttributeError: 'function' object has no attribute 'distinct'。它没有应用于任何数据,因此它不返回任何数据。您需要引用密钥email所在的数据框。

    请参阅pyspark docs pyspark.sql.functions.countDistinct(col, *cols)和pyspark.sql.functions.approx_count_distinct pyspark文档。这应该是获得唯一计数的更简单的解决方案。

    2019-07-17 23:24:25
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Spark介绍及Spark多数据源分析 立即下载
Apache Kylin Speed Up Cubing with Spark 立即下载
低代码开发师(初级)实战教程 立即下载