Flink处理机制的核心就是“有状态的流处理”,在某些情况下,一条数据的计算不仅要基于当前数据自身,还需要依赖数据流中的一些其他数据。这些在一个任务中,用来辅助计算的数据我们就称之为这个任务的状态。
一、按键分区状态(KeyedState)分类
按键分区状态是根据输入流中定义的key来进行维护和访问的,所有是定义在KeyedStream中的,也就是对datastream进行keyby之后才能使用按键分区状态。
按键分区状态支持的结构类型主要有以下几种:
(1)ValueState(值状态)
顾名思义 ,就是状态只保存一个值,这个值的类型可以是任何具体的数据类型。如ValueState<Long>。
publice interface ValueState<T> extends State {T value(); // 获取当前状态的值void update(T value); // 对状态进行更新
}
(2)ListState(列表状态)
以列表的形式保存数据。
publice interface ListState<T> extends State {Iterable<T> get(); // 获取当前的列表状态 是一个可迭代对象void update(List<T> values); // 传入一个列表,对列表状态进行覆盖更新void add(T value); // 在状态列表中添加一个元素void addAll(List<T> values); //添加多个元素
}
(3)MapState(映射状态)
把键值对作为状态保存起来。
publice interface MapState<K, V> extends State {V get(K key); // 获取传入key对应的value值void put(K key, V value); //更新key对应的valuevoid putAll(Map<K, V> map);void remove(K key); boolean contains(K key);Iterable<Map.Enter<K, V>> entries();Iterable<K> keys();Iterable<V> values();boolean isEmpty();
}
(4)ReducingState(归约状态)
归约状态保存的是进行归约计算后的结果值,也就是每add一个元素,都进行归约计算,并将归约结果保存为当前状态值,因此需要在归约状态描述器中声明一个归约函数。
(5)AggregatingState(聚合状态)
聚合状态与归约状态类似,聚合状态也是一个值,只不过聚合状态描述器传入的是一个更加一般化的聚合函数,可以重新定义中间状态和输出状态的类型。
二、状态生存时间(TTL,time-to-live)
在实际应用中,状态会随着时间的推移而逐渐增多,如果不加以限制,最终就会导致存储空间的耗尽。Flink可以为状态配置“生存时间”,当状态在内存中存活的时间超过设定的值时,就将他清除掉,调用clear方法可以清除状态。
但是,如果额外开启一个进程不断扫描所有的状态是否过期会占用大量资源且很多情况下是无用功,一个比较好的方法是:状态失效的时候不立即删除,之后如果有对这个状态进行访问,再判断是否已经失效、从而进行清除,则不需要另外开启进程进行扫描了。
配置状态的TTL时,首先需要创建一个StateTtlConfig配置对象,然后调用(状态描述器.enableTimeToLive())方法启动TTL功能。
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//创建状态和更新状态时才更新失效时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)//不返回过期值.build();ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>('my_state', String.class);stateDescriptor.enableTimeToLive(ttlConfig)
三、算子状态分类
算子状态针对当前算子任务有效,不对key进行隔离,与key无关,因此一个并行子任务上的不同key会访问到相同的算子状态。
(1)ListState(列表状态)
与上面类似。差别主要在于重分区时,按键分区状态的ListState是以keyGrouo的形式重新均衡发送到下游的,而算子状态的ListState是将所有数据收集到一起均匀分配。
(2)UnionListState(联合列表状态)
与ListState类似。差别主要在于重分区时,是将所有的状态一起发送到下游的每一个并行子任务上,列表状态太大时效率很低,不建议使用。
(3)BroadcastState(广播状态)
对所有的并行子任务保持同一份“全局”状态,一般用来做统一的规则或配置设定。这时所有的并行子任务都将访问同一个状态,就像是状态被广播了,注意没有真正广播。
广播状态必须基于广播流来创建。
(4)算子状态的持久化保存
与按键分区状态相比,算子状态的故障后重新恢复稍显复杂:因为故障重启后可能发生并行度调整,按键分区状态中相同的key仍然可以被分配到一个子任务上,然而算子状态下的数据所发往的分区可能会发生变化,那么如何保证原先的状态与故障恢复后数据的对应关系呢?
Flink提供了CheckpointedFunction接口,让我们可以根据业务需求自行设计状态的保存和恢复逻辑,这里就不展开说了。
四、状态持久化和状态后端
(1)开启检查点
对状态进行持久化保存,可以在发生故障后进行重启恢复 。Flink对状态进行持久化的方式,就是将状态写入检查点保存到外部存储系统中。具体的存储介质,一般是分布式文件系统。因此,要相对状态进行自动持久化保存,首先就要开启检查点。调用执行环境的.enableCheckpointing()方法就可以开启检查点。
env.enableCheckpointing(1000); //每隔1s保存检查点
(2)检查点的保存流程
检查点的保存主要是JobManager TaskManager和外部存储系统三者之间的协调。具体来说: 在应用触发检查点保存时,首先由JobManager向每个TaskManager发送触发检查点命令;TaskManager收到命令后,对当前任务的状态进行快照保存,持久化到远程的存储介质;完成后向JobManager返回确认消息;JobManager只有收到所有TaskManager确认消息,才会确认当前检查点保存成功。而这一切工作的协调,就需要一个“专职人员”来完成,也就是状态后端。
(3)状态后端(state backends)
状态后端就是Flink中负责状态的存储、访问以及维护的一个可插拔组件,主要负责两件事:一是本地的状态管理,而是将检查点写入远程的持久化介质。
状态后端可以分为两类:(默认)哈希表状态后端(HashMapStateBackend)、内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)。可以通过对执行环境调用.setStateBackend()方法设置状态后端类型。
env.setStateBackend(new HashMapStateBackend());env.setStateBackend(new EmbeddedRocksDBStateBackend());
哈希表状态后端优点:本地状态放入内存,读写效率高
内嵌RocksDB状态后端优点:异步快照;增量式保存检查点机制