流水线上的思考——异步程序开发模型(1)

简介:
我想大家都熟悉现代工厂的流水线生产作业——在一个车间里,机器流水线的两旁坐满了工人,一个挨着一个,井然有序。当流水线上有需要安装部件的产品过来时,每个工人从流水线上取出待安装的产品,然后对产品进行单独的加工,再将产品放回流水线上,进入下一个生产环节。这样生产方式所产生的直接好处,就是让每一个员工只做自己的事情,因而显著地提高整体工作效率。
其实,这种工厂流水线的生产方式,我们完全可以借鉴到程序的开发中来。以下几章我将以一个基于网络的服务端程序为例,来给大家讲讲如何在我们的程序中实现这种流水线的设计方法。

 

首先我们来假设一个业务场景:
1:客户端程序发送消息给服务端。
2:服务端程序接收到消息以后,根据消息的类型进行区分,由相应的业务处理函数去处理相应的业务。
3:将处理后的数据进行数据库入库,同时将处理后的信息发送给相应的客户端。  
上面的场景,几乎包含了开发服务端程序所涉及到的绝大对数内容:
1:高并发的网络处理。
2:大量的业务分发处理。
3:高速的数据入库(慢设备操作)。
 
就目前来看,我工作之中遇到的项目,绝大多数是由这三部分组成。

 

第一个内容(高并发的网络处理),需要有一个能够支撑高并发的网络库。所以这次我们先不讨论。
今天我们先来讨论一下第二个内容,即大量业务分发处理。
我们先回顾一下流水线的特点:
1:一个流水线,我们可以将其理解成一个我们需要处理的业务队列,这个队列可以源源不断的接收外来的业务请求入队。
2:坐在流水线两边的员工,我们可以理解成处理业务的处理函数。  
这样一来一个最为简单的流水线设计就出来了:
流水线我们用list来实现,设计出来的就是这样的一个流水线程序:
While(!list.empty())
{
         Switch(业务判断)
         {
                   Case …  //业务处理1
                   {
                            //处理业务1
                   }
          Case …   //业务处理2
          {
                            //处理业务2
                   }
 
     }
//从链表中删除这个业务信息
}
在这里我并没有使用具体的语言来进行实现,我想,这种实现是每个学习开发的朋友都会知道如何编写的。
 
现在回顾一下这个设计,我们会发现有一个地方需要考虑:
我们使用的是list,所以客户端的业务请求必然是放到这个list中的。当处理完毕后,就需要从list中将这个业务删除。这一加一删,在多线程中我们就需要使用临界区来对这个list进行保护,避免出现异常。  
对于临界区的使用,我想开发多线程的人都知道,它会在多线程中保证共享资源的安全,但同时它也会因此而带来一定的性能损失。其实很多人都觉得这种损失是不足为道的,但是据我的测试,当大量产生临界区碰撞的时候,程序的处理能力会急剧下降。

 

那么如何来进行优化呢?只要我们能想办法减小产生临界区碰撞的次数,那不就可以了么?——也就是说,我们要想办法降低临界区的颗粒度。
在《数据结构与算法》一书中讲到了单链表。单链表的特性就是只需要将链表头进行重新指向,则该整个的链表就会被重新指向。我们可以使用单链表的这个特性来进行优化。
那么优化后的处理结果将会是:
其中:
Head:是写入业务的单链表头。
WorkHead:是处理业务的单链表头。
 
Lock()
WorkHead = Head;
head = null;
unlock();
 
While(!workHead. empty())
{
    Switch(业务判断)
    {
       Case …  //业务处理1
       {
          //处理业务1
       }
       Case …  //业务处理2
       {
          //处理业务2
       }
}
//从链表中删除这个业务信息
}
 
这样一来,我们就实现了两个业务链表:一个链表(Head为链表头)用来接收客户端发送来的数据,一个链表(WorkHead为链表头)用来实际的处理业务。当处理WorkHead为空时,我们将Head链表的头和WorkHead头进行替换。  
好了,写到这里应该有很多人会说,你写这么多,却一行代码都没有,我们怎么用啊?
为了避免大家说我写的毫无内容,看来只能列出代码了。以下代码是分别使用DelphiC++写的这种方式的实现:
Delphi版本:
其中的定义为:
PMsgBuffer = ^TMsgBuffer;
  TMsgBuffer = record
    SocketHandle: Integer;
    MainTypeInteger;
Buffer: Pchar;
    BufferLen: Integer;
    Next: PMsgBuffer;
  end;
 
procedure TMainThread.Reponse;
var
  pWork,
  pNext: PMsgBuffer;
begin
  pWork:=nil;
  pNext:=nil;
 
  EnterCriticalSection(QueueCS);
  try
    if not Assigned(pWork) then
    begin
      pWork:=First;
      First:=nil;
      Last:=nil;
    end;
  finally
    LeaveCriticalSection(QueueCS);
  end;
 
  while Assigned(pWork) do
  begin
    pNext:=pWork.Next;
   
    //业务分解处理
    case pWork.MainType of
      User_Online://用户上线业务
      begin
        Online(pWork.pOutPacket, pWork.Buffer, pWork.BufferLen, pWork.SocketHandle, pWork.RemoteIP);
      end;
      User_Offline://用户下线业务
      begin
        Offline(pWork.pOutPacket, pWork.Buffer, pWork.BufferLen, pWork.SocketHandle, pWork.RemoteIP);
      end;
    end;
 
    if Assigned(pWork.Buffer) then
    begin
      FreeMem(pWork.Buffer);
    end;
    Dispose(pWork);
    pWork:=pNext;
  end;
end;
 
C++版本:
其中BusinessBuffer为:
struct BusinessBuffer
{
         WORD  main_type_
         std::vector<char>    linker_buffer;
         unsigned int linker_handle;
         std::string  linker_ip;
         BusinessBuffer *next;
};
 
void  Business::reponse()
{
         BusinessBuffer *work_ptr = NULL;
         BusinessBuffer *next_ptr = NULL;
 
         lock();
         work_ptr = first_ptr_;
         first_ptr_ = NULL;
         last_ptr_  = NULL;
         Unlock();
        
         while (work_ptr != NULL)
         {
                   next_ptr = work_ptr->next;
                   switch(work_ptr->main_type_)
                   {
                   case User_Online://用户上线业务     
                            Online(work_ptr);
                            break;
                   case User_Offline://用户下线业务     
                            Offline(work_ptr);
                            break;
                   }
                   delete work_ptr;
                   work_ptr = next_ptr;
         }
}
 
好了,至此一个简单的流水线处理设计好了。但是我们会发现其中存在一些其它的问题:
1:当每个业务处理过程之中存在一定的数据库或者文件等慢设备处理的时候,那岂不是将我们这个流水线的处理能力完全的拉了下来了么?一个桶能装多少水,完全由它最短的板子来决定的。
2:当业务处理完毕以后,需要将处理结果再做其它业务处理的时候,用一个流水线是否能够满足性能上的要求呢?
 
以上的问题我们将会在以后的章节中陆续讲到。
本文转自狗窝博客51CTO博客,原文链接http://blog.51cto.com/fxh7622/1135371如需转载请自行联系原作者

fxh7622
相关文章
|
2月前
|
机器学习/深度学习 Python Windows
【架构】流水线结合生产者消费者模型赋能模型推理过程
【架构】流水线结合生产者消费者模型赋能模型推理过程
20 0
|
6月前
|
jenkins Devops 机器人
【devops】九、Jenkins流水线(下)
【devops】九、Jenkins流水线(下)
|
6月前
|
jenkins Devops 持续交付
【devops】九、Jenkins流水线(上)
【devops】九、Jenkins流水线(上)
107 1
|
30天前
|
JavaScript jenkins 持续交付
Jenkins自动构建 CI/CD流水线学习笔记(从入门到入土,理论+示例)
Jenkins自动构建 CI/CD流水线学习笔记(从入门到入土,理论+示例)
49 0
|
6月前
|
Kubernetes jenkins 持续交付
jenkins结合k8s构建流水线如何提升运行性能和构建效率
jenkins结合k8s构建流水线如何提升运行性能和构建效率
|
7月前
|
jenkins Java 持续交付
实战:Docker+Jenkins+Gitee构建CICD流水线
实战:Docker+Jenkins+Gitee构建CICD流水线
|
4月前
|
存储 jenkins Shell
Jenkins Pipeline 流水线任务 补充篇
Jenkins Pipeline 流水线任务 补充篇
50 0
|
4月前
|
Java jenkins 持续交付
Jenkins Pipeline 流水线方式部署 SpringBoot 项目2
Jenkins Pipeline 流水线方式部署 SpringBoot 项目
135 0