失效链接处理 |
Java编程方法论 之 Spring Reactor Reactor-Netty Spring Webflux 全面解读 PDF 下载
本站整理下载:
提取码:rogo
相关截图:
![]()
主要内容:
ChannelPool 设计实现解读
回到Connection的设定这节最初,我们主要是通过 ChannelPool 来解决与多个服务端交互以及与单
个服务端建立多个连接的问题。那么这里就来对 ChannelPool 其中的设计与实现进行探索一番。
ChannelPool ,顾名思义,就是一个管理 channel 的容器,里面包含了从容器里获取 channel ,
将使用的 channel 放回容器中,还有一个就是关闭容器,于是,就有下面这个接口设计:
作为一个容器资源,为了让 ChannelPool 具备自动关闭的能力,即可通过 try-with-resources 的
编码方式释放资源, ChannelPool 继承 Closeable 接口,在关闭释放容器这块儿它的实现类只需
实现 close() 即可。 同时,为了实现 ChannelPool 作为容器的属性,这里采用 Deque 来做元素
存储,为了应对并发操作,同时针对不同的jdk版本做了兼容实现,然后在 ChannelPool 实现类
SimpleChannelPool 中进行调用:
public interface ChannelPool extends Closeable {
Future<Channel> acquire();
Future<Channel> acquire(Promise<Channel> promise);
Future<Void> release(Channel channel);
Future<Void> release(Channel channel, Promise<Void> promise);
@Override
void close();
} 123456789
10
11
12
13
假如我们想要从容器中获取或者返还一个 channel 时候附加一些自定义的动作,假如是纯粹的
JDK8+ 的环境的话,我们完全可以通过我们之前接触的函数式接口进行外包策略的设计。但是,为
了保证兼容性, netty 就不得不去做妥协,使用传统的一些代码设计手段,不过,依然是外包策
略,通过一个动作接口来进行,在这个接口中我们分别设定初始化 channel 时进行的动作,从
deque 中获取 channel 时进行的动作,将 channel 返还给 deque 时进行的动作,此接口设计代
码如下:
但我们还漏了一点,通过 channelPool 的 acquire() 方法获取到一个 channel ,直到这个
channel 返还回去,这个期间要做的动作是不是也应该封装一个接口规范,方便使用者做对接,那
索性在调用 io.netty.channel.pool.SimpleChannelPool#acquire() 方法的时候返回一个
Future<Channel> 类型的对象,然后在这个 Future<Channel> 对象上添加一个监听器,里面存放
我们的对接逻辑,这样,在 SimpleChannelPool#acquire() 获取成功时就会执行我们所定义监听
器中的逻辑,也就自然做到了类函数式的封装设计。 channelPool 中获取一个 channel ,在刚开
始的时候就绝对会涉及到创建 channel ,那么上面 channelCreated(Channel ch) 这个动作就应
该放在一个 channelHandler 中进行,那在 SimpleChannelPool 中定义一个 Bootstrap 类型的
字段,在我们在外围将 bootstrap 配置完后传入这个类中即可,并在 SimpleChannelPool 的构造
器中给 bootstrap 进行添加 channelHandler 的设定:
//io.netty.util.internal.PlatformDependent#newConcurrentDeque
public static <C> Deque<C> newConcurrentDeque() {
if (javaVersion() < 7) {
return new LinkedBlockingDeque<C>();
} else {
return new ConcurrentLinkedDeque<C>();
}
}
//io.netty.channel.pool.SimpleChannelPool
private final Deque<Channel> deque = PlatformDependent.newConcurrentDeque();
123456789
10
//io.netty.channel.pool.ChannelPoolHandler
public interface ChannelPoolHandler {
void channelCreated(Channel ch) throws Exception;
void channelAcquired(Channel ch) throws Exception;
void channelReleased(Channel ch) throws Exception;
} 123456789
这样,在初始化 channel 的时候就会执行我们所设定的 channelCreated(Channel ch) ,接下来
就是从 channelPool 中获取 channel ,刚开始从 deque 中获取,获取不到,那就调用
io.netty.bootstrap.Bootstrap#connect() 。新建的 channel 由于建完就拿来用了,并没有
放入 deque 中,无须调用在 ChannelPoolHandler 接口中定义的 channelAcquired ,如果是从
deque 中获取到的 channel ,那么就必须对该 channel 进行一些检查,这个检查主要还是针对我
们的自定义逻辑,也就是 channelAcquired :
|