博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊flink的Triggers
阅读量:7071 次
发布时间:2019-06-28

本文共 14310 字,大约阅读时间需要 47 分钟。

本文主要研究一下flink的Triggers

Trigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/Trigger.java

@PublicEvolvingpublic abstract class Trigger
implements Serializable { private static final long serialVersionUID = -4104633972991191369L; public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception; public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception; public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception; public boolean canMerge() { return false; } public void onMerge(W window, OnMergeContext ctx) throws Exception { throw new UnsupportedOperationException("This trigger does not support merging."); } public abstract void clear(W window, TriggerContext ctx) throws Exception; // ------------------------------------------------------------------------ public interface TriggerContext { long getCurrentProcessingTime(); MetricGroup getMetricGroup(); long getCurrentWatermark(); void registerProcessingTimeTimer(long time); void registerEventTimeTimer(long time); void deleteProcessingTimeTimer(long time); void deleteEventTimeTimer(long time);
S getPartitionedState(StateDescriptor
stateDescriptor); @Deprecated
ValueState getKeyValueState(String name, Class stateType, S defaultState); @Deprecated ValueState getKeyValueState(String name, TypeInformation stateType, S defaultState); } public interface OnMergeContext extends TriggerContext { > void mergePartitionedState(StateDescriptor
stateDescriptor); }}复制代码
  • Trigger接收两个泛型,一个是element类型,一个是窗口类型;它定义了onElement、onProcessingTime、onEventTime、canMerge、onMerge、clear几个方法,其中onElement、onProcessingTime、onEventTime均需要返回TriggerResult
  • onElement在每个element添加到window的时候会被回调;onProcessingTime在注册的event-time timer触发时会被回调;onEventTime在注册的processing-time timer触发时会被回调
  • canMerge用于标识是否支持trigger state的合并,默认返回false;onMerge在多个window合并的时候会被触发;clear用于清除TriggerContext中存储的相关state
  • Trigger还定义了TriggerContext及OnMergeContext;TriggerContext定义了注册及删除EventTimeTimer、ProcessingTimeTimer方法,同时还定义了getCurrentProcessingTime、getMetricGroup、getCurrentWatermark、getPartitionedState、getKeyValueState、getKeyValueState方法
  • OnMergeContext继承了TriggerContext,它多定义了mergePartitionedState方法

TriggerResult

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/TriggerResult.java

public enum TriggerResult {	CONTINUE(false, false),	FIRE_AND_PURGE(true, true),	FIRE(true, false),	PURGE(false, true);	// ------------------------------------------------------------------------	private final boolean fire;	private final boolean purge;	TriggerResult(boolean fire, boolean purge) {		this.purge = purge;		this.fire = fire;	}	public boolean isFire() {		return fire;	}	public boolean isPurge() {		return purge;	}}复制代码
  • TriggerResult用于表示trigger在onElement、onProcessingTime、onEventTime被回调时返回的action枚举,它有fire、purge两个属性,CONTINUE、FIRE_AND_PURGE、FIRE、PURGE五个枚举
  • fire表示是否要触发window的computation操作;而purge表示是否要清理window的窗口数据
  • CONTINUE表示不对window做任何操作;FIRE_AND_PURGE表示要触发window的computation操作然后清理window的窗口数据;FIRE表示仅仅触发window的computation操作但不清理window的窗口数据;PURGE表示不触发window的computation操作但是要清理window的窗口数据

EventTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

@PublicEvolvingpublic class EventTimeTrigger extends Trigger
{ private static final long serialVersionUID = 1L; private EventTimeTrigger() {} @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately return TriggerResult.FIRE; } else { ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) { return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteEventTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { // only register a timer if the watermark is not yet past the end of the merged window // this is in line with the logic in onElement(). If the watermark is past the end of // the window onElement() will fire and setting a timer here would fire the window twice. long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentWatermark()) { ctx.registerEventTimeTimer(windowMaxTimestamp); } } @Override public String toString() { return "EventTimeTrigger()"; } public static EventTimeTrigger create() { return new EventTimeTrigger(); }}复制代码
  • EventTimeTrigger继承了Trigger,element类型为Object,窗口类型为TimeWindow;SlidingEventTimeWindows、TumblingEventTimeWindows、EventTimeSessionWindows、DynamicEventTimeSessionWindows默认都使用EventTimeTrigger
  • onElement在window.maxTimestamp()小于等于ctx.getCurrentWatermark()的时候,返回TriggerResult.FIRE,否则执行ctx.registerEventTimeTimer(window.maxTimestamp()),然后返回TriggerResult.CONTINUE;onEventTime在time等于window.maxTimestamp()的时候返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;onProcessingTime则返回TriggerResult.CONTINUE
  • canMerge返回true;onMerge在window.maxTimestamp()大于ctx.getCurrentWatermark()的时候会执行ctx.registerEventTimeTimer(windowMaxTimestamp);clear则执行ctx.deleteEventTimeTimer(window.maxTimestamp())

ProcessingTimeTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java

@PublicEvolvingpublic class ProcessingTimeTrigger extends Trigger
{ private static final long serialVersionUID = 1L; private ProcessingTimeTrigger() {} @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) { ctx.registerProcessingTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) { return TriggerResult.FIRE; } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.deleteProcessingTimeTimer(window.maxTimestamp()); } @Override public boolean canMerge() { return true; } @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { // only register a timer if the time is not yet past the end of the merged window // this is in line with the logic in onElement(). If the time is past the end of // the window onElement() will fire and setting a timer here would fire the window twice. long windowMaxTimestamp = window.maxTimestamp(); if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) { ctx.registerProcessingTimeTimer(windowMaxTimestamp); } } @Override public String toString() { return "ProcessingTimeTrigger()"; } public static ProcessingTimeTrigger create() { return new ProcessingTimeTrigger(); }}复制代码
  • ProcessingTimeTrigger继承了Trigger,element类型为Object,窗口类型为TimeWindow;SlidingProcessingTimeWindows、TumblingProcessingTimeWindows、ProcessingTimeSessionWindows、DynamicProcessingTimeSessionWindows默认都使用ProcessingTimeTrigger
  • onElement执行ctx.registerProcessingTimeTimer(window.maxTimestamp()),然后返回TriggerResult.CONTINUE;onEventTime返回TriggerResult.CONTINUE;onProcessingTime则返回TriggerResult.FIRE
  • canMerge返回true;onMerge在window.maxTimestamp()大于ctx.getCurrentWatermark()的时候会执行ctx.registerProcessingTimeTimer(windowMaxTimestamp);clear则执行ctx.deleteProcessingTimeTimer(window.maxTimestamp())

NeverTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java

@Internal	public static class NeverTrigger extends Trigger
{ private static final long serialVersionUID = 1L; @Override public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {} @Override public void onMerge(GlobalWindow window, OnMergeContext ctx) { } }复制代码
  • NeverTrigger的onElement、onEventTime、onProcessingTime均返回TriggerResult.CONTINUE;GlobalWindows默认使用的是NeverTrigger

CountTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java

@PublicEvolvingpublic class CountTrigger
extends Trigger
{ private static final long serialVersionUID = 1L; private final long maxCount; private final ReducingStateDescriptor
stateDesc = new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE); private CountTrigger(long maxCount) { this.maxCount = maxCount; } @Override public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception { ReducingState
count = ctx.getPartitionedState(stateDesc); count.add(1L); if (count.get() >= maxCount) { count.clear(); return TriggerResult.FIRE; } return TriggerResult.CONTINUE; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) { return TriggerResult.CONTINUE; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { return TriggerResult.CONTINUE; } @Override public void clear(W window, TriggerContext ctx) throws Exception { ctx.getPartitionedState(stateDesc).clear(); } @Override public boolean canMerge() { return true; } @Override public void onMerge(W window, OnMergeContext ctx) throws Exception { ctx.mergePartitionedState(stateDesc); } @Override public String toString() { return "CountTrigger(" + maxCount + ")"; } public static
CountTrigger
of(long maxCount) { return new CountTrigger<>(maxCount); } private static class Sum implements ReduceFunction
{ private static final long serialVersionUID = 1L; @Override public Long reduce(Long value1, Long value2) throws Exception { return value1 + value2; } }}复制代码
  • CountTrigger继承了Trigger,指定了element类型为Object类型;它定义了maxCount及ReducingStateDescriptor;其中ReducingStateDescriptor用于窗口计数(它使用的是自己定义的Sum函数),在onElement方法里头,当计数大于等于maxCount时,则会清空计数,然后返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;onEventTime、onProcessingTime均返回TriggerResult.CONTINUE;canMerge返回true;onMerge执行的是ctx.mergePartitionedState(stateDesc);clear执行的是ctx.getPartitionedState(stateDesc).clear()

PurgingTrigger

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java

@PublicEvolvingpublic class PurgingTrigger
extends Trigger
{ private static final long serialVersionUID = 1L; private Trigger
nestedTrigger; private PurgingTrigger(Trigger
nestedTrigger) { this.nestedTrigger = nestedTrigger; } @Override public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception { TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx); return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx); return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } @Override public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception { TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx); return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult; } @Override public void clear(W window, TriggerContext ctx) throws Exception { nestedTrigger.clear(window, ctx); } @Override public boolean canMerge() { return nestedTrigger.canMerge(); } @Override public void onMerge(W window, OnMergeContext ctx) throws Exception { nestedTrigger.onMerge(window, ctx); } @Override public String toString() { return "PurgingTrigger(" + nestedTrigger.toString() + ")"; } public static
PurgingTrigger
of(Trigger
nestedTrigger) { return new PurgingTrigger<>(nestedTrigger); } @VisibleForTesting public Trigger
getNestedTrigger() { return nestedTrigger; }}复制代码
  • PurgingTrigger是包装型的Trigger,它包装了nestedTrigger,其onElement、onEventTime、onProcessingTime根据nestedTrigger的返回结果,在triggerResult.isFire()为true的时候,包装返回TriggerResult.FIRE_AND_PURGE;canMerge、onMerge、clear等方法均是委托给nestedTrigger处理

小结

  • Trigger接收两个泛型,一个是element类型,一个是窗口类型;它定义了onElement、onProcessingTime、onEventTime、canMerge、onMerge、clear几个方法,其中onElement、onProcessingTime、onEventTime均需要返回TriggerResult;TriggerResult用于表示trigger在onElement、onProcessingTime、onEventTime被回调时返回的action枚举,它有fire、purge两个属性(fire表示是否要触发window的computation操作;而purge表示是否要清理window的窗口数据),CONTINUE、FIRE_AND_PURGE、FIRE、PURGE五个枚举
  • SlidingEventTimeWindows、TumblingEventTimeWindows、EventTimeSessionWindows、DynamicEventTimeSessionWindows默认都使用EventTimeTrigger;SlidingProcessingTimeWindows、TumblingProcessingTimeWindows、ProcessingTimeSessionWindows、DynamicProcessingTimeSessionWindows默认都使用ProcessingTimeTrigger;GlobalWindows默认使用的是NeverTrigger
  • CountTrigger主要用于计数的窗口类型,它使用ReducingStateDescriptor来进行窗口计数,在onElement方法里头,当计数大于等于maxCount时,则会清空计数,然后返回TriggerResult.FIRE,否则返回TriggerResult.CONTINUE;PurgingTrigger是包装型的Trigger,它包装了nestedTrigger,其onElement、onEventTime、onProcessingTime根据nestedTrigger的返回结果,在triggerResult.isFire()为true的时候,包装返回TriggerResult.FIRE_AND_PURGE;canMerge、onMerge、clear等方法均是委托给nestedTrigger处理

doc

转载地址:http://pokml.baihongyu.com/

你可能感兴趣的文章
《王牌特工2》情景再现,Youbionic推出可穿戴式机械手
查看>>
雪城大学信息安全讲义 五、竞态条件
查看>>
干货分享:MySQL之化险为夷的【钻石】抢购风暴
查看>>
量子通信能否跨越“死亡之谷”?2017年市场化的量子通信产品可能产生
查看>>
有序顺序表合并
查看>>
设计模式-观察者模式
查看>>
Spring4-自动装配Beans-按属性名称自动装配
查看>>
精通比特币系列---挖矿与共识
查看>>
to use extended Windows dialogs
查看>>
3A级VR游戏将至?汪丛青力挺G胖正在开发的三款VR游戏
查看>>
Mongodb 3.2 Manual阅读笔记:CH9 存储
查看>>
关于同一线程两次调用EnterCriticalSection的测试
查看>>
说说网络通信模型
查看>>
Wireshark网络抓包(二)——过滤器
查看>>
Ubuntu系统主题及插件工具等官方地址
查看>>
Linux 特殊目录
查看>>
AnguarJs-01-HelloWorld
查看>>
实现前端MD5加密与记住用户名密码功能
查看>>
command for cut
查看>>
Fortinet安全能力融入华为CloudEPN 联合防御网络威胁
查看>>