`
xiangxingchina
  • 浏览: 506269 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于netty写的网络通信框架

    博客分类:
  • NIO
 
阅读更多
最近在做一个项目,用到了远程调用方式,开始采用的是rmi,后来经过测试,rmi可能无法达到项目的一些性能上的要求,于是采用了基于tcp/udp的netty,但是直接用netty开发,有些麻烦了,我们想把服务抽取出来部署在远程服务器上,开发的兄弟们只是在自己的项目中负责调用一下,就跟rmi类似,非常方便。
    但是又有一个问题,调用的兄弟需要在web中请求这种tcp服务,netty内部是异步处理机制,http是伪长连接,调用结束后,异步请求还没有返回,http连接就断开了,返回的是null。所以这个问题要解决一下。

    下面说下封装的各个类的代码吧
    首先当客户端对远程服务器发起tcp请求时,这时候请求一般会到达服务器端的handler里,我写的这个handler继承了netty的SimpleChannelUpstreamHandler,代码如下:
Java代码  收藏代码

    public abstract class ChannelServerHandler extends SimpleChannelUpstreamHandler { 
        private final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelServerHandler.class); 
        protected final Map<String, InvokeHandler> handlers = new HashMap<String, InvokeHandler>(); 
        protected final Map<String, Method> initMethods = new HashMap<String, Method>(); 
     
        public ChannelServerHandler() { 
            WSCFInit.register(handlers, initMethods); 
        } 
         
        protected abstract void processor(Channel channel, Object message); 
     
     
        @Override 
        public final void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
            Transport t = (Transport) e.getMessage(); 
            String className = t.getClazz(); 
            String methodName = t.getMethod(); 
            logger.info("Invoke Handler:" + className + ", Invoke Method:" + methodName); 
            processor(ctx.getChannel(), t); 
        } 


里面有几个变量需要解释一下,handlers是开发tcp服务端的handler存放的map,initMethods是里面需要调用的方法,通过WSCFInit类来进行初始化工作。

它主要做了如下工作,在服务器端Server启动的时候,扫描固定包下的handler和他们的方法,然后以clazz+method的方式存放在handlers和initMthods这两个map中。
Java代码  收藏代码

    Reflections reflections = new Reflections("packagename"); 
                Set<Class<?>> annotated = reflections.getTypesAnnotatedWith((Class<? extends Annotation>) annClass); 
                Iterator<Class<?>> it = annotated.iterator(); 
                while (it.hasNext()) { 
                    Class<?> next = it.next(); 
                    if (next.isAnnotationPresent(Handler.class)) { 
                        Annotation ann = (Annotation) next.getAnnotation((Class<? extends Annotation>) annClass); 
                        handlers.put(((Handler) ann).name(), (InvokeHandler) next.newInstance()); 
     
                        Method[] methods = next.getDeclaredMethods(); 
                        for (Method method : methods) { 
                            if (method.isAnnotationPresent(Remote.class)) { 
                                Remote path = method.getAnnotation(Remote.class); 
                                initMethods.put(((Handler) ann).name() + path.url(), method); 
                            } 
                        } 
     
                    } 
                } 



protected abstract void processor(Channel channel, Object message);这个方法具体的逻辑是由它的子类来处理的。

再看一下ServerHandler类里面processor的代码,这个类继承了ChannelServerHandler
Java代码  收藏代码

    @Override 
        protected void processor(Channel channel, Object message) { 
            Transport transport = (Transport) message; 
     
            InvokeHandler handler = handlers.get(transport.getClazz()); 
     
            Object[] params = (Object[]) transport.getMessage(); 
            Object ret = null; 
            try { 
                Method method = initMethods.get(transport.getClazz() + transport.getMethod()); 
                if (method == null) { 
                } else { 
                    ret = method.invoke(handler, params); 
                } 
                ServerSender sender = new ServerSender(channel, transport); 
                sender.send(ret); 
            } catch (Exception e) { 
                throw new IllegalAccessError(e.getMessage()); 
            } 
     
        } 



客户端向服务器端发起的请求真正处理的逻辑在这个方法里面,这个方法在处理完调用了相应的服务端handler进行响应后,会将需要返回给客户端的信息封装在transport这个对象然后传递出去,这个对象是封装服务器和客户端通信消息的。

那么Transport这个类定义了些什么内容呢
Java代码  收藏代码

    public final class Transport implements Serializable { 
     
        private static final long serialVersionUID = 1675991188209117209L; 
        private String clazz; 
        private String method; 
        private Object message; 
        private String key; 



clazz是要调用的handler的注解名,method是要调用的方法的注解名,message是封装通信消息的,key代表一个token,客户端将这个token发给服务器端,服务器端根据这个token进行查找,最后将token和处理结果一起返回给客户端。


ok,处理服务端信息的handler看完了,我们再来看看客户端的
Java代码  收藏代码

    public abstract class ChannelClientHandler extends SimpleChannelUpstreamHandler { 
        private final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelClientHandler.class); 
        public final Map<String, ResultHandler> ret = new ConcurrentHashMap<String, ResultHandler>(); 
     
        protected abstract void processor(Channel channel, Object message); 
     
        @Override 
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
            processor(ctx.getChannel(), e.getMessage()); 
        } 
         
     
    } 



ret是封装服务器端返回结果的,它的子类负责实现processor方法。
子类代码如下
Java代码  收藏代码

    @Override 
        protected void processor(Channel channel, Object message) { 
            Transport t = (Transport) message; 
            String key = t.getClazz() + t.getMethod() + t.getKey(); 
            ResultHandler r = ret.remove(key); 
            r.processor(t.getMessage()); 
             
        } 



ResultHandler这个是我定义的一个接口,专门处理异步返回的结果,可以通过匿名函数调用
Java代码  收藏代码

    public interface ResultHandler<T> { 
         
        public void processor(T message); 
     
    } 



上面说了如何处理netty内部的异步机制,让主线程能够等待异步返回的结果
处理代码如下
Java代码  收藏代码

    public Object get(String url, Object... params) { 
            class Result { 
                public Object o; 
            } 
     
            final Result ret = new Result(); 
            synchronized (ret) { 
                try { 
                    invoke(url, params, new ResultHandler() { 
                        @Override 
                        public void processor(Object message) { 
                            synchronized (ret) { 
                                ret.o = message; 
                                ret.notify(); 
                            } 
                        } 
                    }); 
                    ret.wait(); 
                } catch (InterruptedException e) { 
                }  
                return ret.o; 
            } 
        } 



当客户端调用get方法时候,就可以得到服务器端异步返回的结果了。但是对客户端来说,他感觉到的是同步的调用。

最后我定义了一个InvokeHandler,当开发者开发服务端程序时候,需要实现这个接口,定义自己的handler
类似如下

Java代码  收藏代码

    @Handler(name="testhandler") 
    public class Server1 implements InvokeHandler   
    {   
        @Remote(url="test2") 
        public String say(String msg) { 
            System.out.println(msg); 
            return "hi"; 
        } 
         
        @Remote(url= "test2") 
        public String say2(Person p) { 
            System.out.println(p.getId()); 
            return p.getName(); 
        } 
       
    }   



上面定义的这些注解,在WSCFInit初始化的时候会放到一个map里面,类似于spring的配置文件。

最后再说说客户端是怎么调用的,在连接好服务端ip和port后,通过如下调用方式就可以了
Java代码  收藏代码

    public class Client { 
        private static ClientSender sender; 
         
        public static void main(String[] args) { 
            sender = ClientProxy.connect(ip, port); 
            Object msg = sender.get("tcp://testhandler/test1", "hello"); 
            System.out.println(msg); 
     
            Person p = new Person(); 
            p.setId(1); 
            p.setName("zhangsan"); 
            Object o = sender.get("tcp://testhandler/test2", p); 
            System.out.println(o); 
     
     
        } 
    } 


这样就ok了,也可以自定义要传输的是对象还是xml还是json。同时可以方便的定义自己的解码器。完成自己的业务需求。


接上文,这个服务是基于netty的,每connect一次,就会在服务器上建立一个tcp连接,就是一对pipe,如果不及时释放,那么建立的pipe会越来越多,严重浪费服务器的资源。但是如果释放了,就失去了tcp长连接的作用了。所以折中一下,为了减少连接数,保证客户端的固定连接,服务端不变,在客户端加入连接池功能。
Java代码  收藏代码

    public synchronized ClientSender getClientSender() { 
            Channel channel = getChannel(); 
            if(!channel.isOpen()) { 
                connectPool.remove(channel); 
                channel = createConnect(address, port); 
                connectPool.addLast(channel); 
            } 
            return new ClientSender(channel, handler.ret, this); 
        } 


ok,这样就可以用到连接池功能了。每个客户端可以用到固定连接数了。

那么客户端调用的时候需要自己动手创建ConnectPool了
Java代码  收藏代码

    public class ClientProxy { 
        private static ConnectPool pool = null; 
     
        public static void connect(String address, int port) { 
            if (pool == null) { 
                synchronized (ClientProxy.class) { 
                    if (pool == null) { 
                        pool = ConnectPool.createProxy(address, port); 
                    } 
                } 
            } 
        } 
     
        public static Object get(String url, Object... params) { 
            if(pool == null) { 
                throw new IllegalStateException("must invoke connect method first"); 
            } 
             
            ClientSender sender = pool.getClientSender(); 
            Object msg =  sender.get(url, params); 
            sender.free(); //归还连接 
            return msg; 
        } 
     
    } 



调用方式就不再是上面那样了,要如下调用:
Java代码  收藏代码

    ClientProxy.connect(ip, port); 
            Person p = new Person(); 
            p.setId(12); 
            p.setName("zhangsan"); 
             
            Object msg =  ClientProxy.get("tcp://server/test1", p); 
             
            System.out.println(msg); 



来看下控制台
Java代码  收藏代码

    Jun 19, 2012 3:24:06 PM com.qunar.wscf.pool.ConnectPool 
    INFO: 当前连接池中连接数量:5 
    Jun 19, 2012 3:24:06 PM com.qunar.wscf.pool.ConnectPool 
    INFO: 连接池中剩余连接数量:4 
    zhangsan 



上次加了连接池功能,后来又提出了一个需求,就是,原来是主线程一直在等待异步线程返回,如果没有返回,主线程就阻塞了,进行不下去。后来发现这个太受限制了。主线程可以先边做自己的事情边等待异步线程处理,符合nio的事件处理机制。于是基于上一点,改造成基于异步的同步。

Java代码  收藏代码

    private static class Result { 
            Object obj; 
        } 
     
        final Result f = new Result(); 
        private volatile boolean hasNotified = false; //异步线程是否结束的标志 
     
        public void preGet(String url, Object... params) { 
            invoke(url, params, new ResultHandler() { 
                @Override 
                public void processor(Object message) { 
                    synchronized (f) { 
                        f.obj = message; 
                        f.notify(); //异步线程结束,唤醒客户端线程 
                        hasNotified = true; 
                    } 
                } 
            }); 
        } 
     
        public Object get() { 
            synchronized (f) { 
                if (!hasNotified) //如果异步线程没有结束,则客户端线程等待 
                    try { 
                        f.wait(); 
                    } catch (InterruptedException e) { 
                        throw new RuntimeException(e); 
                    } 
            } 
            free(); // 释放连接 
            return f.obj; 
        } 



测试如下:
Java代码  收藏代码

    ClientProxy.connect(ip, port); 
            Person p = new Person(); 
            p.setId(12); 
            p.setName("zhangsan"); 
             
            Future future =  ClientProxy.get("tcp://server/test1", p); 
            String str = "hello "; //主线程继续做自己的事情,边做边等待异步返回 
             
            System.out.println(str + future.get()); 



最后打印结果
Java代码  收藏代码

    Jun 20, 2012 2:26:12 PM com.qunar.wscf.pool.ConnectPool 
    INFO: 当前连接池中连接数量:5 
    Jun 20, 2012 2:26:12 PM com.qunar.wscf.pool.ConnectPool 
    INFO: 连接池中剩余连接数量:4 
    Jun 20, 2012 2:26:12 PM com.qunar.wscf.pool.ConnectPool 
    INFO: 归还连接后连接池中连接的数量:5 
    hello zhangsan 



还有一种情况,主线程等待一段时间后,在规定时间内没有返回,主线程就不等待了。
所以代码改造下,加入超时功能
Java代码  收藏代码

    public Object get() { 
            return get(0); 
        } 
         
        public Object get(long timeout) { 
            synchronized (f) { 
                if (!hasNotified) 
                    try { 
                        f.wait(timeout); //主线程等待一定时间 
                    } catch (InterruptedException e) { 
                        throw new IllegalStateException(e); 
                    } 
            } 
            free(); // 释放连接 
            return f.obj; 
        } 



客户端调用,比如1个毫秒没返回,就不再等待了,主线程可以自行处理。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics