轻松搞定RabbitMQ4:发布订阅与路由选择

简介: 轻松搞定RabbitMQ(四)——发布/订阅 翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html 在前面的教程中,我们创建了一个工作队列,都是假设一个任务只交给一个消费者。


轻松搞定RabbitMQ(四)——发布/订阅

翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

在前面的教程中,我们创建了一个工作队列,都是假设一个任务只交给一个消费者。这次我们做一些完全不同的事儿——将消息发送给多个消费者。这种模式叫做“发布/订阅”。

为了说明这个模式,我们将构建一个简单日志系统。它包含2段程序:第一个将发出日志消息,第二个接受并打印消息。

如果在日志系统中每一个接受者(订阅者)都会的得到消息的拷贝。那样的话,我们可以运行一个接受者(订阅者)程序,直接把日志记录到硬盘。同时运行另一个接受者(订阅者)程序,打印日志到屏幕上。

说白了,发表日志消息将被广播给所有的接收者。

Exchanges(转发器)

前面的博文汇总,我们都是基于一个队列发送和接受消息。现在介绍一下完整的消息传递模式。

RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪个队列的。

相反,生产者只能发送消息给转发器,转发器是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过转发器的类型进行定义。

类型有:Direct、Topic、Headers和Fanout。我们关注最后一个。现在让我们创建一个该类型的转发器,定义如下:

channel.exchangeDeclare("logs", "fanout");

fanout转发器非常简单,从名字就可以看出,它是广播接受到的消息给所有的队列。而这正好符合日志系统的需求。

Nameless exchange(匿名转发)

之前我们对转换器一无所知,却可以将消息发送到队列,那是可能是我们用了默认的转发器,转发器名为空字符串""。之前我们发布消息的代码是:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数就是转发器的名字,空字符串表示模式或者匿名的转发器。消息通过队列的routingKey路由到指定的队列中去,如果存在的话。

现在我们可以指定转发器的名字了:

channel.basicPublish( "logs", "", null, message.getBytes());

Temporary queues(临时队列)

你可能还记得之前我们用队列时,会指定一个名字。队列有名字对我们来说是非常重要的——我们需要为消费者指定同一个队列。

但这并不是我们的日志系统所关心的。我们要监听所有日志消息,而不仅仅是一类日志。我们只对对当前流动的消息感兴趣。解决这些问题,我盟需要完成两件事。

首先,每当我盟连接到RabbitMQ时,需要一个新的空队列。为此我们需要创建一个随机名字的空队列,或者更好的,让服务器选好年则一个随机名字的空队列给我们。

其次,一旦消费者断开连接,队列将自动删除。

我们提供一个无参的queueDeclare()方法,创建一个非持久化、独立的、自动删除的队列,且名字是随机生成的。

String queueName = channel.queueDeclare().getQueue();

queueName是一个随机队列名。看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

Bindings(绑定)

我们已经创建了一个广播的转发器和一个随机队列。现在需要告诉转发器转发消息到队列。这个关联转发器和队列的我们叫它Binding。

channel.queueBind(queueName, "logs", "");

这样,日志转发器将附加到日志队列上去。

完整的例子:

发送端代码(生产者)EmitLog.java

public class EmitLog {
	private final static String EXCHANGE_NAME = "logs";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

		for(int i=0;i<3;i++){
			// 发送的消息
			String message = "Hello World!";
			channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}

		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}

消费者1 ReceiveLogs2Console.java

public class ReceiveLogs2Console {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    System.out.println(" [x] Received '" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
}

消费者2 ReceiveLogs2File.java

public class ReceiveLogs2File {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    print2File(message);
//			    System.out.println(" [x] Received '" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
	
	private static void print2File(String msg) {
		try {
			String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
			File file = new File(dir, logFileName + ".log");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}  
}

可以看到我们1个生产者用于发送log消息,2个消费者,一个用于显示,一个用于记录文件。

生产者声明了一个广播模式的转换器,订阅这个转换器的消费者都可以收到每一条消息。可以看到在生产者中,没有声明队列。这也验证了之前说的。生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并不是生产者关心的。

2个消费者,一个打印日志,一个写入文件,除了这2个地方不一样,其他地方一模一样。也是声明一下广播模式的转换器,而队列则是随机生成的,消费者实例启动后,会创建一个随机实例,这个在管理页面可以看到(如图)。而实例关闭后,随机队列也会自动删除。最后将队列与转发器绑定。

注:运行的时候要先运行2个消费者实例,然后在运行生产者实例。否则获取不到实例。

看看最终的结果吧:

轻松搞定RabbitMQ(四)——发布/订阅

翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

在前面的教程中,我们创建了一个工作队列,都是假设一个任务只交给一个消费者。这次我们做一些完全不同的事儿——将消息发送给多个消费者。这种模式叫做“发布/订阅”。

为了说明这个模式,我们将构建一个简单日志系统。它包含2段程序:第一个将发出日志消息,第二个接受并打印消息。

如果在日志系统中每一个接受者(订阅者)都会的得到消息的拷贝。那样的话,我们可以运行一个接受者(订阅者)程序,直接把日志记录到硬盘。同时运行另一个接受者(订阅者)程序,打印日志到屏幕上。

说白了,发表日志消息将被广播给所有的接收者。

Exchanges(转发器)

前面的博文汇总,我们都是基于一个队列发送和接受消息。现在介绍一下完整的消息传递模式。

RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪个队列的。

相反,生产者只能发送消息给转发器,转发器是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过转发器的类型进行定义。

类型有:Direct、Topic、Headers和Fanout。我们关注最后一个。现在让我们创建一个该类型的转发器,定义如下:

channel.exchangeDeclare("logs", "fanout");

fanout转发器非常简单,从名字就可以看出,它是广播接受到的消息给所有的队列。而这正好符合日志系统的需求。

Nameless exchange(匿名转发)

之前我们对转换器一无所知,却可以将消息发送到队列,那是可能是我们用了默认的转发器,转发器名为空字符串""。之前我们发布消息的代码是:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数就是转发器的名字,空字符串表示模式或者匿名的转发器。消息通过队列的routingKey路由到指定的队列中去,如果存在的话。

现在我们可以指定转发器的名字了:

channel.basicPublish( "logs", "", null, message.getBytes());

Temporary queues(临时队列)

你可能还记得之前我们用队列时,会指定一个名字。队列有名字对我们来说是非常重要的——我们需要为消费者指定同一个队列。

但这并不是我们的日志系统所关心的。我们要监听所有日志消息,而不仅仅是一类日志。我们只对对当前流动的消息感兴趣。解决这些问题,我盟需要完成两件事。

首先,每当我盟连接到RabbitMQ时,需要一个新的空队列。为此我们需要创建一个随机名字的空队列,或者更好的,让服务器选好年则一个随机名字的空队列给我们。

其次,一旦消费者断开连接,队列将自动删除。

我们提供一个无参的queueDeclare()方法,创建一个非持久化、独立的、自动删除的队列,且名字是随机生成的。

String queueName = channel.queueDeclare().getQueue();

queueName是一个随机队列名。看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

Bindings(绑定)

我们已经创建了一个广播的转发器和一个随机队列。现在需要告诉转发器转发消息到队列。这个关联转发器和队列的我们叫它Binding。

channel.queueBind(queueName, "logs", "");

这样,日志转发器将附加到日志队列上去。

完整的例子:

发送端代码(生产者)EmitLog.java

public class EmitLog {
	private final static String EXCHANGE_NAME = "logs";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

		for(int i=0;i<3;i++){
			// 发送的消息
			String message = "Hello World!";
			channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}

		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}

消费者1 ReceiveLogs2Console.java

public class ReceiveLogs2Console {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    System.out.println(" [x] Received '" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
}

消费者2 ReceiveLogs2File.java

public class ReceiveLogs2File {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    print2File(message);
//			    System.out.println(" [x] Received '" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
	
	private static void print2File(String msg) {
		try {
			String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
			File file = new File(dir, logFileName + ".log");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}  
}

可以看到我们1个生产者用于发送log消息,2个消费者,一个用于显示,一个用于记录文件。

生产者声明了一个广播模式的转换器,订阅这个转换器的消费者都可以收到每一条消息。可以看到在生产者中,没有声明队列。这也验证了之前说的。生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并不是生产者关心的。

2个消费者,一个打印日志,一个写入文件,除了这2个地方不一样,其他地方一模一样。也是声明一下广播模式的转换器,而队列则是随机生成的,消费者实例启动后,会创建一个随机实例,这个在管理页面可以看到(如图)。而实例关闭后,随机队列也会自动删除。最后将队列与转发器绑定。

注:运行的时候要先运行2个消费者实例,然后在运行生产者实例。否则获取不到实例。

看看最终的结果吧:


轻松搞定RabbitMQ(五)——路由选择

翻译地址:http://www.rabbitmq.com/tutorials/tutorial-four-java.html

前篇博文中,我们建立了一个简单的日志系统。可以广播消息给多个消费者。本篇博文,我们将添加新的特性——我们可以只订阅部分消息。比如:我们可以接收Error级别的消息写入文件。同时仍然可以在控制台打印所有日志。

Bindings(绑定)

在上一篇博客中我们已经使用过绑定。类似下面的代码:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定表示转换器与队列之间的关系。可以简单的人为:队列对该转发器上的消息感兴趣。

绑定可以设定额外的routingKey参数。为了与避免basicPublish方法(发布消息的方法)的参数混淆,我们准备把它称作绑定键(binding key)。下面展示如何使用绑定键(binding key)来创建一个绑定:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

绑定键关键取决于转换器的类型。对于fanout类型,忽略此参数。

Direct exchange(直接转发)

前面讲到我们的日志系统广播消息给所有的消费者。我们想对其扩展,根据消息的严重性来过滤消息。例如:我们希望将致命错误的日志消息记录到文件,而不是把磁盘空间浪费在warn和info类型的日志上。我们使用的fanout转发器,不能给我们太多的灵活性。它仅仅只是盲目的广播而已。我们使用direct转发器进行代替,其背后的算法很简单——消息会被推送至绑定键(binding key)和消息发布附带的选择键(routing key)完全匹配的队列。

在上图中,我们可以看到direct类型的转发器与2个队列进行了绑定。第一个队列使用的绑定键是orange,第二个队列绑定键为black和green。这样当消息发布到转发器是,附带orange绑定键的消息将被路由到队列Q1中去。附带black和green绑定键的消息被路由到Q2中去。其他消息全部丢弃。

Multiple bindings(多重绑定)

使用一个绑定键绑定多个队列是完全合法的。如上图,绑定键black绑定了2个队列——Q1和Q2。

Emitting logs(发送日志)

我们将这种模式用于日志系统,发送消息给direct类型的转发器。我们将 提供日志严重性做为绑定键。那样,接收程序可以选择性的接收严重性的消息。首先关注发送日志的代码:

像往常一样首先创建一个转换器:

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

然后为发送消息做准备:

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简化代码,我们假定日志的严重性是‘info’,‘warning’,‘error’中之一。

Subscribing(订阅)

接收消息跟前面博文中的一样。我们仅需要修改一个地方:为每一个我们感兴趣的严重性的消息,创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){    
  channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

完整的例子

发送端代码(EmitLogDirect.java)

public class EmitLogDirect {
	private final static String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "direct");

		//所有日志严重性级别
		String[] severities={"error","info","warning"};
		for(int i=0;i<3;i++){
			String severity = severities[i%3];//每一次发送一条不同严重性的日志
			
			// 发送的消息
			String message = "Hello World"+Strings.repeat(".", i+1);
			//参数1:exchange name
			//参数2:routing key
			channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
			System.out.println(" [x] Sent '" + severity +"':'"+ message + "'");
		}
		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}

消费者1(ReceiveLogs2Console.java)

public class ReceiveLogs2Console {
	private static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();

		//所有日志严重性级别
		String[] severities={"error","info","warning"};
		for (String severity : severities) {
			//关注所有级别的日志(多重绑定)
			channel.queueBind(queueName, EXCHANGE_NAME, severity);
		}
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    System.out.println(" [x] Received '"  + envelope.getRoutingKey() + "':'" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
}

消费者2(ReceiveLogs2File.java)

public class ReceiveLogs2File {
	private static final String EXCHANGE_NAME = "direct_logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "direct");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    
	    String severity="error";//只关注error级别的日志,然后记录到文件中去。
	    channel.queueBind(queueName, EXCHANGE_NAME, severity);
	    
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    //记录日志到文件:
			    print2File( "["+ envelope.getRoutingKey() + "] "+message);
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
	
	private static void print2File(String msg) {
		try {
			String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
			File file = new File(dir, logFileName + ".log");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}  
}

最终结果:

罗哩罗嗦的说这么多,其实就是说了这么一件事:我们可以使用Direct exchange+routingKey来过滤自己感兴趣的消息。一个队列可以绑定多个routingKey。这就是我们今天的主题——路由选择。


轻松搞定RabbitMQ(四)——发布/订阅

翻译地址:http://www.rabbitmq.com/tutorials/tutorial-three-java.html

在前面的教程中,我们创建了一个工作队列,都是假设一个任务只交给一个消费者。这次我们做一些完全不同的事儿——将消息发送给多个消费者。这种模式叫做“发布/订阅”。

为了说明这个模式,我们将构建一个简单日志系统。它包含2段程序:第一个将发出日志消息,第二个接受并打印消息。

如果在日志系统中每一个接受者(订阅者)都会的得到消息的拷贝。那样的话,我们可以运行一个接受者(订阅者)程序,直接把日志记录到硬盘。同时运行另一个接受者(订阅者)程序,打印日志到屏幕上。

说白了,发表日志消息将被广播给所有的接收者。

Exchanges(转发器)

前面的博文汇总,我们都是基于一个队列发送和接受消息。现在介绍一下完整的消息传递模式。

RabbitMQ消息模式的核心理念是:生产者没有直接发送任何消费到队列。实际上,生产者都不知道这个消费是发送给哪个队列的。

相反,生产者只能发送消息给转发器,转发器是非常简单的。一方面它接受生产者的消息,另一方面向队列推送消息。转发器必须清楚的知道如何处理接收到的消息。附加一个特定的队列吗?附加多个队列?或者是否丢弃?这些规则通过转发器的类型进行定义。

类型有:Direct、Topic、Headers和Fanout。我们关注最后一个。现在让我们创建一个该类型的转发器,定义如下:

channel.exchangeDeclare("logs", "fanout");

fanout转发器非常简单,从名字就可以看出,它是广播接受到的消息给所有的队列。而这正好符合日志系统的需求。

Nameless exchange(匿名转发)

之前我们对转换器一无所知,却可以将消息发送到队列,那是可能是我们用了默认的转发器,转发器名为空字符串""。之前我们发布消息的代码是:

channel.basicPublish("", "hello", null, message.getBytes());

第一个参数就是转发器的名字,空字符串表示模式或者匿名的转发器。消息通过队列的routingKey路由到指定的队列中去,如果存在的话。

现在我们可以指定转发器的名字了:

channel.basicPublish( "logs", "", null, message.getBytes());

Temporary queues(临时队列)

你可能还记得之前我们用队列时,会指定一个名字。队列有名字对我们来说是非常重要的——我们需要为消费者指定同一个队列。

但这并不是我们的日志系统所关心的。我们要监听所有日志消息,而不仅仅是一类日志。我们只对对当前流动的消息感兴趣。解决这些问题,我盟需要完成两件事。

首先,每当我盟连接到RabbitMQ时,需要一个新的空队列。为此我们需要创建一个随机名字的空队列,或者更好的,让服务器选好年则一个随机名字的空队列给我们。

其次,一旦消费者断开连接,队列将自动删除。

我们提供一个无参的queueDeclare()方法,创建一个非持久化、独立的、自动删除的队列,且名字是随机生成的。

String queueName = channel.queueDeclare().getQueue();

queueName是一个随机队列名。看起来会像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

Bindings(绑定)

我们已经创建了一个广播的转发器和一个随机队列。现在需要告诉转发器转发消息到队列。这个关联转发器和队列的我们叫它Binding。

channel.queueBind(queueName, "logs", "");

这样,日志转发器将附加到日志队列上去。

完整的例子:

发送端代码(生产者)EmitLog.java

public class EmitLog {
	private final static String EXCHANGE_NAME = "logs";

	public static void main(String[] args) throws IOException {
		/**
		 * 创建连接连接到MabbitMQ
		 */
		ConnectionFactory factory = new ConnectionFactory();
		// 设置MabbitMQ所在主机ip或者主机名
		factory.setHost("127.0.0.1");
		// 创建一个连接
		Connection connection = factory.newConnection();
		// 创建一个频道
		Channel channel = connection.createChannel();
		// 指定转发——广播
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

		for(int i=0;i<3;i++){
			// 发送的消息
			String message = "Hello World!";
			channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
			System.out.println(" [x] Sent '" + message + "'");
		}

		// 关闭频道和连接
		channel.close();
		connection.close();
	}
}

消费者1 ReceiveLogs2Console.java

public class ReceiveLogs2Console {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    System.out.println(" [x] Received '" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
}

消费者2 ReceiveLogs2File.java

public class ReceiveLogs2File {
	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] argv) throws IOException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("127.0.0.1");
		// 打开连接和创建频道,与发送端一样
		Connection connection = factory.newConnection();
		final Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 声明一个随机队列
		String queueName = channel.queueDeclare().getQueue();
	    channel.queueBind(queueName, EXCHANGE_NAME, "");
		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		
		// 创建队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");
			    print2File(message);
//			    System.out.println(" [x] Received '" + message + "'");
			  }
			};
			channel.basicConsume(queueName, true, consumer);
	}
	
	private static void print2File(String msg) {
		try {
			String dir = ReceiveLogs2File.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
			File file = new File(dir, logFileName + ".log");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write(((new SimpleDateFormat("HH:mm:ss").format(new Date())+" - "+msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}  
}

可以看到我们1个生产者用于发送log消息,2个消费者,一个用于显示,一个用于记录文件。

生产者声明了一个广播模式的转换器,订阅这个转换器的消费者都可以收到每一条消息。可以看到在生产者中,没有声明队列。这也验证了之前说的。生产者其实只关心exchange,至于exchange会把消息转发给哪些队列,并不是生产者关心的。

2个消费者,一个打印日志,一个写入文件,除了这2个地方不一样,其他地方一模一样。也是声明一下广播模式的转换器,而队列则是随机生成的,消费者实例启动后,会创建一个随机实例,这个在管理页面可以看到(如图)。而实例关闭后,随机队列也会自动删除。最后将队列与转发器绑定。

注:运行的时候要先运行2个消费者实例,然后在运行生产者实例。否则获取不到实例。

看看最终的结果吧:


微信公众号【Java技术江湖】一位阿里 Java 工程师的技术小站。(关注公众号后回复”Java“即可领取 Java基础、进阶、项目和架构师等免费学习资料,更有数据库、分布式、微服务等热门技术学习视频,内容丰富,兼顾原理和实践,另外也将赠送作者原创的Java学习指南、Java程序员面试指南等干货资源)


wAAACH5BAEKAAAALAAAAAABAAEAAAICRAEAOw==


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
10天前
|
消息中间件 微服务
RabbitMQ入门指南(十):延迟消息-死信交换机
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了死信交换机、死信交换机实现延迟消息等内容。
19 0
|
5月前
|
消息中间件
消息中间件系列教程(16) -RabbitMQ-应答模式
消息中间件系列教程(16) -RabbitMQ-应答模式
36 0
|
3月前
|
消息中间件 存储
RabbitMQ之交换机
【1月更文挑战第9天】 RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。
132 1
|
11月前
|
消息中间件
五、RabbitMQ 之 SpringAMQP 实现 Fanout 交换机
五、RabbitMQ 之 SpringAMQP 实现 Fanout 交换机
|
9月前
|
消息中间件 存储
【RabbitMQ五】——RabbitMQ路由模式(Routing)
【RabbitMQ五】——RabbitMQ路由模式(Routing)
108 0
|
11月前
|
消息中间件 数据库
【消息中间件】RabbitMQ的工作模式
【消息中间件】RabbitMQ的工作模式
|
11月前
|
存储 消息中间件
四、RabbitMQ 交换机
四、RabbitMQ 交换机
|
11月前
|
消息中间件
十、RabbitMQ高级 - 死信交换机
十、RabbitMQ高级 - 死信交换机
|
11月前
|
消息中间件
七、RabbitMQ 之 SpringAMQP 实现 Topic 交换机
七、RabbitMQ 之 SpringAMQP 实现 Topic 交换机
|
消息中间件 存储
RabbitMQ学习(七):交换器
RabbitMQ学习(七):交换器
RabbitMQ学习(七):交换器