什么是Reactor方式
Reactor方式一般翻译成反应器方式,也有人称为分发者方式。是根据事情驱动的设计方式,拥有一个或多个并发输入源,有一个服务处理器和多个恳求处理器,服务处理器会同步地将输入的恳求事情以多路复用的方式分发给相应的恳求处理器。简略来说就是由一个线程来接纳所有的恳求,然后派发这些恳求到相关的作业线程中。
为什么运用Reactor方式
在java中,没有NIO呈现之前都是运用Socket编程。Socket接纳恳求是堵塞的,需求处理完一个恳求才能处理下一个恳求,所以在面临高并发的服务恳求时,功能就会很差。
那有人就会说运用多线程(如下图所示)。接纳到一个恳求,就创立一个线程处理,这样就不会堵塞了。实际上这样的确是能够在提高功能上起到必定的效果,可是当恳求许多的时候,就会创立大量的线程,保护线程需求资源的耗费,线程之间的切换也需求耗费功能。并且体系创立线程的数量也是有限的,所以当高并发时,会直接把体系拖垮。
6ecb081aa3b7e22408262674075d227a.webp
因而,根据Java,DougLea提出了三种方式的Reactor方式:单Reactor单线程、单Reactor多线程和多Reactor多线程。
在Reactor方式中有三个重要的角色:
Reactor:担任呼应事情,将事情分发到绑定了对应事情的Handler,假如是衔接事情,则分发到Acceptor;
Handler:事情处理器。担任履行对应事情对应的事务逻辑;
Acceptor:绑定了connect事情,当客户端建议connect恳求时,Reactor会将accept事情分发给Acceptor处理;
单Reactor单线程版别
2f70d1e5484265c9e59e954f30acf1d0.webp单Reactor单线程
只有一个Selector循环承受恳求,客户端注册进因由Reactor接纳注册事情,然后再由Reactor分发出去,由对应的Handler进行事务逻辑处理。
伪代码实例
classReactorimplementsRunnable{
finalSelectorselector;
finalServerSocketChannelserverSocket;
Reactor(intport)throwsIOException{
selector=Selector.open();
serverSocket=ServerSocketChannel.open();
serverSocket.socket().bind(newInetSocketAddress(port));
serverSocket.configureBlocking(false);
SelectionKeysk=serverSocket.register(selector,SelectionKey.OP_ACCEPT);
sk.attach(newAcceptor());
}
publicvoidrun(){
try{
while(!Thread.interrupted()){
selector.select();
Setselected=selector.selectedKeys();
Iteratorit=selected.iterator();
while(it.hasNext()){
dispatch((SelectionKey)(it.next()))
}
selected.clear();
}
}catch(IOExceptione){
}
}
voiddispatch(SelectionKeyk){
Runnabler=(Runnable)(k.attachment());
if(r!=null){
r.run();
}
}
classAcceptorimplementsRunnable{
publicvoidrun(){
try{
SocketChannelc=serverSocket.accept();
if(c!=null){
newHandler(selector,c);
}
}catch(IOExceptione){
}
}
}
}finalclassHandlerimplementsRunnable{
finalSocketChannelsocket;
finalSelectionKeysk;
ByteBufferinput=ByteBuffer.allocate(MAXIN);
ByteBufferoutput=ByteBuffer.allocate(MAXOUT);
staticfinalintREADING=0,SENDING=1;
intstate=READING;
Handler(Selectorsel,SocketChannelc)throwsIOException{
socket=c;
c.configureBlocking(false);
//optionallytryfirstreadnowsk=socket.register(sel,0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
/**
*selector.wakeup();唤醒堵塞在select办法上的线程,使其当即返回
*/sel.wakeup();
}
booleaninputIsComplete(){/*……*/}
booleanoutputIsComplete(){/*……*/}
voidprocess(){/*……*/}
publicvoidrun(){
try{
if(state==READING){
read();
}elseif(state==SENDING){
send();
}
}catch(IOExceptione){
}
}
voidread()throwsIOException{
socket.read(input);
if(inputIsComplete()){
process();
state=SENDING;
//Normallyalsodofirstwritenowsk.interestOps(SelectionKey.OP_WRITE);
}
}
voidsend()throwsIOException{
socket.write(output);
if(outputIsComplete()){
sk.cacel();
}
}
}
这儿需求注意的两点是
Selector.wakeup()办法的效果是:唤醒堵塞在select办法上的线程,使其当即返回。
在Reactor.dispatch()办法中,调用的是任务的run办法,同步履行。
单线程的问题实际上是很明显的。只需其中一个Handler办法堵塞了,那就会导致所有的client的Handler都被堵塞了,也会导致注册事情也无法处理,无法接纳新的恳求。所以这种方式用的比较少,由于不能充分利用到多核的资源。因而,这种方式只是只能处理Handler比较快速完成的场景。
单Reactor多线程版别
11a6d51ced5f2a58530e20c7b6ef4a7a.webp单Reactor多线程
在多线程Reactor中,注册接纳事情都是由Reactor来做,其它的核算,编解码由一个线程池来做。从图中能够看出作业线程是多线程的,监听注册事情的Reactor还是单线程。
伪代码示例
publicclassHandlerimplementsRunnable{
finalSocketChannelsocket;
finalSelectionKeysk;
ByteBufferinput=ByteBuffer.allocate(Integer.MAX_VALUE);
ByteBufferoutput=ByteBuffer.allocate(Integer.MAX_VALUE);
staticfinalintREADING=0,SENDING=1;
intstate=READING;
staticExecutorServicepool=Executors.newCachedThreadPool();
Handler(Selectorsel,SocketChannelc)throwsIOException{
socket=c;
c.configureBlocking(false);
//optionallytryfirstreadnowsk=socket.register(sel,0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
booleaninputIsComplete(){returntrue;}
booleanoutputIsComplete(){returntrue;}
voidprocess(){}
publicvoidrun(){
try{
if(state==READING){
read();
}elseif(state==SENDING){
send();
}
}catch(IOExceptione){
}
}
voidsend()throwsIOException{
socket.write(output);
if(outputIsComplete()){
sk.cancel();
}
}
synchronizedvoidread()throwsIOException{
socket.read(input);
if(inputIsComplete()){
pool.execute(newProcesser());
}
}
synchronizedvoidprocessAndHandOff(){
process();
sk.attach(this);
sk.interestOps(SelectionKey.OP_WRITE);
}
classProcesserimplementsRunnable{
@Overridepublicvoidrun(){
processAndHandOff();
}
}
}
对于Reactor部分,代码不需求调整,由于也是单Reactor,Handler部分增加了线程池的支撑。
对比单Reactor单线程模型,多线程Reactor方式在Handler读写处理时,交给作业线程池处理,能够充分利用多核cpu的处理能力,由于Reactor分发和Handler处理是分隔的,不会导致Reactor无法履行。然后提高应用的功能。缺陷是Reactor只在主线程中运行,承担所有事情的监听和呼应,假如短时间的高并发场景下,仍然会造成功能瓶颈。
多Reactor多线程版别
3980bd7ae29d019d64a602fc864873ea.webp多Reactor多线程
也称为主从Reactor方式,在这种方式下,一般会有两个Reactor:mainReactor和subReactor。mainReactor担任监听客户端恳求,专门处理新衔接的树立,再将树立好的衔接注册到subReactor。subReactor将分配的衔接加入到队列进行监听,当有新的事情发生时,会调用衔接相对应的Handler进行事务处理。
这样的模型使得每个模块愈加专一,耦合度更低,能支撑更高的并发量。许多框架也运用这种方式。
Reactor方式的长处
呼应快,不用为单个同步时间所堵塞,尽管Reactor本身仍然是同步的。
能够最大程度地防止复杂的多线程及同步问题,并且防止多线程/进程的切换开支。
扩展性好,能够方便地经过增加Reactor实例个数来充分利用CPU资源。
复用性好,Reactor方式本身与详细事情处理逻辑无关,具有很高的复用性。
评论前必须登录!
注册