123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- package com.lutao.tcp.server.tcptest;
- import cn.hutool.core.lang.UUID;
- import cn.hutool.http.HttpUtil;
- import cn.hutool.json.JSONUtil;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import com.lutao.tcp.server.domain.entity.RecSendLog;
- import com.lutao.tcp.server.service.RecSendLogService;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- import javax.annotation.Resource;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.time.Instant;
- import java.time.LocalDateTime;
- import java.time.ZoneId;
- import java.time.format.DateTimeFormatter;
- @Slf4j
- @Component
- public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
- private static BootNettyChannelInboundHandlerAdapter nettyServerHandler;
- @Resource
- private RecSendLogService recSendLogService;
- @PostConstruct
- public void init() {
- nettyServerHandler = this;
- nettyServerHandler.recSendLogService = this.recSendLogService;
- }
- /**
- * 从客户端收到新的数据时,这个方法会在收到消息时被调用
- *
- * @param ctx
- * @param msg
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
- String json = (String) msg;
- //log.info(json);
- try{
- if (msg == null) {
- log.error("======加载客户端报文为空======【" + ctx.channel().id() + "】" + " :" + json);
- return;
- }
- if (JSONUtil.isJson(json)){
- JSONObject jsonObject = JSON.parseObject(json);
- log.info("======加载客户端报文======【" + ctx.channel().id() + "】" + " :" + jsonObject.toJSONString());
- RecSendLog recSendLog = new RecSendLog();
- String uuid = UUID.randomUUID().toString();
- recSendLog.setRec(jsonObject.toJSONString());
- recSendLog.setLogType("大范围速报报警");
- recSendLog.setCreateTime(LocalDateTime.now());
- recSendLog.setUpdateTime(LocalDateTime.now());
- recSendLog.setId(uuid);
- nettyServerHandler.recSendLogService.save(recSendLog);
- //http://115.85.203.230:45002/utility/scopeScan/warnData/submit 转发
- String body = HttpUtil.createPost("http://115.85.203.230:45002/utility/scopeScan/warnData/submit")
- .body(jsonObject.toJSONString())
- .execute().body();
- Integer code = JSONUtil.parseObj(body).getInt("code");
- log.info("======转发状态码======【" + ctx.channel().id() + "】" + " :" + code);
- if (code == 200){
- log.info("======转发成功======【" + ctx.channel().id() + "】" + " :" + jsonObject.toJSONString());
- recSendLog.setSend(jsonObject.toJSONString());
- nettyServerHandler.recSendLogService.updateById(recSendLog);
- }else{
- log.info("======转发失败======【" + ctx.channel().id() + "】" + " :" + jsonObject.toJSONString());
- }
- }else{
- log.info("======无法转换为json======【" + ctx.channel().id() + "】" + " :" + json);
- }
- //回应客户端
- ctx.write(ctx.channel().id() + "服务端已成功接收数据!");
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- }
- }
- /**
- * 从客户端收到新的数据、读取完成时调用
- *
- * @param ctx
- */
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
- log.info("channelReadComplete");
- ctx.flush();
- }
- /**
- * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
- *
- * @param ctx
- * @param cause
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
- log.info("exceptionCaught");
- cause.printStackTrace();
- ctx.close();//抛出异常,断开与客户端的连接
- }
- /**
- * 客户端与服务端第一次建立连接时 执行
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
- super.channelActive(ctx);
- ctx.channel().read();
- InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
- String clientIp = insocket.getAddress().getHostAddress();
- //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
- log.info("channelActive:"+clientIp+ctx.name());
- }
- /**
- * 客户端与服务端 断连时 执行
- *
- * @param ctx
- * @throws Exception
- */
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
- super.channelInactive(ctx);
- InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
- String clientIp = insocket.getAddress().getHostAddress();
- ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
- log.info("channelInactive:"+clientIp);
- }
- /**
- * 服务端当read超时, 会调用这个方法
- *
- * @param ctx
- * @param evt
- * @throws Exception
- */
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
- super.userEventTriggered(ctx, evt);
- InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
- String clientIp = insocket.getAddress().getHostAddress();
- ctx.close();//超时时断开连接
- log.info("userEventTriggered:"+clientIp);
- }
- @Override
- public void channelRegistered(ChannelHandlerContext ctx) throws Exception{
- log.info("channelRegistered");
- }
- @Override
- public void channelUnregistered(ChannelHandlerContext ctx) throws Exception{
- //当一个客户端断开连接或一个服务器完成其服务并关闭连接时
- log.info("channelUnregistered");
- }
- /**
- * 流量控制是一种拥塞控制机制,
- * 用于防止发送方过多地发送数据,从而导致接收方过载。
- * 当通道变得不可写时,这可能意味着接收方的缓冲区已满,
- * 因此发送方需要停止发送数据,等待通道变得可写时再继续发送
- * @throws Exception
- */
- @Override
- public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception{
- // 通道变得可写,可以继续发送数据
- log.info("Channel is writable, resuming data transmission.");
- }
- /**
- * 广播数据十六进制,每两个字符为一组,
- *如:09 57 79 30 30 30 30 30 30 30 30 30 30 30 41 30 30 35 30 30
- * 再转为对应ASCII码,即可得到明文的uuid
- * @param uuid
- */
- public String hexToASCII(String uuid) {
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < uuid.length(); i += 2) {
- String hex = uuid.substring(i, i + 2);
- int decimal = Integer.parseInt(hex, 16);
- char asciiChar = (char) decimal;
- sb.append(asciiChar);
- }
- String result = sb.toString();
- return result;
- }
- /**
- * 将时间戳转换为yyyy-MM-dd HH:mm:ss格式的日期字符串。
- * @param timestamp 时间戳(以毫秒为单位)
- * @return 格式化后的日期字符串
- */
- public String timestampToDate(long timestamp) {
- Instant instant = Instant.ofEpochSecond(timestamp); // 将时间戳转换为Instant对象
- LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); // 将Instant对象转换为本地日期时间对象
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 定义日期格式
- // 将日期时间对象格式化为字符串
- String time = dateTime.format(formatter);
- return time;
- }
- /**
- * 将LocalDateTime转换为yyyy-MM-dd HH:mm:ss格式的日期字符串。
- * @return 格式化后的日期字符串
- */
- public String LocalDateTimeToStr() {
- // 获取当前日期时间
- LocalDateTime now = LocalDateTime.now();
- // 定义日期时间的格式
- DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- // 将LocalDateTime格式化为字符串
- String formattedDateTime = now.format(formatter);
- return formattedDateTime;
- }
- }
|