spring spring cloud stream

初探Spring Cloud Stream

吳沛芸 Peiiun Wu 2019/10/04 12:30:28
136

一、概述

Spring Cloud Stream (以下簡稱Stream) 是一個用來建構訊息驅動微服務的框架,讓微服務之間透過訊息代理(Message Broker)彼此溝通。

Stream裡面運用到一些現存的框架,比如Sping Integration和 Spring Message,這些框架雖然運作的很好,但是有與訊息代理之間密切耦合的缺點。Stream做的事情是對於這些框架做進一步的封裝,由於Stream是建立一個在Spring Boot基礎上運作的框架,讓Spring根據配置(configuration)和依賴(dependency)文件在運行時自動注入,這意味著可以透過修改配置文件無痛切換到不同的訊息代理,讓微服務和訊息代理之間的耦合降低。

 

二、訊息驅動 ─ Message Driven

訊息驅動意思是讓應用服務透過訊息流的方式異步溝通,使服務之間不直接調用來降低彼此的依賴與牽制。

在微服務的架構中,一個請求到回應可能會經過多個服務,假設有一個請求如下圖,A服務直接內部調用B服務:

假設A服務本身的業務處理時間很短、B服務需要比較久的處理時間,我們為B服務啟動多個實體(Instance)來因應;A服務為了等待B服務而無法接收下個請求,導致A服務必須配合B服務啟動多個實體,但對A服務而言其實是沒有必要的。

 

若換做是訊息驅動的方式,A服務不直接呼叫B服務,而是發送一個訊息事件(Event)到訊息代理(Message Broker)後就可以接待下個請求;而B服務去訂閱A服務傳到訊息代理的事件,如此一來A、B服務只要約定好事件的內容就可以獨立作業,也可以依照各自的需求去擴增應用實體。

 

三、訊息代理 ─ Message Broker

下面訊息代理會以RabbitMQ來舉例。

RabbitMQ使用Advanced Message Queuing Protocol (AMQP) 作為訊息傳輸的協定,AMQP有一些特性,比如說他可以應用在跨組織、跨平台(應用由不同語言撰寫)系統的聯絡,而這些系統允許非同步處理(Async)或是強調可靠傳達(Reliable)的訊息交換方式,這些特性與微服務搭配起來相當合適。

以下是RabbitMQ 的結構圖:

RabbitMQ結構吐圖

訊息發送方將訊息傳遞到RabbitMQ中的交換器,交換器依照一些規則將訊息綁定(Binding)在一到多個訊息佇列,而訂閱方透過訂閱訊息佇列來獲得發送方的資訊,如此一來就完成系統之間的訊息溝通。

了解完RabbitMQ的運作架構後,接著來看Stream的運作模型和如何與訊息代理合作。

 

四、Stream運作模型

Stream為服務注入輸入(input)和輸出(output)通道,通道連接到的綁定器(Binder),由綁定器去跟訊息代理─也就是圖中的Middleware溝通。

綁定器抽象─Binder Abstraction

綁定器隔離了服務和訊息代理,面向服務它整合構成訊息生產和消費的API,而面向消息代理它實作了與代理溝通的設定和細節,因此服務不用考慮如何與不同的訊息代理對接,也可以在不更動到程式的情況下,透過更換綁定器的依賴和配置來轉換不同的訊息代理。

 

持續發布與訂閱─Persistent Publish-Subscribe Support

這個發佈訂閱模型演示,當一個請求需要發送給多個應用服務做處理,可以讓這個請求成為一個訊息流給多個應用服務去訂閱,再各自拓展下游的應用,這樣既不會浪費現有訊息流,也降低了發布者與消費者的複雜度和耦合度。

 

消費群體─Consumer group

Stream也提供了分組的功能。當一個服務啟動了多個實體,這些實體可能以輪巡等負載均衡的方式在運作,這個時候所有的實體是訂閱同個主題的。透過配置指定分組(group)方式,可以確保當中只會有一個實體接到訊息。

 

五、實作

1. 安裝RabbitMQ

RabbitMQ從這裡安裝,為求快速演示這邊使用CloudAMQP提供的雲端安裝:

登錄後點擊"Create New Instance"

設定名稱後點擊"Select Region"

設定雲端中心區域後點擊"Review"

點擊"Create Intance"創建完成

接著你可以在Detail中看到你的使用者名稱、密碼、URL等資訊,等等會需要用URL來連接RabbitMQ;另外點擊左邊的"RabbitMQ Manager"可以開啟監控面板。

 

2. Project 設置

接著讓我們創建訊息的發送方和接收方:

發送方:

pom.xml 中引入必需的依賴 ─ spring-boot-starter-web和spring-cloud-starter-stream-rabbit

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>  
		  <groupId>org.springframework.cloud</groupId>
		  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>

或是使用Spring Initializr 快速創建(依賴選擇Spring Web再手動加入spring-cloud-starter-stream-rabbit)

 

HelloBinding.java 

package com.producer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface HelloBinding {

	@Output("greetingChannel")
	MessageChannel greeting();
	
}

我們訂一個類型為MessageChannel的抽象方法greeting,之後會需要用到MessageChannel的send方法將訊息傳出去,加上@Output(),裡面的名稱是自訂的,用來識別通道(Channel)。

 

ProducerApplication.java

package com.producer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;


@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class ProducerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class, args);
	}

}

在程式的進入點加上@EnableBinding(),參數可以填入一到多個介面(interface),這裡就填上剛剛創建的HelloBinding.class,它的作用是連接到訊息代理(Message Broker)。

 

ProducerController.java

package com.producer;

import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    private MessageChannel greet;

    public ProducerController(HelloBinding binding) {
        greet = binding.greeting();
    }

    @GetMapping("/greet/{name}")
    public void publish(@PathVariable String name) {
        String greeting = "Hello, " + name + "!";
        Message<String> msg = MessageBuilder.withPayload(greeting)
            .build();
        this.greet.send(msg);
    }
    
}

創建一支API當作訊息的發送者,當訪問這支API時,會先用MessageBuilder創建一則String訊息,使用MessageChannel的send方法送給訊息代理,這邊API加入一個參數當作識別每次傳進來的內容。

 

application.properties

spring.rabbitmq.addresses=amqp://itf...
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8081

最後設定properties檔案,告訴spring要如何連到RabbitMQ,addresses的參數是方才在CloudAMQP創建好的實體,URL可以在Dta中找到;我們可以設定綁定的通道greetingChannel在RabbitMQ顯示的exchange名稱為greetings,如果未設定會比照通道名稱,寫到這邊發送方就完成了。

 


接收方:

pom.xml:與發送方引入的依賴是一樣的。

 

HelloBinding.java

package com.consumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface HelloBinding {

	String GREETING_CHANNEL = "greetingChannel";
	
	@Input(GREETING_CHANNEL)
    SubscribableChannel greeting();	
}

與發送方兩個不一樣的地方在於,訂定一個類型為SubscribableChannel的抽象方法greeting,加上@Input(),填入我們要監聽的greetingChannel通道。

 

ConsumerApplication.java

package com.consumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
public class ConsuemerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ConsuemerApplication.class, args);
	}   

這邊程式的進入點不做修改。

 

HelloListener.java

package com.consumer;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@EnableBinding(HelloBinding.class)
public class HelloListener {

    @StreamListener(HelloBinding.GREETING_CHANNEL)
    public void processGreetingChannel(String msg) {
        System.out.println(msg);
    }
    
}

創建一個類別HelloListener來處理接收的訊息,在類別加上@EnableBinding()來啟用綁定,接著在接收的方法加上@StreamListener("通道名稱"),這邊讓它列印在console上。

 

application.properties

spring.rabbitmq.addresses=amqp://itf...
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=9081

這邊設定與發送方的大致相同,更動port號即可。

 

六、測試

啟動producer和consumer後,呼叫greet API:http://localhost:8081/greet/Peiiun。

consumer的console正確顯示傳來的訊息:Hello, Peiiun。

 

現在我們consumer多啟動一個實體,port設定為9082,呼叫greet API:http://localhost:8081/greet/Amy:

Consumer和Consumer2都接收到訊息:Hello, Amy。

 

當我們希望只有其中一個實體接收到訊息,這個時候必須在consumer配置消費群體(Consumer Group):

spring.cloud.stream.bindings.greetingChannel.group = greetings-group

重啟後,接著呼叫greet API 四次測試,param分別以One、Two、Three、Four分辨,可以看到訊息已分組,以輪巡的方式給其中一個Consumer接收。

 

七、結論

以上是Spring Cloud Stream主要概念,並演示了單純使用Spring Cloud Stream建構訊息驅動的微服務。應用服務使用訊息驅動的方式溝通,可以讓非核心業務使用異步的方式作業,提升核心業務的客戶體驗;另外Spring Cloud Stream讓更改訊息代理變得相當輕鬆,只要更改代理的依賴和配置檔案即可,實現應用服務和訊息代理之間的無感知。

 

參考資料:

Spring Cloud Stream Core

Spring Cloud Stream with RabbitMQ: Message-Driven Microservices

AMQP

 

吳沛芸 Peiiun Wu