JAVA通信编程(四)——UDP通讯

简介: 经过TCP和串口通讯编程的了解,相信大家应该掌握CommBuff的套路了,这里首先展示的是通过UDP编程的方式实现CommBuff接口,之后通过简单工厂模式的应用说明如何屏蔽底层通讯差异。 UdpImpl类如下: package com.

经过TCP和串口通讯编程的了解,相信大家应该掌握CommBuff的套路了,这里首先展示的是通过UDP编程的方式实现CommBuff接口,之后通过简单工厂模式的应用说明如何屏蔽底层通讯差异。

UdpImpl类如下:

package com.zzh.comm;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Map;

import org.apache.log4j.Logger;

public class UdpImpl implements CommBuff
{
	private Logger logger = Logger.getLogger(Object.class.getName());
	
	private int local_port;
	private int dest_port;
	private String ip;
	private int time_out;
	
	DatagramSocket client = null;
	
	private String fileName = "/udp.properties";
	public UdpImpl()
	{
		Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
		try
		{
			local_port = Integer.parseInt(map.get("udp_local_port"));
			dest_port = Integer.parseInt(map.get("udp_dest_port"));
			time_out = Integer.parseInt(map.get("udp_timeout"));
			ip = map.get("udp_dest_ip");
		}
		catch (Exception e)
		{
			logger.error(e.getMessage());
		}
	}
	
	@Override
	public byte[] readBuff()
	{
		if(client == null)
		{
			throw new RuntimeException("clinet is null!");
		}
		byte[] recvBuf = new byte[1024];
		DatagramPacket recvPacket = new DatagramPacket(recvBuf , recvBuf.length);
		try
		{
			client.receive(recvPacket);
		}
		catch (IOException e)
		{
			logger.info(e.getMessage());
			return new byte[0];
		}
		byte[] ans = new byte[recvPacket.getLength()];
		System.arraycopy(recvPacket.getData(), 0, ans, 0, recvPacket.getLength());
		logger.info("网口接收:"+CommUtil.bytesToHex(ans));
		return ans;
	}

	@Override
	public void writeBuff(byte[] message)
	{
		if(client == null)
		{
			throw new RuntimeException("clinet is null!");
		}
		
		try
		{
			InetAddress addr = InetAddress.getByName(ip);
			DatagramPacket sendPacket = new DatagramPacket(message,message.length,addr,dest_port);
			client.send(sendPacket);
			logger.info("发送成功: "+CommUtil.bytesToHex(message));
		}
		catch (UnknownHostException e)
		{
			logger.error(e.getMessage());
		}
		catch (IOException e)
		{
			logger.error(e.getMessage());
		}
		
	}

	@Override
	public void open() {
		try
		{
			client = new DatagramSocket(local_port);
			client.setSoTimeout(time_out);
			if(client != null)
			{
				logger.info("client open succeed!");
			}
		}
		catch (SocketException e)
		{
			logger.error(e.getMessage());
		}
	}

	@Override
	public void close() 
	{
		if(client != null)
		{
			client.close();
		}
	}

	@Override
	public Object getInfo()
	{
		return null;
	}

}
UdpImpl实现了CommBuff接口的各个方法。UDP Socket采用的数据包的方式进行通讯的,这个可以与TCP的方式区分开。

下面通过一个简单工厂模式,可以实现底层通讯的便利性。

package com.zzh.comm;

public class CommFactory
{
	public CommBuff getCommBuff(String properties) throws Exception
	{
		if(properties.equals("comm_serial"))
		{
			return new SerialImpl();
		}
		else if(properties.equals("comm_tcpServer"))
		{
			return new TcpServerImpl();
		}
		else if(properties.equals("comm_tcpClient"))
		{
			return new TcpClientImpl();
		}
		else if(properties.equals("comm_udp"))
		{
			return new UdpImpl();
		}
		else
		{
			throw new Exception("Communication para error: no found avaliable communication Object instance.");
		}
	}
}
上面的getCommBuff方法通过参数properties可以初始化不同的通讯接口实现类,这样上次应用只需调用Commbuff接口的方法,而无需与底层通讯的细节相融合,极大的降低了程序间的耦合性。

本篇就简单的阐述到这里。但是下面会附加一个程序,这个程序通过调用CommFactory的方法生成底层通讯的实例,程序的主要内容是电力行业的某个通讯规约(Modbus)的实现,如果非电力行业的通讯,可以不必了解程序中的细节,可以大概看一下怎么使用.

package com.zzh.protocol;

import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.zzh.comm.CommBuff;
import com.zzh.comm.CommFactory;
import com.zzh.comm.CommUtil;
import com.zzh.comm.ReadProperties;
import com.zzh.dao.ModbusDao;
import com.zzh.dao.ModbusDaoImpl;
import com.zzh.dao.pojo.ModbusPojo;

public class Modbus {
	private CommBuff comm;
	private int comm_timeout;
	private byte devAddr;
	
	private static int RECV_SIZE = 35;
	private static int RECV_INNER_SIZE = 30;
	private static int MINUTE=60000;
	private volatile boolean  refreshFlag = false;
	
	private ModbusPojo modbusPojo; 
	
	private ConcurrentLinkedDeque<Byte> deque = new ConcurrentLinkedDeque<Byte>();
	private String fileName = "/modbus.properties";
	
	public Modbus()
	{
		Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
		String comm_way = map.get("modbus_comm_way");
		String comm_timeouts = map.get("comm_timeout");
		comm_timeout = Integer.parseInt(comm_timeouts);
		String devAddrs = map.get("devAddr");
		devAddr = Byte.parseByte(devAddrs);
		if(comm_way!=null)
		{
			modbusPojo = new ModbusPojo(); 
			try
			{
				comm = new CommFactory().getCommBuff(comm_way);
			}
			catch (Exception e)
			{
				e.printStackTrace();
			}
			comm.open();
			
			ExecutorService pool = Executors.newFixedThreadPool(2);
			Thread thread1 = new Thread(new readThread());
	    	thread1.setDaemon(true);
	    	Thread thread2 = new Thread(new dbThread());
	    	thread2.setDaemon(true);
	    	pool.execute(thread1);
	    	pool.execute(thread2);
		}
		else
		{
			throw new RuntimeException("没有配置好合适的串口参数");
		}
	}
	
	private class readThread implements Runnable
	{
		@Override
		public void run()
		{
			while(true)
			{
				byte[] recvBuff = comm.readBuff();
				if(recvBuff.length>0)
				{
					for(int i=0;i<recvBuff.length;i++)
					{
						deque.add(recvBuff[i]);
					}
				}
				try
				{
					TimeUnit.MILLISECONDS.sleep(1000);
				}
				catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			}
		}
	}
	
	private class dbThread implements Runnable
	{
		@Override
		public void run()
		{
			while(true)
			{
				if(refreshFlag == true)
				{
					Calendar now = Calendar.getInstance();
					if(now.get(Calendar.MINUTE)%5==0)
//					if(true)
					{
						synchronized (modbusPojo)
						{
							filterModbusPojo();
							modbusPojo.setNow(TimeUtil.getDateOfMM(now));
//							modbusPojo.setNow(new java.sql.Timestamp(new Date().getTime()));
							ModbusDao md = new ModbusDaoImpl();
							md.addModbus(modbusPojo);
						}
					}
				}
				try
				{
					TimeUnit.MILLISECONDS.sleep(MINUTE);
//					TimeUnit.MILLISECONDS.sleep(1000);
				}
				catch (InterruptedException e)
				{
					e.printStackTrace();
				}
			}
		}
		
	}
	
	public void filterModbusPojo()
	{
		modbusPojo.setQua(0);
		if(modbusPojo.getEnvTemperature()>ModbusUtil.TEMPERATURE_UP)
		{
			modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_UP);
			System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getEnvTemperature()<ModbusUtil.TEMPERATURE_LOW)
		{
			modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_LOW);
			System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getTemperature()>ModbusUtil.TEMPERATURE_UP)
		{
			modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_UP);
			System.out.println("getTemperature = "+modbusPojo.getTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getTemperature()<ModbusUtil.TEMPERATURE_LOW)
		{
			modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_LOW);
			System.out.println("getTemperature = "+modbusPojo.getTemperature());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getHumidity()>ModbusUtil.HUMIDITY_UP)
		{
			modbusPojo.setHumidity(ModbusUtil.HUMIDITY_UP);
			System.out.println("getHumidity = "+modbusPojo.getHumidity());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getHumidity()<ModbusUtil.HUMIDITY_LOW)
		{
			modbusPojo.setHumidity(ModbusUtil.HUMIDITY_LOW);
			System.out.println("getHumidity = "+modbusPojo.getHumidity());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getPressure()>ModbusUtil.PRESSURE_UP)
		{
			modbusPojo.setPressure(ModbusUtil.PRESSURE_UP);
			System.out.println("getPressure = "+modbusPojo.getPressure());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getPressure()<ModbusUtil.PRESSURE_LOW)
		{
			modbusPojo.setPressure(ModbusUtil.PRESSURE_LOW);
			System.out.println("getPressure = "+modbusPojo.getPressure());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getIrradiance()>ModbusUtil.IRRADIANCE_UP)
		{
			modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_UP);
			System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getIrradiance()<ModbusUtil.IRRADIANCE_LOW)
		{
			modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_LOW);
			System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getScaIrradiance()>ModbusUtil.IRRADIANCE_UP)
		{
			modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_UP);
			System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getScaIrradiance()<ModbusUtil.IRRADIANCE_LOW)
		{
			modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_LOW);
			System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getDirIrradiance()>ModbusUtil.IRRADIANCE_UP)
		{
			modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_UP);
			System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getDirIrradiance()<ModbusUtil.IRRADIANCE_LOW)
		{
			modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_LOW);
			System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindSpeed()>ModbusUtil.UAVG_UP)
		{
			modbusPojo.setWindSpeed(ModbusUtil.UAVG_UP);
			System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindSpeed()<ModbusUtil.UAVG_LOW)
		{
			modbusPojo.setWindSpeed(ModbusUtil.UAVG_LOW);
			System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindDir()>ModbusUtil.VAVG_UP)
		{
			modbusPojo.setWindDir(ModbusUtil.VAVG_UP);
			System.out.println("getWindDir = "+modbusPojo.getWindDir());
			modbusPojo.setQua(1);
		}
		if(modbusPojo.getWindDir()<ModbusUtil.VAVG_LOW)
		{
			modbusPojo.setWindDir(ModbusUtil.VAVG_LOW);
			System.out.println("getWindDir = "+modbusPojo.getWindDir());
			modbusPojo.setQua(1);
		}
	}
	
	public void process()
	{
		try
		{
			TimeUnit.MILLISECONDS.sleep(comm_timeout);
		}
		catch (InterruptedException e)
		{
			e.printStackTrace();
		}
		recvProcess();
		sendProcess();
	}

	public void recvProcess()
	{
		refreshFlag = false;
		byte[] recvBuff = new byte[RECV_INNER_SIZE];
		while(deque.size()>=RECV_SIZE)
		{
			Byte first = deque.pollFirst();
			if(first == devAddr)
			{
				Byte second = deque.pollFirst();
				if(second == 0x03)
				{
					Byte third = deque.pollFirst();
					if(third == RECV_INNER_SIZE)
					{
						for(int i=0;i<RECV_INNER_SIZE;i++)
						{
							recvBuff[i] = deque.pollFirst();
						}
						deque.pollFirst();
						deque.pollFirst();
						dealRecvBuff(recvBuff);
					}
				}
			}
		}
	}
	
	public void dealRecvBuff(byte[] recvBuff)
	{
		System.out.println(CommUtil.bytesToHex(recvBuff));
		refreshFlag = true;
		getModbusPojo(recvBuff);
//		modbusPojo.print();
	}
	
	public void getModbusPojo(byte[] recvBuff)
	{
		int temp;
		synchronized (modbusPojo)
		{
			for(int i=0;i<recvBuff.length;)
			{
				switch(i)
				{
					case 0:
						temp = ModbusUtil.getSignedAns(recvBuff, 0, 1);
						double envTemperature = temp*0.1;
						modbusPojo.setEnvTemperature(envTemperature);
						break;
					case 2:
						temp = ModbusUtil.getSignedAns(recvBuff, 2, 3);
						double temperature = temp*0.1;
						modbusPojo.setTemperature(temperature);
						break;
					case 4:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 4, 5);
						double humidity = temp*0.1;
						modbusPojo.setHumidity(humidity);
						break;
					case 6:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 6, 7);
						double pressure = temp*0.1;
						modbusPojo.setPressure(pressure);
						break;
					case 8:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 8, 9);
						modbusPojo.setIrradiance(temp);
						break;
					case 10:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 10, 11);
						modbusPojo.setScaIrradiance(temp);
						break;
					case 12:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 12, 13);
						modbusPojo.setDirIrradiance(temp);
						break;
					case 14:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 14, 15);
						modbusPojo.setWindDir(temp);
						break;
					case 16:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 16, 17);
						double windSpeed = temp*0.1;
						modbusPojo.setWindSpeed(windSpeed);
						break;
					case 18:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 18, 19);
						double windSpeedTwo = temp*0.1;
						modbusPojo.setWindSpeedTwo(windSpeedTwo);
						break;
					case 20:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 20, 21);
						double windSpeedTen = temp*0.1;
						modbusPojo.setWindSpeedTen(windSpeedTen);
						break;
					case 22:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 22, 23);
						modbusPojo.setDailyExposure(temp);
						break;
					case 24:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 24, 25);
						double totalExposure = temp*0.001;
						modbusPojo.setTotalExposure(totalExposure);
						break;
					case 26:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 26, 27);
						double scaExposure = temp*0.001;
						modbusPojo.setScaExposure(scaExposure);
						break;
					case 28:
						temp = ModbusUtil.getUnsignedAns(recvBuff, 28, 29);
						double dirExposure = temp*0.001;
						modbusPojo.setDirExposure(dirExposure);
						break;
				}
				i=i+2;
			}
		}
	}
	
	public void sendProcess()
	{
		byte[] message = new byte[8];
		int sendLen = 0;
		message[sendLen++] = devAddr;
		message[sendLen++] = 0x03;
		message[sendLen++] = 0x00;
		message[sendLen++] = 0x00;
		message[sendLen++] = 0x00;
		message[sendLen++] = 0x0F;
		byte[] crc = CommUtil.CRC16(message,6);
		message[sendLen++] = crc[0];
		message[sendLen++] = crc[1];
		comm.writeBuff(message);
	}

}


目录
相关文章
|
10天前
|
存储 Java 数据库连接
java多线程之线程通信
java多线程之线程通信
|
10天前
|
安全 Java 开发者
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第9天】本文将深入探讨Java并发编程的核心概念,包括线程安全和性能优化。我们将详细解析Java中的同步机制,包括synchronized关键字、Lock接口以及并发集合等,并探讨它们如何影响程序的性能。此外,我们还将讨论Java内存模型,以及它如何影响并发程序的行为。最后,我们将提供一些实用的并发编程技巧和最佳实践,帮助开发者编写出既线程安全又高效的Java程序。
22 3
|
12天前
|
Java
Java 并发编程:深入理解线程池
【4月更文挑战第8天】本文将深入探讨 Java 中的线程池技术,包括其工作原理、优势以及如何使用。线程池是 Java 并发编程的重要工具,它可以有效地管理和控制线程的执行,提高系统性能。通过本文的学习,读者将对线程池有更深入的理解,并能在实际开发中灵活运用。
|
8天前
|
安全 算法 Java
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第11天】 在Java中,高效的并发编程是提升应用性能和响应能力的关键。本文将探讨Java并发的核心概念,包括线程安全、锁机制、线程池以及并发集合等,同时提供实用的编程技巧和最佳实践,帮助开发者在保证线程安全的前提下,优化程序性能。我们将通过分析常见的并发问题,如竞态条件、死锁,以及如何利用现代Java并发工具来避免这些问题,从而构建更加健壮和高效的多线程应用程序。
|
12天前
|
Java
Java并发编程:深入理解线程池
【4月更文挑战第7天】在现代软件开发中,多线程编程已经成为一种不可或缺的技术。为了提高程序性能和资源利用率,Java提供了线程池这一强大工具。本文将深入探讨Java线程池的原理、使用方法以及如何根据实际需求定制线程池,帮助读者更好地理解和应用线程池技术。
15 0
|
2天前
|
安全 Java 调度
Java并发编程:深入理解线程与锁
【4月更文挑战第18天】本文探讨了Java中的线程和锁机制,包括线程的创建(通过Thread类、Runnable接口或Callable/Future)及其生命周期。Java提供多种锁机制,如`synchronized`关键字、ReentrantLock和ReadWriteLock,以确保并发访问共享资源的安全。此外,文章还介绍了高级并发工具,如Semaphore(控制并发线程数)、CountDownLatch(线程间等待)和CyclicBarrier(同步多个线程)。掌握这些知识对于编写高效、正确的并发程序至关重要。
|
2天前
|
缓存 分布式计算 监控
Java并发编程:深入理解线程池
【4月更文挑战第17天】在Java并发编程中,线程池是一种非常重要的技术,它可以有效地管理和控制线程的执行,提高系统的性能和稳定性。本文将深入探讨Java线程池的工作原理,使用方法以及在实际开发中的应用场景,帮助读者更好地理解和使用Java线程池。
|
4天前
|
缓存 监控 Java
Java并发编程:线程池与任务调度
【4月更文挑战第16天】Java并发编程中,线程池和任务调度是核心概念,能提升系统性能和响应速度。线程池通过重用线程减少创建销毁开销,如`ThreadPoolExecutor`和`ScheduledThreadPoolExecutor`。任务调度允许立即或延迟执行任务,具有灵活性。最佳实践包括合理配置线程池大小、避免过度使用线程、及时关闭线程池和处理异常。掌握这些能有效管理并发任务,避免性能瓶颈。
|
4天前
|
设计模式 运维 安全
深入理解Java并发编程:线程安全与性能优化
【4月更文挑战第15天】在Java开发中,多线程编程是提升应用程序性能和响应能力的关键手段。然而,它伴随着诸多挑战,尤其是在保证线程安全的同时如何避免性能瓶颈。本文将探讨Java并发编程的核心概念,包括同步机制、锁优化、线程池使用以及并发集合等,旨在为开发者提供实用的线程安全策略和性能优化技巧。通过实例分析和最佳实践的分享,我们的目标是帮助读者构建既高效又可靠的多线程应用。
|
5天前
|
SQL 安全 Java
Java安全编程:防范网络攻击与漏洞
【4月更文挑战第15天】本文强调了Java安全编程的重要性,包括提高系统安全性、降低维护成本和提升用户体验。针对网络攻击和漏洞,提出了防范措施:使用PreparedStatement防SQL注入,过滤和转义用户输入抵御XSS攻击,添加令牌对抗CSRF,限制文件上传类型和大小以防止恶意文件,避免原生序列化并确保数据完整性。及时更新和修复漏洞是关键。程序员应遵循安全编程规范,保障系统安全。