推荐:
google rest 一个不错的http测试应用,google浏览器可以使用。做接口什么的,很有帮助。亲,还不快了解一下。
扯淡:
现在我算是进入一个能带着你向前走的团队,但是产品设计太扯淡,互联网应用,开发周期异常的短,也高估了开发的能力,赶进度的开发bug很多啊。
如果开发只是完成任务的行动,是不会感到痛苦的。所以说:想要做好产品的开发,痛苦才刚刚开始。原因就是开发无法左右产品的设计.....
主题:
时刻关注排行的朋友注意啦,你们都得了排行强迫症啦,赶快找个护士就医吧。
一个排行.....(我需要护士)
关于排名的详细:摸我
好吧,据说netty排在第一,那就学习一下吧!
更具公司很久以前的一个服务器框架代码,不断删减,然后得到一个很简单的服务器框架,分享一下。
自己画的流程图,流程比较简单,这方面比较弱,不太会用图表达:
1,启用netty
我们需要监听端口,这样就可以处理连接上来的tcp消息了。这一步netty用java 的NIO和OIO都封装了,我们自然选择NIO啦。
一下是启动服务器的代码:对于这个启动,你只要学习一下netty的手册例子,就马上明白了,它手册的例子也很好,建议大家看看。
public class Start { public static void main(String[] args) { //ApplicationContext context = new ClassPathXmlApplicationContext("D:/Users/dongchao/workspace/NettyTest/resources/applicationContext-task.xml"); System.out.println("=============show time!============="); initNetty(); } private static final int tcpSendBufferSize = 32768; private static final int tcpReceiveBufferSize = 32768; // 初始化端口的监听 public static void initNetty(){ InetSocketAddress addr = new InetSocketAddress(8989);//需要监听的端口,即tcp连接建立的端口 //Executors.newCachedThreadPool()的解释: //缓冲线程执行器,产生一个大小可变的线程池。 //当线程池的线程多于执行任务所需要的线程的时候,对空闲线程(即60s没有任务执行)进行回收; //当执行任务的线程数不足的时候,自动拓展线程数量。因此线程数量是JVM可创建线程的最大数目。 ServerSocketChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());//It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently. // Creates a new group with a generated name. DefaultChannelGroup allChannels = new DefaultChannelGroup("pushServerChannelGroup"); ServerBootstrap bootstrap = new ServerBootstrap(channelFactory); // PushServerPipelineFactory作为一个ChannelPipelineFactory产生的工厂类,我们可以把需要执行的Handler进行配置 ChannelPipelineFactory pipelineFactory = new PushServerPipelineFactory(allChannels); // Whenever a new connection is accepted by the server, a new ChannelPipeline will be created by the specified ChannelPipelineFactory. // 服务器新连接建立的时候,新的ChannelPipeline会通过我们定义的ChannelPipelineFactory产生,其实是调用了getPipeline()方法。 bootstrap.setPipelineFactory(pipelineFactory); if (tcpReceiveBufferSize != -1) { bootstrap.setOption("child.receiveBufferSize", tcpReceiveBufferSize); } if (tcpSendBufferSize != -1) { bootstrap.setOption("child.sendBufferSize", tcpSendBufferSize); } bootstrap.setOption("reuseAddress", true); bootstrap.setOption("child.reuseAddress", true); bootstrap.setOption("child.keepAlive", false); bootstrap.setOption("child.tcpNoDelay", true); System.out.println(" ===================netty started====================="); Channel serverChannel = bootstrap.bind(addr); allChannels.add(serverChannel); }
PushServerPipelineFactory 其实就是配置了一下Handler,他叫pushServerCommandHandler,他的作用就是把接受到的信息放进叫receivedQueen的队列去就好了,其实就是调用了MessageManager的addSocketMessage方法。
我们看一下他的messageReceived方法就明白了,netty是事件机制的,messageReceived是重写的方法,只要是受到一个连接的消息,就会触发这个方法。
public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception { CommandMessage command = (CommandMessage) messageEvent.getMessage(); if (command.message.length() > 3) { Channel ch = channelHandlerContext.getChannel(); ch.write("---------message received-------------"); // 向消息队列里插消息包,通过handleMessage这个方法, // 插入的MessagePack其实已经更具消息的不同被选择成不同的子类 // 我觉得这是很关键的设计,我们的业务逻辑就可以分成不同的MessagePack子类,然后实现它的onHandler方法 messageManager.addSocketMessage(handleMessage(command.message, messageEvent)); } else { // logger.warn("too short message."); } } //重点方法 public MessagePack handleMessage(String msg, MessageEvent e) { MessagePack messagePack = null; int fid = SjsonUtil.getFIDFromMsg(msg); switch (fid) { case 25: // 调用TestCategoryMsg messagePack = new ShowTimeMessage(msg, e.getChannel()); break; case 26: // 调用不同的业务逻辑 messagePack = new TestCategoryMsg(msg, e.getChannel()); break; default: // logger.warn("unknow FID=" + fid + ",raw msg=" + msg + ",client=" + e.getChannel().getRemoteAddress()); } return messagePack; }
PushServerPipelineFactory 除了配置好Handler,还把MessageManager启动起来了,MessageManager是spring的配置文件中配置的。注意他的init-method,就是实例化这个bean的时候会执行它的start方法,这个比较重要,因为MessageManager就是处理消息队列的模块,所以他需要在服务器启动时启动线程池去处理消息队列。MessageManager提供的方法就是用来维护一个叫receivedQueen的队列。
<bean id="messageManager" class="netty.gate.message.MessageManager" init-method="start"> </bean>
PushServerPipelineFactory:
public class PushServerPipelineFactory implements ChannelPipelineFactory { private DefaultChannelGroup channelGroup; private final PushServerCommandHandler pushServerCommandHandler; private final PushServerEncoder pushServerEncoder = new PushServerEncoder(); public PushServerPipelineFactory(DefaultChannelGroup channelGroup) { this.channelGroup = channelGroup; this.pushServerCommandHandler = new PushServerCommandHandler(this.channelGroup); pushServerCommandHandler.setMessageManager((MessageManager) TaskBeanFactory.getBean("messageManager")); } public final ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new PushServerCommandDecoder(), pushServerCommandHandler, pushServerEncoder); } protected ChannelPipelineFactory createPushServerPipelineFactory(DefaultChannelGroup allChannels) { return new PushServerPipelineFactory(allChannels); } }
很关键的MessageManager: 维护的是receivedQueen队列
public class MessageManager { // MessageManager的消息队列,下面的addSocketMessage方法就是向这个队列塞MessagePack的 private LinkedBlockingQueue<MessagePack> receivedQueen = new LinkedBlockingQueue<MessagePack>(512); private ExecutorService pool; private int reStartThreadCount = 0; public void start() { this.pool = Executors.newCachedThreadPool(); pool.submit(new PushRecvThread()); } private class PushRecvThread implements Runnable { public void run() { while (true) { MessagePack message = waitForProcessMessage(); if (message != null) { // 利用多态执行继承MessagePack的子类方法 message.onHandler(TaskBeanFactory.getContextInstance()); } } } } public MessagePack waitForProcessMessage() { MessagePack message = null; while (message == null) { try { // 从队列中取继承MessagePack的实例 message = receivedQueen.poll(10, TimeUnit.SECONDS); } catch (InterruptedException e) { // TODO log } } return message; } public void addSocketMessage(MessagePack message) { if (message != null) { try { boolean success = receivedQueen.offer(message, 15, TimeUnit.SECONDS); if (false == success) { // maybe PushRecvThread is break,restart the thread again if (reStartThreadCount < 10) { pool.submit(new PushRecvThread()); reStartThreadCount++; } } else { } } catch (InterruptedException e) { // TODO log } } return; }
正真的处理逻辑的代码是写在这些继承MessagePack的抽象类里的,里面的一个onHandler方法是必须实现的,所以使用了抽象类。
下面代码的onHandler中,就可以写那些调用service层的,处理数据库,发邮件,调用接口啊等各种需求操作了。
public class TestCategoryMsg extends MessagePack { private static final String MESSAGE_NAME = "TEST_MESSAGE"; // 消息名称 public TestCategoryMsg(String msg, Channel channel) { super(msg, channel); } @Override public void onHandler(ApplicationContext ctx) { channel.write("---------------i dont know why--------------"); } public String getName() { return MESSAGE_NAME; } }
到此基本上一个服务器从接受数据,到回应数据的流程已经走完了。
我想上源码,可是看不到附件... 我表示无能为力啦 !
让我们继续前行
----------------------------------------------------------------------
努力不一定成功,但不努力肯定不会成功。
共勉。
相关推荐
基于Docker搭建FastDFS存储服务器并实现对应接口 基于策略模式+工厂模式解除WebSocket的信息处理类的耦合 基于SpringCloud使用Nacos + Getway + OpenFeign实现微服务注册与互相调用 基于Hystrix 熔断器对部分接口做...
基于netty 开发 rtmp 推拉流服务器代码,可用ffmpeg代码测试
java课程设计大作业,java、算法练手项目,适合初学java、数据结构的同学拿来学习研究,基于java、GUI开发的小游戏,程序都经过测试,可以直接运行,资源含程序运行所需的源码、资源文件等全部数据,有需要的可放心...
基于Netty NIO形式通过 WebSocket 协议进行通讯 基于Redis GeoHash 数据结构实现检索用户「附近的人」 基于Docker搭建FastDFS存储服务器并实现对应接口 基于策略模式+工厂模式解除WebSocket的信息处理类的...
基于 Netty 4.x 的轻量级点对点框架 目前正在建设中,第一个工作版本预计于 2015 年 6 月发布。 该框架旨在提供一个易于使用的点对点接口,用于无服务器应用程序。 它包括 TCP 和 UDP 连接。 该框架正在构建以适应...
jbus基于java netty的TCP透传服务器功能接收透传网关的TCP连接将网关作为一个设备,向mqtt服务器发布来自设备的数据消息通过向mqtt服务器订阅命令消息,将来自mqtt服务器的命令消息,转发给网关工具服务器状态监视...
使用SpringBoot2.x集成Netty4.x创建基于TCP/IP协议的服务端和客户端的Demo代码案例 使用了SpringBoot+swaggerUI方式进行测试,客户端可以给服务端发送消息,服务端也可以给已经连上的客户端发送消息,使用了通道保活...
这是一个基于netty的websocket服务器与客户端代码。 可基于客户端代码直接开发项目。 将原来的demo进行了一些业务整合 目录结构 |- xpush |——push-client |————org.yyx.message.push.client |——————...
Java整合springboot2.3+modbusTcp协议+netty高性能物联网服务源码 ...5、完全支持Modbus TCP 4种部署模式: TCP服务器master,TCP客户端slave,TCP服务器slave,TCP客户端master。 本人QQ412961810,有问题可以帮忙解答
充分利用设备管理云平台「Java,SPA,Spring Boot 2.0,Netty,Vue.js 2.0,Element 2.0」项目描述Java&Vue.js全栈项目,大规模扩展设备管理云平台,由以下几部分组成: Java前端服务器,基于Vue.js的Web前端“ SPA...
分布式(多进程)架构,几行代码实现一个功能服务器的搭建 多线程设计,注解方式配置,轻松管理所有消息流 强大的RPC功能,调用远程RPC近似于调用本地函数,无需手工定义内部协议 支持插件功能,轻松实现功能插件 框架...
huxin项目是一套聊天系统,包括前台手机界面及后台分布式系统,基于SpringBoot+Netty+MUI+H5Plus+Nginx+FastDFS分布式文件系统搭建的聊天系统。 前端聊天系统包含首页门户登录注册、互信、通讯录、发现、我等模块,...
基于Netty,实现JT808 JT/T808部标协议的消息处理,与编码解码; 使用SpringBoot + MyBatis提供数据入库、Web接口服务; 协议部分不依赖Spring,可移除Spring独立运行(支持Android客户端); 最简洁、清爽、易用的...
分布式架构 漫谈分布式架构 初识分布式架构与意义 如何把应用从单机扩展到分布式 大型分布式架构演进过程 ... 基于Netty实现Dubbo多协议通信支持 Netty无锁化串行设计及高并发处理机制 手写实现多协议RPC框架
spring cloud集成全渠道在线客服以及后台管理系统,所有微服务之间的消息传递采用区块链技术,持续更新中~~目标是:第一阶段:采用spring cloud微服务搭建后台管理系统第二阶段:集成在线客服,采用netty实现...
RW-HPS第三方Rust战争服务器这是一个基于Netty的服务器初步作为一个高性能高可用的服务器为玩家提供更好的游戏体验执照本服务端遵守GNU通用公共许可证v3.0不会支持的游戏协议列表相关,如ADD列表,更新列表,删除...
使用介绍:基于SpringBoot + Rpc-Netty-Framework前后端分离、控制与持久层分离的分布式博客系统。 项目亮点: 1. 使用Interceptor + JWT + Hash + Zset 做用户权限双重认证、构建用户热搜词汇排行榜,同时过滤...
Artemis是通过基于netty的非堵塞IO架构开发的,拥有出色的性能。由jboss捐献的HornetQ的衍生版本,可能作为下一代ActiveMQ的存在。 Artemis 服务器 选择Artemis的原因开源、高可用性、Java编写、高性能的日志保证...
它基于事件和异步,依托于全异步Java服务器Netty,并扩展了很多其他特性。 github地址:https://github.com/fengzhizi715/NetDiscovery 一. 爬虫框架的功能 爬虫框架包含爬虫引擎(SpiderEngine)和爬虫(Spider)。...
基于Netty实现SocketIO的实时推送系统。支持命名空间、二进制数据、SSL、ACK等功能。 环境搭建 开发工具: MySql:数据库 jetty:开发服务器 Tomcat:应用服务器 SVN|Git:版本管理 Nginx:反向代理服务器 Varnish:...