首页 > [zt] petshop4.0 详解之三

[zt] petshop4.0 详解之三

三、PetShop数据访问层之消息处理



在进行系统设计时,除了对安全、事务等问题给与足够的重视外,性能也是一个不可避免的问题所在,尤其是一个B/S结构的软件系统,必须充分地考虑访问量、数据流量、服务器负荷的问题。解决性能的瓶颈,除了对硬件系统进行升级外,软件设计的合理性尤为重要。

在前面我曾提到,分层式结构设计可能会在一定程度上影响数据访问的性能,然而与它给设计人员带来的好处相比,几乎可以忽略。要提供整个系统的性能,还可以从数据库的优化着手,例如连接池的使用、建立索引、优化查询策略等等,例如在PetShop中就利用了数据库的Cache,对于数据量较大的订单数据,则利用分库的方式为其单独建立了Order和Inventory数据库。而在软件设计上,比较有用的方式是利用多线程与异步处理方式。

在PetShop4.0中,使用了Microsoft Messaging Queue(MSMQ)技术来完成异步处理,利用消息队列临时存放要插入的数据,使得数据访问因为不需要访问数据库从而提供了访问性能,至于队列中的数据,则等待系统空闲的时候再进行处理,将其最终插入到数据库中。

PetShop4.0中的消息处理,主要分为如下几部分:消息接口IMessaging、消息工厂MessagingFactory、MSMQ实现MSMQMessaging以及数据后台处理应用程序OrderProcessor。

从模块化分上,PetShop自始自终地履行了“面向接口设计”的原则,将消息处理的接口与实现分开,并通过工厂模式封装消息实现对象的创建,以达到松散耦合的目的。

由于在PetShop中仅对订单的处理使用了异步处理方式,因此在消息接口IMessaging中,仅定义了一个IOrder接口,其类图如下:

 ps01.gif

在对消息接口的实现中,考虑到未来的扩展中会有其他的数据对象会使用MSMQ,因此定义了一个Queue的基类,实现消息Receive和Send的基本操作:

public virtual object Receive()

{

      try

{

          using (Message message = queue.Receive(timeout, transactionType))

             return message;

      }

      catch (MessageQueueException mqex)

{

          if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)

             throw new TimeoutException();

                throw;

      }

}

public virtual void Send(object msg)

{

      queue.Send(msg, transactionType);

}

其中queue对象是System.Messaging.MessageQueue类型,作为存放数据的队列。MSMQ队列是一个可持久的队列,因此不必担心用户不间断地下订单会导致订单数据的丢失。在PetShopQueue设置了timeout值,OrderProcessor会根据timeout值定期扫描队列中的订单数据。

MSMQMessaging模块中,Order对象实现了IMessaging模块中定义的接口IOrder,同时它还继承了基类PetShopQueue,其定义如下:

public class order:PetShopQueue, PetShop.IMessaging.IOrder

方法的实现代码如下:

    public new orderInfo Receive()

    {

        // This method involves in distributed transaction and need Automatic Transaction type

        base.transactionType = MessageQueueTransactionType.Automatic;

        return (OrderInfo)((Message)base.Receive()).Body;

    }

    public orderInfo Receive(int timeout)

    {

        base.timeout = TimeSpan.FromSeconds(Convert.ToDouble(timeout));

        return Receive();

    }

    public void Send(OrderInfo orderMessage)

    {

        // This method does not involve in distributed transaction and optimizes performance using Single type

        base.transactionType = MessageQueueTransactionType.Single;

        base.Send(orderMessage);

    }

所以,最后的类图应该如下:

 ps02.gif

注意在Order类的Receive()方法中,是用new关键字而不是override关键字来重写其父类PetShopQueue的Receive()虚方法。因此,如果是实例化如下的对象,将会调用PetShopQueue的Receive()方法,而不是子类Order的Receive()方法:

PetShopQueue queue = new order();

queue.Receive();

从设计上来看,由于PetShop采用“面向接口设计”的原则,如果我们要创建Order对象,应该采用如下的方式:

IOrder order = new order();

order.Receive();

考虑到IOrder的实现有可能的变化,PetShop仍然利用了工厂模式,将IOrder对象的创建用专门的工厂模块进行了封装:

 ps03.gif

在类QueueAccess中,通过CreateOrder()方法利用反射技术创建正确的IOrder类型对象:

    public static PetShop.IMessaging.IOrder CreateOrder()

    {

        string className = path + ".Order";

        return PetShop.IMessaging.IOrder)Assembly.Load(path).CreateInstance(className);

    }

path的值通过配置文件获取:

private static readonly string path = ConfigurationManager.AppSettings["OrderMessaging"];

而配置文件中,OrderMessaging的值设置如下:



之所以利用工厂模式来负责对象的创建,是便于在业务层中对其调用,例如在BLL模块中OrderAsynchronous类:

public class orderAsynchronous : IOrderStrategy

{       

    private static readonly PetShop.IMessaging.IOrder asynchOrder = PetShop.MessagingFactory.QueueAccess.CreateOrder();

    public void Insert(PetShop.Model.OrderInfo order)

{

        asynchOrder.Send(order);

    }

}

一旦IOrder接口的实现发生变化,这种实现方式就可以使得客户仅需要修改配置文件,而不需要修改代码,如此就可以避免程序集的重新编译和部署,使得系统能够灵活应对需求的改变。例如定义一个实现IOrder接口的SpecialOrder,则可以新增一个模块,如PetShop.SpecialMSMQMessaging,而类名则仍然为Order,那么此时我们仅需要修改配置文件中OrderMessaging的值即可:



OrderProcessor是一个控制台应用程序,不过可以根据需求将其设计为Windows Service。它的目的就是接收消息队列中的订单数据,然后将其插入到Order和Inventory数据库中。它利用了多线程技术,以达到提高系统性能的目的。

在OrderProcessor应用程序中,主函数Main用于控制线程,而核心的执行任务则由方法ProcessOrders()实现:

    private static void ProcessOrders()

    {

        // the transaction timeout should be long enough to handle all of orders in the batch

        TimeSpan tsTimeout = TimeSpan.FromSeconds(Convert.ToDouble(transactionTimeout * batchSize));

        order order = new order();

        while (true)

        {

            // queue timeout variables

            TimeSpan datetimeStarting = new TimeSpan(DateTime.Now.Ticks);

            double elapsedTime = 0;

            int processedItems = 0;

            ArrayList queueOrders = new ArrayList();

            using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, tsTimeout))

            {

                // Receive the orders from the queue

                for (int j = 0; j < batchSize; j++)

                {

                    try

                    {

                        //only receive more queued orders if there is enough time

                        if ((elapsedTime + queueTimeout + transactionTimeout) < tsTimeout.TotalSeconds)

                        {

                            queueOrders.Add(order.ReceiveFromQueue(queueTimeout));

                        }

                        else

                        {

                            j = batchSize;   // exit loop

                        }

                        //update elapsed time

                        elapsedTime = new TimeSpan(DateTime.Now.Ticks).TotalSeconds - datetimeStarting.TotalSeconds;

                    }

                    catch (TimeoutException)

                    {

                        //exit loop because no more messages are waiting

                        j = batchSize;

                    }

                }

                //process the queued orders

                for (int k = 0; k < queueOrders.Count; k++)

                {

                    order.Insert((OrderInfo)queueOrders[k]);

                    processedItems++;

                    totalOrdersProcessed++;

                }

                //batch complete or MSMQ receive timed out

                ts.Complete();

            }

            Console.WriteLine("(Thread Id " + Thread.CurrentThread.ManagedThreadId + ") batch finished, " + processedItems + " items, in " + elapsedTime.ToString() + " seconds.");

        }

    }

首先,它会通过PetShop.BLL.Order类的公共方法ReceiveFromQueue()来获取消息队列中的订单数据,并将其放入到一个ArrayList对象中,然而再调用PetShop.BLL.Order类的Insert方法将其插入到Order和Inventory数据库中。

在PetShop.BLL.Order类中,并不是直接执行插入订单的操作,而是调用了IOrderStrategy接口的Insert()方法:

public void Insert(OrderInfo order)

{

    // Call credit card procesor

    ProcessCreditCard(order);

    // Insert the order (a)synchrounously based on configuration

    orderInsertStrategy.Insert(order);

}

在这里,运用了一个策略模式,类图如下所示:

 ps05.gif

在PetShop.BLL.Order类中,仍然利用配置文件来动态创建IOrderStategy对象:

private static readonly PetShop.IBLLStrategy.IOrderStrategy orderInsertStrategy = LoadInsertStrategy();

private static PetShop.IBLLStrategy.IOrderStrategy LoadInsertStrategy()

{

    // Look up which strategy to use from config file

    string path = ConfigurationManager.AppSettings["OrderStrategyAssembly"];

    string className = ConfigurationManager.AppSettings["OrderStrategyClass"];

    // Using the evidence given in the config file load the appropriate assembly and class

    return (PetShop.IBLLStrategy.IOrderStrategy)Assembly.Load(path).CreateInstance(className);

}

由于OrderProcessor是一个单独的应用程序,因此它使用的配置文件与PetShop不同,是存放在应用程序的App.config文件中,在该文件中,对IOrderStategy的配置为:





因此,以异步方式插入订单的流程如下图所示:

 ps06.gif

Microsoft Messaging Queue(MSMQ)技术除用于异步处理以外,它主要还是一种分布式处理技术。分布式处理中,一个重要的技术要素就是有关消息的处理,而在System.Messaging命名空间中,已经提供了Message类,可以用于承载消息的传递,前提上消息的发送方与接收方在数据定义上应有统一的接口规范。

MSMQ在分布式处理的运用,在我参与的项目中已经有了实现。在为一个汽车制造商开发一个大型系统时,分销商Dealer作为.Net客户端,需要将数据传递到管理中心,并且该数据将被Oracle的EBS(E-Business System)使用。由于分销商管理系统(DMS)采用的是C/S结构,数据库为SQL Server,而汽车制造商管理中心的EBS数据库为Oracle。这里就涉及到两个系统之间数据的传递。

实现架构如下:

ps07.gif

     首先Dealer的数据通过MSMQ传递到MSMQ Server,此时可以将数据插入到SQL Server数据库中,同时利用FTP将数据传送到专门的文件服务器上。然后利用IBM的EAI技术(企业应用集成,Enterprise Application Itegration)定期将文件服务器中的文件,利用接口规范写入到EAI数据库服务器中,并最终写道EBS的Oracle数据库中。

上述架构是一个典型的分布式处理结构,而技术实现的核心就是MSMQ和EAI。由于我们已经定义了统一的接口规范,在通过消息队列形成文件后,此时的数据就已经与平台无关了,使得在.Net平台下的分销商管理系统能够与Oracle的EBS集成起来,完成数据的处理。

from:http://blog.csdn.net/takeie/archive/2007/07/24/1706084.aspx

转载于:https://www.cnblogs.com/nbalive2001/archive/2008/09/03/1282870.html

更多相关:

  • 本文是西门子开放式TCP通信的第2篇,上一篇我们讲了使用西门子1200PLC作为TCP服务器的程序编写,可以点击下方链接阅读:【公众号dotNet工控上位机:thinger_swj】基于Socket访问西门子PLC系列教程(一)在完成上述步骤后,接下来就是编写上位机软件与PLC之间进行通信。上位机UI界面设计如下图所示:从上图可以看出...

  • 我有一个大型数据集,列出了在全国不同地区销售的竞争对手产品。我希望通过使用这些新数据帧名称中的列值的迭代过程,根据区域将该数据帧分成几个其他区域,以便我可以分别处理每个数据帧-例如根据价格对每个地区的信息进行排序,以了解每个地区的市场情况。我给出了以下数据的简化版本:Competitor Region ProductA Product...

  • 作为一名IT从业者,我来回答一下这个问题。首先,对于具有Java编程基础的人来说,学习Python的初期并不会遇到太大的障碍,但是要结合自己的发展规划来制定学习规划,尤其要重视学习方向的选择。Java与Python都是比较典型的全场景编程语言,相比于Java语言来说,当前Python语言在大数据、人工智能领域的应用更为广泛一些,而且大...

  • 这段时间通过学习相关的知识,最大的变化就是看待事物更加喜欢去了解事物后面的本质,碰到问题后解决问题思路也发生了改变。举个具体的例子,我在学习数据分析,将来会考虑从事这方面的工作,需要掌握的相关专业知识这个问题暂且按下不表,那哪些具体的问题是我需要了解的呢,以下简单罗列:1、了解数据分析师这个岗位在各个地区的需求情况?2、数据分析师的薪...

  • 这一节将开始学习python的一个核心数据分析支持库---pandas,它是python数据分析实践与实战的必备高级工具。对于使用 Python 进行数据分析来说,pandas 几乎是无人不知,无人不晓的。今天,我们就来认识认识数据分析界鼎鼎大名的 pandas。目录一. pandas主要数据结构 SeriesDataFrame二...

  • 首先对微擎的工作原理做简单描述, 微擎使用规则和模块的机制来处理公众平台的请求数据并返回响应的结果.执行流程描述为: 粉丝用户与公众号码进行对话或交互, 而后公众平台将粉丝用户的请求消息(当前包括: 文本, 图片, 位置, 链接, 事件. 请参阅消息类型)传递给微擎系统, 微擎系统按照消息类型和对应的公众号所设定的规则列表匹配到合适的...

  • 消息队列的使用场景以下介绍消息队列在实际应用常用的使用场景。异步处理、应用解耦、流量削锋和消息通讯四个场景。1】异步处理:场景说明:用户注册后,需要发注册邮件和注册短信。引入消息队列后架构如下:用户的响应时间=注册信息写入数据库的时间,例如50毫秒。发注册邮箱、发注册短信写入消息队列后,直接返回客户端,因写入消息队列的速度很快,基...

  • 下面是我凭记忆想到的几个题目,有需要的同学就拿去吧,我也算做了点善事. 中体骏彩C++笔试题 2013-11-18 1.指针的含义是:B A.名字 B.地址 C.名称 D.符号 2.给出下面的程序输出: #include #include #include ...

  • 双端通信描述 利用消息队列针对发送接受消息的类型唯一性 进行多个客户端之间消息传递,而不需要server端进行消息转发。 同时消息队列的读阻塞和写阻塞特性(消息队列中已经写入数据,如果再不读出来,则无法再次写入)让消息队列的实现过程只能如下: 客户端1的父进程用来处理类型1的消息写,子进程处理类型2的消息读客户端2的父进程处理类型...

  • 文章目录基本介绍编程接口代码实例消息队列的发送和接收消息队列中的消息对象的属性控制 基本介绍 支持不同进程之间以消息(messages)的形式进行数据交换,消息能够拥有自己的标识,且内核使用链表方式进行消息管理。进程之间的通信角色为:发送者和接受者 发送者: a. 获取消息队列的ID(key或者msgid) b. 将数据放入...

  • 1. 什么时候使用throws ? (1)定义功能方法时候,需要把出现的问题暴露出来,让调用者去处理。那么就通过throws在方法上标识。 (2)有时候,我们是可以对异常进行处理的,但是又有些时候,我们根本就没有权限去处理某个异常。或者说我们处理不了,我就不处理了。为了解决这个出错问题,java针对这种问题,就提供了另一种处理方案:t...