Azure Service Bus简单介绍和使用

我们知道现如今不同的应用或者服务之间decouple的趋势是越来越明显,而随之而来的问题就是这些应用或者服务之间如何进行可靠以及安全的数据或者状态传输,我们可以通过基于MSMQ或者RabbitMQ之类消息中间件来自我搭建和维护一套方案,也可以选择微软为我们提供的企业级的基于AzureService Bus来达到我们的目的。本文就来详细讨论一下Azure Service Bus

两种基本的场景

目前Azure service Bus支持两种基本的场景,一种是基于Queuemessage传输,一种是基于Topics的传输:

  1. Queue

发送端会把Message发送到queue,然后接收端从queue中接收message,在queue中的message是有序的,也就是所谓的FIFO,并且接收方只能是单一的,如下图所示:

  1. Topics

Queue不同的是,一般来说Queue是点对点的传输,而Topic这是一个publish/subscribe的情况,也就是说可以有多接收方(我们称之为Subscriber),每个接收方都可以接收到一份完整的message的拷贝,你甚至可以设置一些filter的条件来决定在什么情况下哪些message可以被其中特定的receiver接收。

基本场景的实现

在我们实现senderreceiver端的代码之前,我们需要首先在azure上创建用来接收的queue或者topics,这个时候你需要一个Azure的订阅,或者注册一个免费的用户

基于Queue的基本场景的实现

下面我们先来看一下基本场景中基于Queuemessage的发送和接收如何用.NET代码实现:

Azure portalqueue的创建

  1. Azure Portal中创建一个resource,选择service bus
  2. 然后我们创建一个namespace,所谓的namespace就是一个container,他用来存放所有的service bus相关的内容,我们的queue或者topics都是建立于它的基础上。
  3. 在创建好了namespace之后,我们可以点击到刚刚创建的namespace,选择“Shared access policies”,然后点击“RootManageShareAccessKey”来得到我们在代码中需要使用的connection string,这里我们把primary connection string拷贝下来,后面的代码中会用到。
  4. 在我们的namespace中创建一个queue

Send端的代码实现

Portal创建好queue之后,我们就可以连接这个queue,进行发送message了。首先新建一个queueClient:

queueClient = new QueueClient(ServiceBusConnectionString, QueueName);

这里的ServiceBusConnectionString就是我们刚刚在portal那边拷贝的primary connection stringQueueName是我们创建的queue的名字。

然后利用下面的SendMessagesAsync来发送messagequeue中:

 static async Task SendMessagesAsync(int numberOfMessagesToSend)
{
   try
   {
        for (var i = 0; i < numberOfMessagesToSend; i++)
        {
            // 创建发送的message
            string messageBody = $"Message {i}";
            var message = new Message(Encoding.UTF8.GetBytes(messageBody));
            // 打印到console,我们用来debug
            Console.WriteLine($"Sending message: {messageBody}");
            // SendAsync来发送message
            await queueClient.SendAsync(message);
        }
    }
    catch (Exception exception)
    {
    //catch exception
    }
}

receiver端的代码实现

在我们发送messagequeue中后,我们需要些一个简单的receiver端的代码来接收message,同样首先要创建一个queueClient

queueClient = new QueueClient(ServiceBusConnectionString, QueueName);

通过RegisterMessageHandler来注册receiverhandler函数:

var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
    // 同时可以并行处理的message的数目,这里简单设为1
    MaxConcurrentCalls = 1,
    //用来表示是否自动complete,这里设为false,就是我们要在callback中显式地complete
    AutoComplete = false
};

// Register the function that will process messages
queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);

static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
    // call complete这样就不会再接收了
    await queueClient.CompleteAsync(message.SystemProperties.LockToken);
}

运行的结果

send端代码运行后的结果如下:

此时我们去azure portalqueue的界面可以看到有10message在那边

此时再运行receiver端的代码,结果如下:

我们可以看到我们在receiver端按照顺序把所有的message都接收到了,此时再去看azure portal queue中,此时我们的active message count已经是0了。

至此,我们基于Queue的基本场景实现已经完成。

基于Topics的基本场景实现

基于Topics和基于Queue的实现基本很类似,就是在create queue那一步稍有不同,我们需要创建topic,毕竟这里使用的不是queue

  1. namespacecreate topic
  2. 在已经创建的topic上面创建subscriptions

Send端代码的实现

queue的场景类似,我们需要新建对应的topicClient,代码如下:

 topicClient = new TopicClient(ServiceBusConnectionString, TopicName);

相应的参数意义从名字上看应该是一目了然。这之后的sendmessage代码就和queue比较类似了:

       static async Task SendMessagesAsync(int numberOfMessagesToSend)
{
    try
    {
        for (var i = 0; i < numberOfMessagesToSend; i++)
        {
            // 创建要发送的message
            string messageBody = $"Message {i}";
            var message = new Message(Encoding.UTF8.GetBytes(messageBody));
            // 显式对应的message信息到console
            Console.WriteLine($"Sending message: {messageBody}");
            // 发送message到topic
            await topicClient.SendAsync(message);
        }
    }
    catch (Exception exception)
    {
        Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
    }
}

Receiver端代码的实现

receiver端的代码是基于subscription的,所以我们首先要新建的subscriptionsclient

subscriptionClient = new SubscriptionClient(ServiceBusConnectionString, TopicName, SubscriptionName);

我们这里再加一个default rule的确认代码:

await subscriptionClient.RemoveRuleAsync(RuleDescription.DefaultRuleName);

await subscriptionClient.AddRuleAsync(new RuleDescription(RuleDescription.DefaultRuleName, new TrueFilter()));

剩下的receiver代码和queue那边就是一样的了,只是把其中queueClient改成subscriptionClient而已。

 var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
    // 详情见queue那边的注释
    MaxConcurrentCalls = 1,
    AutoComplete = false
};

// 设置处理message的回调

subscriptionClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
static async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
    // Process the message
    Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
    await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}

运行结果

send端的结果:

在发送之后,我们可以看到Azuretopics的中已经有了对应的messagecurrentsize4.5KB

我们来运行其中一个subscription(我们一共创建了两个subscription来测试),看看结果如何:

这里我们可以很清楚地看到和Queue之间的一个明显的差别,就是receiver端接收到的message是没有顺序的保证的。此时我们去portal上再看看,会发现topic中的current size少了一半,已经只剩2.2KB了:

通过第二个subscription再接收一次,结果如下,同样message的顺序是无序的,甚至和第一个subscription的顺序也是不同的。

在全部receiver都完成接收任务之后,我们可以看到portal中的current size终于变成了零:

至此,基于topic的基本场景实现也就介绍完毕了。

一些高阶场景

除了我们上面介绍的两个基本场景外,Azure service bus作为一个企业级的解决方案,其还提供了很多高阶的应用场景,本文做一些简单介绍:

Message session

有时我们希望把一个queue/topic中的一组message统一起来由一个receiver来处理,而这组message和别的组的message是交错进行传输的,这时我们就可以把对应的messagesend的时候加上sessionId,然后在接收端通过MessageSession来进行接收,如下图所示:

Auto forwording自动转发

自动转发功能就是允许你把一个queue或者subscription和另外一个queue或者topic连接起来,这样service bus会自动把前面的message分发到后面的queue或者topic中去,当然这里有一个前提,就是这些queuesubscription以及topic必须在一个namespace中。

那么一般在什么情况下我们会使用这个功能呢,一种比较常见的场景就是把发送和接收decouple开,如下图所示:

我们有订单系统,存货系统以及客户关系系统,他们都会把他们的信息发送到对应的topic中去,AliceBob我们假设是两个sales,他对这些改变的信息都比较感兴趣,这是他们其实并不需要对每一个过程都去监听,只要处理好他们私有的queue就好,我们可以根据order topic/Inventory topic/CRM topic中的关键字过滤出他们需要的信息,并自动转发到他们的私有queue就可以达到我们的目的,而且让他们的私有queue和具体的business的各个部分decouple开。

Dead-letter queue

当有一个message没有任何的receiver来接收或者说messge没法处理的时候怎么办呢,我们会把他发送dead-letter queue中,这个queue不需要我们显式地创建,也没法删除。当这些message被发送deadletter queue中之后,我们也可以在有办法进行处理的时候对他们进行处理,message被处理到dead-letter queue有以下几种情况,当然我们也可以自定义些错误信息:

定时分发

很多时候,我们希望message在一个延后到一个特定的时间再传输,比如我们在特定时间起一个job干某事,这种情况我们可以在发送的时候,设置CheduledEnqueueTimeUtc属性。这个message是可以在schedule的时间之前cancel的,我们可以通过ScheduleMessageAsync返回的SequenceNumber来进行cancel

信息延迟

和定时分发比较类似,假如我们在接收端接收到了一个message,但是因为各种原因我们暂时没有办法来处理它,希望把这个message hold一段时间,这也是可以做到的,我们可以在接收端call BrokeredMessage.Defer来处理,这样这个message就会仍然保存在queue或者subscription中,等待后续的处理。

批传输

这个比较好理解,很多时候因为我们的message可能来的太快了,我们总是一个一个处理其实效率并不是很高,那么Azure service bus其实提供了一个批传输的功能,也就是在一定时间内到达的message,我们会打包一起进行传输,这样其实就提供了传输的效率,default的批间隔是20ms,当然你也可以自定义.

Topic的过滤

我们有时会有这样的需求,就是对从topic那边得到的message我只想处理其中的某些message,其它情况的message可以由其它的topic进行处理,这就需要用到filter的功能。

重复探测

有一些特殊情况,send端可能会发送重复的message,比如说发送端在发送一个messge的过程中突然发生了一些错误,在发送端它可能认为message发送失败了,而事实上这个message已经发送到receiver端,这个时候发送端可能会重复发送这个message,这就会导致在接收端收到了同样的message,此时我们就可以通过重复探测功能来处理这种情况了。这个功能也很简单,就是在创建queue或者topic的时候使能对应的功能即可。

灾难恢复

这个话题其实是一个很重要的内容,就是说假如我们云端的datacenter停电了或者挂掉了,比如说地震或者怎样,我们如何进行处理呢。我想细心的同学可能会发现我们在创建namespace的时候,除了一个primaryconnectionstring还有一个secondconnectionstring,这其实就是用来处理这个情况的。

如下图说是,其实我们在开始的时候,就创建了两个namespace,并在他们之间建立了关联。

一般来说,像停电,宕机之类的恢复,所有的message还是可以恢复的,但是一些自然灾害会导致一小段时间message的丢失也是在所难免的。

至此,我们就基本把所有的Azure service bus的内容和大家简单介绍了一下,水平有限,若有错误,欢迎大家指出。

 

转载请注明出处:http://www.softlifelogging.com/?p=124&preview=true

更多精彩内容,欢迎关注公众号:随手记生活

No Comments

Leave a Reply

Your email address will not be published.