RocketMQ是我们常用的消息中间件,在运行单元测试时,我们可能不需要真正发送消息(除非是为了测试发送消息),也不想因为连结不上RocketMQ的Broker,NameServer而影响单元测试运行。
   那我们该如何Mock RocketMQ消息生产者呢?
  请看例子:

 

//RocketMQ消息生产者 Mock 

public class RocetMQProducerMockingTest {
	// 把RocketMQ的生产者mock
	@BeforeClass
	public static void mockRocketMQ() {
		new RocketMQProducerMockUp();
	}

	@Test
	public void testSendRocketMQMessage() throws Exception {
		DefaultMQProducer producer = new DefaultMQProducer("test_producer");
		producer.setNamesrvAddr("192.168.0.2:9876;192.168.0.3:9876");
		producer.start();
		for (int i = 0; i < 20; i++) {
			Message msg = new Message("testtopic", "TagA", ("Hello " + i).getBytes());
			// 因为mq生产者已经mock,所以消息并不会真正的发送,即使nameServer连不上,也不影响单元测试的运行
			SendResult result = producer.send(msg);
			Assert.isTrue(result.getSendStatus() == SendStatus.SEND_OK);
			Assert.isTrue(result.getMsgId() != null);
		}
		producer.shutdown();
	}
}



最关键的类是RocketMQProducerMockUp,这个类改变了生产者默认实现。代码如下:

//MQ消息发送者 的MockUp(伪类) 
public class RocketMQProducerMockUp extends MockUp<DefaultMQProducer> {

	@Mock
	void init() throws MQClientException {
		// 构造函数也什么都不做
	}

	@Mock
	void start() throws MQClientException {
		// 启动,什么都不做 
	}

	@Mock
	void shutdown() {
		// 关闭,也什么都不做 
	}

	@Mock
	List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException {
		// 欺骗调用方,返回不存在的消息队列,因为消息并不会真正发送嘛
		List<MessageQueue> queues = new ArrayList<MessageQueue>();
		MessageQueue q = new MessageQueue();
		q.setBrokerName("testbrokername");
		q.setQueueId(1);
		q.setTopic("testtopic");
		queues.add(q);
		return queues;
	}

	// 下面是对各个send方法的mock,都返回消息成功结果
	@Mock
	SendResult send(final Message msg)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		return newSuccessSendResult();
	}

	@Mock
	SendResult send(final Message msg, final long timeout)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		return newSuccessSendResult();
	}

	@Mock
	void send(final Message msg, final SendCallback sendCallback)
			throws MQClientException, RemotingException, InterruptedException {
		sendCallback.onSuccess(this.newSuccessSendResult());
	}

	@Mock
	void send(final Message msg, final SendCallback sendCallback, final long timeout)
			throws MQClientException, RemotingException, InterruptedException {
		sendCallback.onSuccess(this.newSuccessSendResult());
	}

	@Mock
	void sendOneway(final Message msg) throws MQClientException, RemotingException, InterruptedException {

	}

	@Mock
	SendResult send(final Message msg, final MessageQueue mq)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		return newSuccessSendResult();
	}

	@Mock
	SendResult send(final Message msg, final MessageQueue mq, final long timeout)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		return newSuccessSendResult();
	}

	@Mock
	void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
			throws MQClientException, RemotingException, InterruptedException {
		sendCallback.onSuccess(this.newSuccessSendResult());
	}

	@Mock
	void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback, long timeout)
			throws MQClientException, RemotingException, InterruptedException {
		sendCallback.onSuccess(this.newSuccessSendResult());
	}

	@Mock
	void sendOneway(final Message msg, final MessageQueue mq)
			throws MQClientException, RemotingException, InterruptedException {

	}

	@Mock
	SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		return newSuccessSendResult();
	}

	@Mock
	SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg, final long timeout)
			throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
		return newSuccessSendResult();
	}

	@Mock
	void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback)
			throws MQClientException, RemotingException, InterruptedException {
		sendCallback.onSuccess(this.newSuccessSendResult());
	}

	@Mock
	void send(final Message msg, final MessageQueueSelector selector, final Object arg, final SendCallback sendCallback,
			final long timeout) throws MQClientException, RemotingException, InterruptedException {
		sendCallback.onSuccess(this.newSuccessSendResult());
	}

	@Mock
	void sendOneway(final Message msg, final MessageQueueSelector selector, final Object arg)
			throws MQClientException, RemotingException, InterruptedException {

	}

	@Mock
	TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter tranExecuter,
			final Object arg) throws MQClientException {
		return newTransactionSendResult();
	}

	private TransactionSendResult newTransactionSendResult() {
		TransactionSendResult success = new TransactionSendResult();
		success.setSendStatus(SendStatus.SEND_OK);
		success.setMsgId(UUID.randomUUID().toString());
		MessageQueue q = new MessageQueue();
		q.setBrokerName("testbrokername");
		q.setQueueId(1);
		q.setTopic("testtopic");
		success.setMessageQueue(q);
		success.setLocalTransactionState(LocalTransactionState.COMMIT_MESSAGE);
		return success;
	}

	private SendResult newSuccessSendResult() {
		SendResult success = new SendResult();
		success.setSendStatus(SendStatus.SEND_OK);
		success.setMsgId(UUID.randomUUID().toString());
		MessageQueue q = new MessageQueue();
		q.setBrokerName("testbrokername");
		q.setQueueId(1);
		q.setTopic("testtopic");
		success.setMessageQueue(q);
		return success;
	}
}