Windows Azure Service Bus (3) 队列(Queue) 使用VS2013开发Service Bus Queue

简介:

 《Windows Azure Platform 系列文章目录

    

  在之前的Azure Service Bus中,我们已经介绍了Service Bus 队列(Queue)的基本概念。

  在本章中,笔者将介绍如何使用Visual Studio 2013,开发一个Service Bus Queue的Demo Project。

  

  场景:

  1.在前端有一个ASP.NET页面,客户从输入框输入数据,并且通过按钮进行提交

  2.输入框输入的数据,会被后端的WorkerRoleWithSBQueue接受处理,并在后端显示

  

  主要步骤有:

  1.使用Azure Management Portal,创建Service Bus

  2.修改Web Role逻辑

  3.创建Cloud Project,添加Web Role和Service Bus Worker Role

  4.配置WebRole和ServiceBusWorkerRole

  3.DEMO

 

 

  一.笔者已经在之前的文章中,介绍如何创建Service Bus了。不熟悉的读者,可以参考Windows Azure Service Bus (2) 队列(Queue)入门

  

  二.创建Cloud Project

  1.首先我们以管理员身份,运行VS2013

  2.创建一个新的Cloud Service,命名为LeiServiceBusQueue,如下图:

  

  3.增加ASP.NET Web RoleWorker Role with Service Bus Queue。如下图

  

   

 

 

  二.设置WorkerRoleWithSBQueue1

  1.在WorkerRoleWithSBQueue1项目中,修改WorkerRole.cs代码,如下:

复制代码
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1
{
    public class WorkerRole : RoleEntryPoint
    {
        // The name of your queue
        const string QueueName = "ProcessingQueue";// QueueClient is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        QueueClient Client;
        ManualResetEvent CompletedEvent = new ManualResetEvent(false);

        public override void Run()
        {
            Trace.WriteLine("Starting processing of messages");

            // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
            Client.OnMessage((receivedMessage) =>
                {
                    try
                    {
                        // Process the message
                        Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());

                        string received = receivedMessage.GetBody<string>();
                        Trace.WriteLine("You input is" + received);
                    }
                    catch
                    {
                        // Handle any message processing specific exceptions here
                    }
                });

            CompletedEvent.WaitOne();
        }
    

     public override bool OnStart() { // Set the maximum number of concurrent connections ServicePointManager.DefaultConnectionLimit = 12; // Create the queue if it does not exist already string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"); var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString); if (!namespaceManager.QueueExists(QueueName)) { namespaceManager.CreateQueue(QueueName); }// Initialize the connection to Service Bus Queue Client = QueueClient.CreateFromConnectionString(connectionString, QueueName); return base.OnStart(); } public override void OnStop() { // Close the connection to Service Bus Queue Client.Close(); CompletedEvent.Set(); base.OnStop(); } } }
复制代码

 

  在上面的WokerRole.cs类代码中,分为两个逻辑:

  (1)OnStart函数,表示WorkerRole在启动的时候,初始化了ProcessingQueue对象

  (2)Run函数,表示在WorkerRole在执行的时候,将ProcessingQueue的内容输出

 

 

 

  三.设置Web Role

  1.在Web Role的根目录下,创建一个新的ASPX页面,重命名为ServiceBusQueue.aspx

  在ServiceBusQueue.aspx页面中,

  -  增加一个TextBox控件,重命名为txbInput

  -  增加一个Button控件,重命名为BtnSend

  2.在WebRole1项目中,增加NuGet,添加ServiceBus引用。如下图:

  

  3.在ServiceBusQueue.aspx.cs中,增加如下代码

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

using Microsoft.WindowsAzure;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

namespace WebRole1
{
    public partial class ServiceBusQueue : System.Web.UI.Page
    {
        const string QueueName = "ProcessingQueue";

     protected void Page_Load(object sender, EventArgs e) { } protected void BtnSend_Click(object sender, EventArgs e) { string strinput = txbInput.Text.ToString(); Send(strinput); txbInput.Text = string.Empty; } private void Send(string text) { string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"); MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString); // Initialize the connection to Service Bus Queue MessageSender sender = factory.CreateMessageSender(QueueName); BrokeredMessage message1 = new BrokeredMessage(text); sender.Send(message1); } } }
复制代码

  上面的代码中,会将用户从界面的输入值,插入到ProcessingQueue对象中

 

 

 

  四.配置WebRole和ServiceBusWorkerRole

  我们修改WebRole的配置文件,如下图:

  

  在WebRole1的Settings中,点击Add Setting增加Microsoft.ServiceBus.ConnectionString对象,如下图:

  

  上图中,需要将Value值修改为我们在Azure Management Portal中创建的ServiceBus连接字符串,如下图:

  

 

  修改完毕后,我们需要修改ServiceBusWorkerRole的配置文件

  

  将Microsoft.ServiceBus.ConnectionString的Value属性,修改为我们在Azure Management Portal中创建的ServiceBus连接字符串。如下图:

  

  

 

 

  五.然后我们用Visual Studio的Azure模拟器运行。

  我们依次在ServiceBusQueue.aspx中,输入不同的值。如下图:

                    

  我们可以在Azure Compute模拟器中,查看到2次输入的结果。如下图:

  

 

  可以观察到,首先输入的值,最先被处理。这也说明了Service Bus Queue 先进先出的特性。

 

  最后我们还可以在Management Portal中,查看到由代码生成的processingqueue对象

  

  

  

 

 

  =====================================分隔符=======================================================

 

  看到最后,如果有读者觉得,从Azure Compute Emulator查看到aspx页面的输入,这也太不智能啦。

  放心,其实我们可以将aspx页面的输入,返回到另外的ReturnQueue对象中去,并在前端aspx进行显示:

  (1)ProcessingQueue:插入(Send)数据

  (2)ProcessingQueue:返回(Receive)数据

 

  别问我为什么不能保存到代码中申明的ProcessingQueue中,笔者试验过,不能对ProcessingQueue同时执行Send和Receive操作

 

  ServiceBusQueue.aspx.cs代码如下

复制代码
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

using Microsoft.WindowsAzure;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

namespace WebRole1
{
    public partial class ServiceBusQueue : System.Web.UI.Page
    {
        const string QueueName = "ProcessingQueue";
        const string ReturnQueueName = "ReturnQueue";
        protected void Page_Load(object sender, EventArgs e)
        {

        }

        protected void BtnSend_Click(object sender, EventArgs e)
        {
            string strinput = txbInput.Text.ToString();
            Send(strinput);
            txbInput.Text = string.Empty;
        }

        private void Send(string text)
        {
            string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);

            // Initialize the connection to Service Bus Queue
            MessageSender sender = factory.CreateMessageSender(QueueName);

            BrokeredMessage message1 = new BrokeredMessage(text);

            sender.Send(message1);

        }

        protected void btnReceiveMessage_Click(object sender, EventArgs e)
        {
           string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
           QueueClient Client = QueueClient.CreateFromConnectionString(connectionString, ReturnQueueName);

           var message = Client.Receive(TimeSpan.FromSeconds(3));
           if (message != null)
           {
               var ret = message.GetBody<string>();
               message.Complete();


           }
        }
    }
}
复制代码

 

  WorkerRole.cs代码如下:

复制代码
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.ServiceRuntime;

namespace WorkerRoleWithSBQueue1
{
    public class WorkerRole : RoleEntryPoint
    {
        // The name of your queue
        const string QueueName = "ProcessingQueue";
        const string ReturnQueueName = "ReturnQueue";

        // QueueClient is thread-safe. Recommended that you cache 
        // rather than recreating it on every request
        QueueClient Client;
        ManualResetEvent CompletedEvent = new ManualResetEvent(false);

        public override void Run()
        {
            Trace.WriteLine("Starting processing of messages");

            // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
            Client.OnMessage((receivedMessage) =>
                {
                    try
                    {
                        // Process the message
                        Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());

                        string received = receivedMessage.GetBody<string>();
                        Trace.WriteLine("You input is" + received);

                        SendToReturnQueue(ReturnQueueName, received);
                    }
                    catch
                    {
                        // Handle any message processing specific exceptions here
                    }
                });

            CompletedEvent.WaitOne();
        }

        private void SendToReturnQueue(string queueName, string inputString)
        { 
            string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            MessagingFactory factory = MessagingFactory.CreateFromConnectionString(connectionString);

            // Initialize the connection to Service Bus Queue
            MessageSender sender = factory.CreateMessageSender(queueName);

            BrokeredMessage message1 = new BrokeredMessage(inputString);

            sender.Send(message1);

        }

        public override bool OnStart()
        {
            // Set the maximum number of concurrent connections 
            ServicePointManager.DefaultConnectionLimit = 12;

            // Create the queue if it does not exist already
            string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
            var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
            if (!namespaceManager.QueueExists(QueueName))
            {
                namespaceManager.CreateQueue(QueueName);
            }
            if (!namespaceManager.QueueExists(ReturnQueueName))
            {
                namespaceManager.CreateQueue(ReturnQueueName);
            }

            // Initialize the connection to Service Bus Queue
            Client = QueueClient.CreateFromConnectionString(connectionString, QueueName);
            return base.OnStart();
        }

        public override void OnStop()
        {
            // Close the connection to Service Bus Queue
            Client.Close();
            CompletedEvent.Set();
            base.OnStop();
        }
    }
}
复制代码

 

 

 

本博-三石Blog(下文简称本博),在本博客文章结尾处右下脚未注明转载、来源、出处的作品(内容)均为本博原创,本站对于原创作品内容对其保留版权,请勿随意转载,如若真有需要的朋友可以发Mail联系我;转载本博原创作品(内容)也必须遵循“署名-非商业用途-保持一致”的创作共用协议,请务必以文字链接的形式标明或保留文章原始出处和博客作者(Lei Zhang)的信息,关于本博摄影作品请务必注意保留(www.cnblog.com/threestone)等相关水印版权信息,否则视为侵犯原创版权行为;本博谢绝商业网站转载。版权所有,禁止一切有违中华人民共和国著作权保护法及相关法律和本博(法律)声明的非法及恶意抄袭。


本文转自Lei Zhang博客园博客,原文链接:http://www.cnblogs.com/threestone/p/4308998.html,如需转载请自行联系原作者

目录
相关文章
|
2月前
|
Java Unix 应用服务中间件
使用java service wrapper把windows flume做成服务
使用java service wrapper把windows flume做成服务
|
3月前
|
C++ Windows
Windows下boost安装及其在VS2013中配置
Windows下boost安装及其在VS2013中配置
|
1月前
|
数据可视化 数据库 C++
Qt 5.14.2揭秘高效开发:如何用VS2022快速部署Qt 5.14.2,打造无与伦比的Windows应用
Qt 5.14.2揭秘高效开发:如何用VS2022快速部署Qt 5.14.2,打造无与伦比的Windows应用
|
1月前
|
消息中间件 监控 NoSQL
在Windows下设置分布式队列Celery的心跳轮询
在Windows下设置分布式队列Celery的心跳轮询
28 0
|
12天前
|
监控 安全 API
7.3 Windows驱动开发:内核监视LoadImage映像回调
在笔者上一篇文章`《内核注册并监控对象回调》`介绍了如何运用`ObRegisterCallbacks`注册`进程与线程`回调,并通过该回调实现了`拦截`指定进行运行的效果,本章`LyShark`将带大家继续探索一个新的回调注册函数,`PsSetLoadImageNotifyRoutine`常用于注册`LoadImage`映像监视,当有模块被系统加载时则可以第一时间获取到加载模块信息,需要注意的是该回调函数内无法进行拦截,如需要拦截则需写入返回指令这部分内容将在下一章进行讲解,本章将主要实现对模块的监视功能。
29 0
7.3 Windows驱动开发:内核监视LoadImage映像回调
|
4月前
|
监控 安全 API
7.2 Windows驱动开发:内核注册并监控对象回调
在笔者上一篇文章`《内核枚举进程与线程ObCall回调》`简单介绍了如何枚举系统中已经存在的`进程与线程`回调,本章`LyShark`将通过对象回调实现对进程线程的`句柄`监控,在内核中提供了`ObRegisterCallbacks`回调,使用这个内核`回调`函数,可注册一个`对象`回调,不过目前该函数`只能`监控进程与线程句柄操作,通过监控进程或线程句柄,可实现保护指定进程线程不被终止的目的。
29 0
7.2 Windows驱动开发:内核注册并监控对象回调
|
4月前
|
监控 安全 API
7.6 Windows驱动开发:内核监控FileObject文件回调
本篇文章与上一篇文章`《内核注册并监控对象回调》`所使用的方式是一样的都是使用`ObRegisterCallbacks`注册回调事件,只不过上一篇博文中`LyShark`将回调结构体`OB_OPERATION_REGISTRATION`中的`ObjectType`填充为了`PsProcessType`和`PsThreadType`格式从而实现监控进程与线程,本章我们需要将该结构填充为`IoFileObjectType`以此来实现对文件的监控,文件过滤驱动不仅仅可以用来监控文件的打开,还可以用它实现对文件的保护,一旦驱动加载则文件是不可被删除和改动的。
29 1
7.6 Windows驱动开发:内核监控FileObject文件回调
|
4月前
|
监控 安全 API
6.9 Windows驱动开发:内核枚举进线程ObCall回调
在笔者上一篇文章`《内核枚举Registry注册表回调》`中我们通过特征码定位实现了对注册表回调的枚举,本篇文章`LyShark`将教大家如何枚举系统中的`ProcessObCall`进程回调以及`ThreadObCall`线程回调,之所以放在一起来讲解是因为这两中回调在枚举是都需要使用通用结构体`_OB_CALLBACK`以及`_OBJECT_TYPE`所以放在一起来讲解最好不过。
41 1
6.9 Windows驱动开发:内核枚举进线程ObCall回调
|
4月前
|
监控 安全 API
6.8 Windows驱动开发:内核枚举Registry注册表回调
在笔者上一篇文章`《内核枚举LoadImage映像回调》`中`LyShark`教大家实现了枚举系统回调中的`LoadImage`通知消息,本章将实现对`Registry`注册表通知消息的枚举,与`LoadImage`消息不同`Registry`消息不需要解密只要找到`CallbackListHead`消息回调链表头并解析为`_CM_NOTIFY_ENTRY`结构即可实现枚举。
48 1
6.8 Windows驱动开发:内核枚举Registry注册表回调
|
4月前
|
存储 API 开发者
6.7 Windows驱动开发:内核枚举LoadImage映像回调
在笔者之前的文章`《内核特征码搜索函数封装》`中我们封装实现了特征码定位功能,本章将继续使用该功能,本次我们需要枚举内核`LoadImage`映像回调,在Win64环境下我们可以设置一个`LoadImage`映像加载通告回调,当有新驱动或者DLL被加载时,回调函数就会被调用从而执行我们自己的回调例程,映像回调也存储在数组里,枚举时从数组中读取值之后,需要进行位运算解密得到地址。
32 1
6.7 Windows驱动开发:内核枚举LoadImage映像回调