Spring Cloud 之 Stream.

简介: Spring Cloud 之 Stream.一、简介Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。Spring Cloud Stream 为一些供应商的消息中间件产品(目前集成了 RabbitMQ 和 Kafka)提供了个性化的自动化配置实现,并且引入了发布/订阅、消费组以及消息分区这三个核心概念。

Spring Cloud 之 Stream.
一、简介
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。

Spring Cloud Stream 为一些供应商的消息中间件产品(目前集成了 RabbitMQ 和 Kafka)提供了个性化的自动化配置实现,并且引入了发布/订阅、消费组以及消息分区这三个核心概念。简单地说,Spring Cloud Stream 本质上就是整合了 Spring Boot 和 Spring Integration, 实现了一套轻量级的消息驱动的微服务框架。

通过使用 Spring Cloud Stream,可以忽略消息中间件的差异,有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。

回到顶部
二、快速入门

  1. pom.yml

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
  2. application.yml
    配置消息中间件的连接信息:

spring:
application:

name: cloud-stream

rabbitmq:

host: 127.0.0.1
port: 5672
username: guest
password: guest
  1. 消息监听/消费
    @EnableBinding({Source.class, Sink.class})

public class SinkReceiver {

private Logger log = LoggerFactory.getLogger(SinkReceiver.class);

@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public Object processInput(String message) {
    log.info("Input Stream Receiver:{}", message);
    return message;
}

@StreamListener(Source.OUTPUT)
public void processOutPut(String message) {
    log.info("Output Stream Receiver:{}", message);
}

}
@EnableBinding:实现对消息通道(Channel) 的绑定,其中 Sink 是 Spring Cloud Stream 默认的输入通道,Source 是 Spring Cloud Stream 中默认的输出通道。
@StreamListener:将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。如果不设置属性值,将默认使用方法名作为消息通道名。
@SendTo:很多时候在处理完输入消息之后, 需要反馈一个消息给对方, 这时候可以通过 @SendTo 注解来指定返回内容的输出通道。
4.消息生产
消息生产有两种方式,一种是利用注入消息通道来发送消息,如下:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = StreamApplication.class)
public class SinkOutputTest {

@Autowired
private Sink sink;
@Autowired
private Source source;

@Test
public void sink() {
    sink.input().send(MessageBuilder.withPayload("From SinkSender").build());
}

@Test
public void source() {
    source.output().send(MessageBuilder.withPayload("From SourceSender").build());
}

}
另外一种是使用 Spring Integration 的原生支持 — @InboundChannelAdapter

@EnableBinding(value = {Source.class})
@SpringBootApplication
public class StreamApplication {

public static void main(String[] args) {
    SpringApplication.run(StreamApplication.class, args);
}

@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "2000"))
public MessageSource<String> timerMessageSource() {
    return () -> new GenericMessage<>("2019/08/06");
}

}
回到顶部
三、绑定器
Spring Cloud Stream 构建的应用程序与消息中间件之间是通过绑定器 Binder 相关联的,绑定器对于应用程序而言起到了隔离作用, 它使得不同消息中间件的实现细节对应用程序来说是透明的。所以对于每一个 Spring Cloud Stream 的应用程序来说, 它不需要知晓消息中间件的通信细节,它只需知道 Binder 对应程序提供的抽象概念来使用消息中间件来实现业务逻辑即可,而这个抽象概念就是在快速入门中我们提到的消息通道:Channel。如下图所示,在应用程序和 Binder 之间定义了两条输入通道和三条输出通道来传递消息,而绑定器则是作为这些通道和消息中间件之间的桥梁进行通信。

通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的 Channel 通道,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或是更换其他消息中间件产品时,我们要做的就是更换它们对应的 Binder 绑定器而不需要修改任何 SpringBoot 的应用逻辑。

回到顶部
四、消费组
Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。(这里提到的 Topic 指的是 Stream 的抽象概念,可以是 RabbitMQ 中的 Exchange,也可以是 Kafka 中的 Topic)。

发布-订阅模式会带来一个问题。因为在微服务架构中,我们的每一个微服务应用为了实现高可用和负载均衡, 实际上都会部署多个实例。按照消息广播的性质,多个实例都会接收到消息,从而导致重复消费。为了解决这个问题, 在Spring Cloud Stream中提供了消费组的概念。

如果在同一个主题上的应用需要启动多个实例的时候,我们可以通过 spring.cloud.stream.bindings..group 属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候,只会有一个成员真正收到消息并进行处理。

spring:
application:

name: cloud-stream

cloud:

stream:
  bindings:
    input:
      # 设置消费组,保证只有一个实例消费到消息
      # 如果不设置消费组,Stream 将会为每个实例生成一个消费组
      group: ${spring.application.name}

回到顶部
五、消息分区
通过引入消费组的概念,我们已经能够在多实例的清况下,保障每个消息只被组内的一个实例消费。但是消费组无法控制消息具体被哪个实例消费。也就是说,对于同一条消息,它多次到达之后可能是由不同的实例进行消费的。但是对于一些业务场景,需要对一些具有相同特征的消息设置每次都被同一个消费实例处理。

消息分区的引入就是为了解决这样的问题:当生产者将消息数据发送给多个消费者实例时,保证拥有共同特征的消息数据始终是由同一个消费者实例接收和处理。

消费者分区:

spring:
application:

name: cloud-stream

cloud:

stream:
  instance-count: 1
  instance-index: 0
  bindings:
    input:
      consumer:
        partitioned: true

spring.cloud.stream.bindings.input.consumer.partitioned = true 开启消费者分区功能。
spring.cloud.stream.instance-count = 1 当前消费者的总实例个数,即应用程序部署的实例数量。
spring.cloud.stream.instance-index = 0 当前实例的索引号,从 0 开始,最大为 -1 。用于消息生产的时候锁定该实例。(消息生产的时候 "hashCode(key) % partitionCount" 的计算值等于该设置的值,即转发到该实例上)
生产者分区:

spring:
application:

name: cloud-stream

cloud:

stream:
  bindings:
    output:
      producer:
        partitionCount: 1
        partitionKeyExtractorName: keyStrategy

spring.cloud.stream.bindings.output.producer.partitionCount = 1 ,消息生产需要广播的消费者数量。即消息分区的数量。
spring.cloud.stream.bindings.output.producer.partitionKeyExtractorName = keyStrategy ,Spring Bean — 用来消息的特征值计算。(分区选择计算规则为 "hashCode(key) % partitionCount" , 这里的 key 根据 partitionKeyExpression 或 partitionKeyExtractorName 的配置计算得到)
@Component
public class KeyStrategy implements PartitionKeyExtractorStrategy {


@Override
public Object extractKey(Message<?> message) {
    return message.getPayload();
}

}

演示源代码:https://github.com/JMCuixy/spring-cloud-demo
内容参考:《Spring Cloud 微服务实战》
原文地址https://www.cnblogs.com/jmcui/p/11279388.html

相关文章
|
1月前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(二)Rest微服务工程搭建
46 0
|
30天前
|
负载均衡 Java API
Spring Cloud 面试题及答案整理,最新面试题
Spring Cloud 面试题及答案整理,最新面试题
132 1
|
30天前
|
Java Nacos Sentinel
Spring Cloud Alibaba 面试题及答案整理,最新面试题
Spring Cloud Alibaba 面试题及答案整理,最新面试题
138 0
|
1月前
|
SpringCloudAlibaba Java 持续交付
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
【构建一套Spring Cloud项目的大概步骤】&【Springcloud Alibaba微服务分布式架构学习资料】
131 0
|
1月前
|
SpringCloudAlibaba Java 网络架构
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(七)Spring Cloud Gateway服务网关
81 0
|
1月前
|
消息中间件 JSON Java
Spring Boot、Spring Cloud与Spring Cloud Alibaba版本对应关系
Spring Boot、Spring Cloud与Spring Cloud Alibaba版本对应关系
403 0
|
2天前
|
负载均衡 Java 开发者
细解微服务架构实践:如何使用Spring Cloud进行Java微服务治理
【4月更文挑战第17天】Spring Cloud是Java微服务治理的首选框架,整合了Eureka(服务发现)、Ribbon(客户端负载均衡)、Hystrix(熔断器)、Zuul(API网关)和Config Server(配置中心)。通过Eureka实现服务注册与发现,Ribbon提供负载均衡,Hystrix实现熔断保护,Zuul作为API网关,Config Server集中管理配置。理解并运用Spring Cloud进行微服务治理是现代Java开发者的关键技能。
|
3天前
|
Java API 对象存储
对象存储OSS产品常见问题之使用Spring Cloud Alibaba情况下文档添加水印如何解决
对象存储OSS是基于互联网的数据存储服务模式,让用户可以安全、可靠地存储大量非结构化数据,如图片、音频、视频、文档等任意类型文件,并通过简单的基于HTTP/HTTPS协议的RESTful API接口进行访问和管理。本帖梳理了用户在实际使用中可能遇到的各种常见问题,涵盖了基础操作、性能优化、安全设置、费用管理、数据备份与恢复、跨区域同步、API接口调用等多个方面。
22 2
|
17天前
|
负载均衡 网络协议 Java
构建高效可扩展的微服务架构:利用Spring Cloud实现服务发现与负载均衡
本文将探讨如何利用Spring Cloud技术实现微服务架构中的服务发现与负载均衡,通过注册中心来管理服务的注册与发现,并通过负载均衡策略实现请求的分发,从而构建高效可扩展的微服务系统。
|
1月前
|
SpringCloudAlibaba 负载均衡 Java
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(目录大纲)
【Springcloud Alibaba微服务分布式架构 | Spring Cloud】之学习笔记(目录大纲)
61 1

热门文章

最新文章