StreamTask with Mailbox PDF Download


Date: 2020-06-29 16:36  Source: http://www.java1234.com  Author: 小锋
Main content:

3) Proposed changes

3.1) Changes in Stream Task

We suggest introducing a “mailbox” as a field in the stream task. One possible initial implementation of the mailbox could be an ArrayBlockingQueue. We can later evaluate more efficient implementations for the multiple-producer-single-consumer case that we have to cover, for example a Disruptor-style implementation based on a ring buffer.

This mailbox will be at the center of activity in the stream task’s main thread and (for the most part) takes over the role of the current StreamTask#run() method, i.e. it becomes the driver of event generation and processing. However, unlike StreamTask#run(), the mailbox loop will also be responsible for executing checkpoint events and processing timer events. All such events simply become tasks that are enqueued in the mailbox, and the stream task’s main thread constantly pulls and runs the next event from the mailbox. This achieves mutually exclusive execution via the queue.

As we want to be able to represent atomic sections in this model, one approach is to represent such atomic actions as Runnable objects that are queued in the mailbox. Note that it is OK for the task’s main thread to block while executing such Runnables, and also for producers to block in their attempt to enqueue new actions. The first case corresponds to the situation in the current code where a longer critical section blocks under the checkpointing lock. The second case corresponds to a thread blocking in its attempt to acquire the checkpointing lock.

We could outline the basic changes to StreamTask as follows:

    BlockingQueue<Runnable> mailbox = ...

    void runMailboxProcessing() {
        // TODO: can become a cancel-event through mailbox eventually
        Runnable letter;
        while (isRunning()) {
            // Run all pending letters before doing any regular work.
            while ((letter = mailbox.poll()) != null) {
                letter.run();
            }
            defaultAction();
        }
    }

    void defaultAction() {
        // e.g. event-processing from an input
    }

This code only represents the core idea and can potentially be optimized in many ways, e.g. by using different queue implementations, by representing very frequent events with marker singleton instances of Runnable that are processed differently, or by using bulk methods on the queue such as #drainTo. Queue fairness might also be a consideration, but the current approach of synchronizing on the checkpointing lock does not provide any fairness guarantees either. We could also consider a variant where the “default action” is itself just an event sent to the mailbox.

3.2) General changes in client code that is currently using the checkpointing lock

We will now discuss how this model can replace the current checkpointing lock approach for all three use cases discussed in the previous section.

First of all, how can the actors in checkpointing, timer processing, and event processing synchronize on the mailbox? Currently, the checkpointing lock is exposed to collaborating actors either via their constructor or through a getter. We can hide the mailbox behind a Queue interface (or something similar) and expose only that queue, either by passing it into constructors or by returning it from a getter. For now, we can pass it along next to the checkpointing lock object, which we keep for backwards compatibility (see section 4).

The code in the Runnable#run() implementations can be regarded as an atomic critical section, because the mailbox thread only continues with the next event after the method has fully completed.
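
To make sections 3.1 and 3.2 concrete, here is a minimal, self-contained sketch of a collaborating actor that receives the mailbox through a getter and enqueues a multi-step action as one Runnable. The class MailboxSketch, its fields, the log entries, and the stop condition after three records are all illustrative assumptions and not part of Flink; the capacity of 16 is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch of the mailbox idea; not a Flink class. */
class MailboxSketch {
    // Bounded queue: producers block in put() while the mailbox is full.
    private final BlockingQueue<Runnable> mailbox = new ArrayBlockingQueue<>(16);
    // Plain, unsynchronized state: only the mailbox thread ever touches it.
    private final List<String> log = new ArrayList<>();
    private boolean running = true;
    private int processed = 0;

    /** Handed to collaborating actors in place of the checkpointing lock. */
    BlockingQueue<Runnable> getMailbox() {
        return mailbox;
    }

    void runMailboxProcessing() {
        Runnable letter;
        while (running) {
            while ((letter = mailbox.poll()) != null) {
                letter.run(); // each letter runs to completion: an atomic section
            }
            defaultAction();
        }
    }

    /** Stand-in for processing one input record; stops after three records. */
    private void defaultAction() {
        log.add("record-" + processed);
        if (++processed == 3) {
            running = false;
        }
    }

    static List<String> demo() {
        MailboxSketch task = new MailboxSketch();
        BlockingQueue<Runnable> mailbox = task.getMailbox();
        // The "checkpoint trigger" runs on its own thread but only enqueues work.
        Thread checkpointTrigger = new Thread(() -> {
            try {
                mailbox.put(() -> {
                    task.log.add("checkpoint-start");
                    task.log.add("checkpoint-end");
                });
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        checkpointTrigger.start();
        try {
            // Join first so the letter is already queued; keeps the demo deterministic.
            checkpointTrigger.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        task.runMailboxProcessing();
        return task.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints [checkpoint-start, checkpoint-end, record-0, record-1, record-2]. The two checkpoint entries are always adjacent because the letter runs to completion on the mailbox thread before the next default action starts, which is the property the checkpointing lock provides today.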

3.3) Event generation and processing

Use case 1, general event generation and processing, will be greatly simplified by our change. The mailbox ensures that all state changes come from a single thread, so mutual exclusion is no longer needed. This means we can drop locking completely from those code paths.

To use the mailbox model, we need to break up the event processing loop of the run methods into methods that process a bounded number of events, for example a single event per invocation. For example, we can remove the loop from while (running && inputProcessor.processInput()) in One/TwoInputStreamTask and make one invocation of inputProcessor.processInput() at a time before checking the mailbox again for events from other actors. Please note that this matches our envisioned changes for selectable, (un)bounded task inputs (FLINK-11875).

For sources, the situation seems a bit more complicated at first. The reason is that current source functions, from a high-level view, work like an “endless” loop of event generation. This is part of their public API and cannot be changed, for backwards compatibility with custom sources. However, running the source function in a loop that never yields does not give our approach any opportunity to check the mailbox. We discuss a backwards-compatible approach to adapt those “legacy” sources to the mailbox model in section 4. We are also currently discussing a new source interface (FLIP-27) that gives more control over the event generation loop and would fit the mailbox model very well. This means that future sources will probably be very easy to integrate.

3.4) Checkpoint and timer trigger

One nice property of this approach is that it already seems to fit timer and checkpointing events (use cases 2 and 3), because they are already executed in the form of Runnable objects submitted to an async executor. We can now simply enqueue the action in the mailbox instead and drop the locking.
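
The effect of breaking up the while (running && inputProcessor.processInput()) loop can be illustrated with a small sketch. Everything here is hypothetical: BoundedStepSketch is not a Flink class, processInput() only mimics the shape of the real method, and the timer letter is enqueued from inside the handling of record "b" purely as a deterministic stand-in for a timer service firing on another thread at that moment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch: one record per default action, mailbox checked in between. */
class BoundedStepSketch {
    private final BlockingQueue<Runnable> mailbox = new ArrayBlockingQueue<>(16);
    private final Iterator<String> input;
    private final List<String> output = new ArrayList<>();
    private boolean running = true;

    BoundedStepSketch(List<String> records) {
        this.input = records.iterator();
    }

    /** Processes at most one record; returns false once the input is exhausted. */
    private boolean processInput() {
        if (!input.hasNext()) {
            return false;
        }
        String record = input.next();
        output.add(record);
        if (record.equals("b")) {
            // Stand-in for a timer firing elsewhere: the callback is not run
            // under a lock, it is simply enqueued as a letter.
            mailbox.add(() -> output.add("timer"));
        }
        return true;
    }

    void runMailboxProcessing() {
        Runnable letter;
        while (running) {
            while ((letter = mailbox.poll()) != null) {
                letter.run();
            }
            // Bounded default action: exactly one processInput() call per iteration.
            if (!processInput()) {
                running = false;
            }
        }
    }

    static List<String> demo() {
        BoundedStepSketch task = new BoundedStepSketch(Arrays.asList("a", "b", "c"));
        task.runMailboxProcessing();
        return task.output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints [a, b, timer, c]: the timer letter runs between records "b" and "c". With the original unbounded loop inside the default action, the mailbox would not be checked again until the input was exhausted.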

4) Backwards compatibility with (legacy) sources

As discussed in section 3.3, we need to provide a compatibility approach that allows us to run source functions that consist of an endless loop of event generation. As such endless loops cannot be integrated with a mailbox model without significant modification (checking the mailbox as part of the loop), we need to think about another way to provide backwards compatibility between such source functions and the mailbox model.

We will discuss a possible approach that executes stream tasks with such source functions differently from other stream tasks. We can take a different branch because it is possible to detect such sources through the API, and the different execution behaviour can also be an action that runs inside the original mailbox thread until the stream task terminates.

The core idea behind the compatibility approach is that we use two threads to run such source functions: the source function thread, which runs the event-generating loop, and the mailbox thread of the stream task, which receives checkpointing events, processing timer triggers, etc. as events. We make both threads mutually exclusive via the old checkpoint lock, which means that we run a modified version of the mailbox loop that blocks on the mailbox and executes mailbox events under the checkpointing lock (see figure). Whenever a mailbox event arrives, the mailbox thread will try to acquire the checkpoint lock, taking it away from the source function thread. Under the lock, the mailbox action is performed exclusively. After that, the mailbox thread releases the lock again for the source function thread and blocks in a waiting take() call on the mailbox.
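
The two-thread arrangement described above can be sketched as follows. This is only an illustration under stated assumptions: LegacySourceSketch, its two counters, the five pre-queued "checkpoint" letters, and the stop letter are hypothetical and not Flink code, and Thread.yield() merely stands in for work a real source does outside the lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of the legacy-source compatibility mode; not a Flink class. */
class LegacySourceSketch {
    private final Object checkpointLock = new Object();
    private final BlockingQueue<Runnable> mailbox = new LinkedBlockingQueue<>();
    private volatile boolean running = true;
    // Guarded by checkpointLock. Whenever the lock is free, emitted == counted.
    private long emitted = 0;
    private long counted = 0;

    /** Legacy-style source: an event loop that never looks at the mailbox. */
    private void runSource() {
        while (running) {
            synchronized (checkpointLock) {
                emitted++; // two-step update that must not be observed halfway
                counted++;
            }
            Thread.yield(); // stand-in for work done outside the lock
        }
    }

    /** Modified mailbox loop: block in take(), run every letter under the lock. */
    private void runMailboxLoop() throws InterruptedException {
        while (running) {
            Runnable letter = mailbox.take();
            synchronized (checkpointLock) {
                letter.run();
            }
        }
    }

    static List<Boolean> demo() {
        LegacySourceSketch task = new LegacySourceSketch();
        List<Boolean> consistent = new ArrayList<>();
        // Five "checkpoint" letters that each verify the invariant, then a stop letter.
        for (int i = 0; i < 5; i++) {
            task.mailbox.add(() -> consistent.add(task.emitted == task.counted));
        }
        task.mailbox.add(() -> task.running = false);

        Thread source = new Thread(task::runSource, "legacy-source");
        source.start();
        try {
            task.runMailboxLoop(); // the calling thread acts as the mailbox thread
            source.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consistent;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Every letter observes a consistent state because it runs only while the source thread is outside its synchronized block. How quickly the mailbox thread obtains the lock depends on the JVM, since intrinsic locks give no fairness guarantee.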

 
 