C#高性能大容量SOCKET并发(五):粘包、分包、解包

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 原文:C#高性能大容量SOCKET并发(五):粘包、分包、解包 粘包 使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。
原文: C#高性能大容量SOCKET并发(五):粘包、分包、解包

粘包

使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。

粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。

分包

在NETIOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。

应用层数据包格式如下:

应用层数据包格式  
数据包长度Len:Cardinal(4字节无符号整数) 数据包内容,长度为Len
AsyncSocketInvokeElement分包处理主要代码,我们收到的数据都是在ProcessReceive方法中处理,处理的方法是把收到的数据存到缓冲区数组中,然后取前4个字节为长度,如果剩下的字节数大于等于长度,则取到一个完整包,进行后续逻辑处理,如果取到的不够一个包,则不处理,等待后续包接收,具体代码如下:
        public virtual bool ProcessReceive(byte[] buffer, int offset, int count) //接收异步事件返回的数据,用于对数据进行缓存和分包
        {
            m_activeDT = DateTime.UtcNow;
            DynamicBufferManager receiveBuffer = m_asyncSocketUserToken.ReceiveBuffer;

            receiveBuffer.WriteBuffer(buffer, offset, count);
            if (receiveBuffer.DataCount > sizeof(int))
            {
                //按照长度分包
                int packetLength = BitConverter.ToInt32(receiveBuffer.Buffer, 0); //获取包长度
                if (NetByteOrder)
                    packetLength = System.Net.IPAddress.NetworkToHostOrder(packetLength); //把网络字节顺序转为本地字节顺序


                if ((packetLength > 10 * 1024 * 1024) | (receiveBuffer.DataCount > 10 * 1024 * 1024)) //最大Buffer异常保护
                    return false;

                if ((receiveBuffer.DataCount - sizeof(int)) >= packetLength) //收到的数据达到包长度
                {
                    bool result = ProcessPacket(receiveBuffer.Buffer, sizeof(int), packetLength);
                    if (result)
                        receiveBuffer.Clear(packetLength + sizeof(int)); //从缓存中清理
                    return result;
                }
                else
                {
                    return true;
                }
            }
            else
            {
                return true;
            }
        }
解包

由于我们应用层数据包既可以传命令也可以传数据,因而针对每个包我们进行解包,分出命令和数据分别处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在ProcessPacket中,命令和数据的包格式为:

命令长度Len:Cardinal(4字节无符号整数) 命令 数据
        public virtual bool ProcessPacket(byte[] buffer, int offset, int count) //处理分完包后的数据,把命令和数据分开,并对命令进行解析
        {
            if (count < sizeof(int))
                return false;
            int commandLen = BitConverter.ToInt32(buffer, offset); //取出命令长度
            string tmpStr = Encoding.UTF8.GetString(buffer, offset + sizeof(int), commandLen);
            if (!m_incomingDataParser.DecodeProtocolText(tmpStr)) //解析命令
              return false;

            return ProcessCommand(buffer, offset + sizeof(int) + commandLen, count - sizeof(int) - commandLen); //处理命令
        }
每个包中包含多个协议关键字,每个协议关键字用回车换行分开,因此我们需要调用文本分开函数,然后针对每条命令解析出关键字和值,具体代码在IncomingDataParser.DecodeProtocolText如下:
        public bool DecodeProtocolText(string protocolText)
        {
            m_header = "";
            m_names.Clear();
            m_values.Clear();
            int speIndex = protocolText.IndexOf(ProtocolKey.ReturnWrap);
            if (speIndex < 0)
            {
                return false;
            }
            else
            {
                string[] tmpNameValues = protocolText.Split(new string[] { ProtocolKey.ReturnWrap }, StringSplitOptions.RemoveEmptyEntries);
                if (tmpNameValues.Length < 2) //每次命令至少包括两行
                    return false;
                for (int i = 0; i < tmpNameValues.Length; i++)
                {
                    string[] tmpStr = tmpNameValues[i].Split(new string[] { ProtocolKey.EqualSign }, StringSplitOptions.None);
                    if (tmpStr.Length > 1) //存在等号
                    {
                        if (tmpStr.Length > 2) //超过两个等号,返回失败
                            return false;
                        if (tmpStr[0].Equals(ProtocolKey.Command, StringComparison.CurrentCultureIgnoreCase))
                        {
                            m_command = tmpStr[1];
                        }
                        else
                        {
                            m_names.Add(tmpStr[0].ToLower());
                            m_values.Add(tmpStr[1]);
                        }
                    }
                }
                return true;
            }
        }
处理命令

解析出命令后,需要对每个命令进行处理,各个协议实现类从AsyncSocketInvokeElement.ProcessCommand继承,然后编写各自协议处理逻辑,如吞吐量的测试协议逻辑实现代码如下:

namespace SocketAsyncSvr
{
    class ThroughputSocketProtocol : BaseSocketProtocol
    {
        public ThroughputSocketProtocol(AsyncSocketServer asyncSocketServer, AsyncSocketUserToken asyncSocketUserToken)
            : base(asyncSocketServer, asyncSocketUserToken)
        {
            m_socketFlag = "Throughput";
        }

        public override void Close()
        {
            base.Close();
        }

        public override bool ProcessCommand(byte[] buffer, int offset, int count) //处理分完包的数据,子类从这个方法继承
        {
            ThroughputSocketCommand command = StrToCommand(m_incomingDataParser.Command);
            m_outgoingDataAssembler.Clear();
            m_outgoingDataAssembler.AddResponse();
            m_outgoingDataAssembler.AddCommand(m_incomingDataParser.Command);
            if (command == ThroughputSocketCommand.CyclePacket)
                return DoCyclePacket(buffer, offset, count);
            else
            {
                Program.Logger.Error("Unknow command: " + m_incomingDataParser.Command);
                return false;
            }
        }

        public ThroughputSocketCommand StrToCommand(string command)
        {
            if (command.Equals(ProtocolKey.CyclePacket, StringComparison.CurrentCultureIgnoreCase))
                return ThroughputSocketCommand.CyclePacket;
            else
                return ThroughputSocketCommand.None;
        }

        public bool DoCyclePacket(byte[] buffer, int offset, int count)
        {
            int cycleCount = 0;
            if (m_incomingDataParser.GetValue(ProtocolKey.Count, ref cycleCount))
            {
                m_outgoingDataAssembler.AddSuccess();
                cycleCount = cycleCount + 1;
                m_outgoingDataAssembler.AddValue(ProtocolKey.Count, cycleCount);
            }
            else
                m_outgoingDataAssembler.AddFailure(ProtocolCode.ParameterError, "");
            return DoSendResult(buffer, offset, count);
        }
    }
}
DEMO下载地址: http://download.csdn.net/detail/sqldebug_fan/7467745
免责声明:此代码只是为了演示C#完成端口编程,仅用于学习和研究,切勿用于商业用途。水平有限,C#也属于初学,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。

目录
打赏
0
0
0
0
216
分享
相关文章
C#中简单Socket编程
1. 先运行服务器代码。服务器将开始监听指定的IP和端口,等待客户端连接。 1. 然后运行客户端代码。客户端将连接到服务器并发送消息。 1. 服务器接收到消息后,将回应客户端,并在控制台上显示接收到的消息。 1. 客户端接收到服务器的回应消息,并在控制台上显示。
154 15
C#使用Socket实现分布式事件总线,不依赖第三方MQ
`CodeWF.EventBus.Socket` 是一个轻量级的、基于Socket的分布式事件总线系统,旨在简化分布式架构中的事件通信。它允许进程之间通过发布/订阅模式进行通信,无需依赖外部消息队列服务。
C#使用Socket实现分布式事件总线,不依赖第三方MQ
|
5月前
|
C# 一分钟浅谈:Socket 编程基础
【10月更文挑战第7天】本文介绍了Socket编程的基础知识、基本操作及常见问题,通过C#代码示例详细展示了服务器端和客户端的Socket通信过程,包括创建、绑定、监听、连接、数据收发及关闭等步骤,帮助开发者掌握Socket编程的核心技术和注意事项。
165 3
C# 一分钟浅谈:Socket 编程基础
今天我们聊聊C#的并发和并行
今天我们聊聊C#的并发和并行
112 1
揭秘:如何轻松驾驭Uno Platform,用C#和XAML打造跨平台神器——一步步打造你的高性能WebAssembly应用!
【8月更文挑战第31天】Uno Platform 是一个跨平台应用程序框架,支持使用 C# 和 XAML 创建多平台应用,包括 Web。通过编译为 WebAssembly,Uno Platform 可实现在 Web 上运行高性能、接近原生体验的应用。本文介绍如何构建高效的 WebAssembly 应用:首先确保安装最新版本的 Visual Studio 或 VS Code 并配置 Uno Platform 开发环境;接着创建新的 Uno Platform 项目;然后通过安装工具链并使用 Uno WebAssembly CLI 编译应用;最后添加示例代码并测试应用。
222 0
揭秘Apache Wicket:页面生命周期背后的神秘力量!
【8月更文挑战第31天】李工是一位热爱Web开发的程序员,近日在技术博客上分享了他对Apache Wicket框架的学习心得,特别是页面生命周期的理解。他认为掌握Wicket页面生命周期对于开发富交互式Web应用至关重要。他通过一个简单的计数器应用示例,详细解释了Wicket的组件化设计理念以及页面和组件在生命周期中的变化。
74 0
[C#] 在异步请求并发情况下,dbcontext的安全问题
摘要: 在多线程异步环境中,偶发的数据库修改失败可能因并发的`dbContext`操作引起,当一个线程的修改未保存时,另一线程尝试相同操作会导致错误。另外,单次执行成功但随后失败的情况可能源于`dbContext`的瞬时生命周期。若`saveChangesAsync()`在刷新页面请求到来前未完成,新的请求可能会尝试在写操作期间读取数据,从而引发问题。
109 6
|
9月前
|
Java Socket编程与多线程:提升客户端-服务器通信的并发性能
【6月更文挑战第21天】Java网络编程中,Socket结合多线程提升并发性能,服务器对每个客户端连接启动新线程处理,如示例所示,实现每个客户端的独立操作。多线程利用多核处理器能力,避免串行等待,提升响应速度。防止死锁需减少共享资源,统一锁定顺序,使用超时和重试策略。使用synchronized、ReentrantLock等维持数据一致性。多线程带来性能提升的同时,也伴随复杂性和挑战。
149 0
Socket实现模拟TCP通信粘包问题
Socket实现模拟TCP通信粘包问题
|
10月前
|
C#
C# 使用Socket对接
C# 使用Socket对接
63 1