
Modeling Training_Classic Foreign Graphic Design Websites_Latest World Football Rankings_Today's Breaking News

2025/1/10 15:14:47 Source: https://blog.csdn.net/weixin_43839095/article/details/144968306

Example code

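import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StateTest {
    public static void main(String[] args) throws Exception {
        // Local environment with the web UI (needs flink-runtime-web on the classpath)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        // Exactly-once checkpoint every 10 minutes
        env.enableCheckpointing(1000 * 60 * 10, CheckpointingMode.EXACTLY_ONCE);
        DataStreamSource<String> source = env.socketTextStream("127.0.0.1", 9999, "\n");
        DataStream<Tuple2<String, Integer>> res1 = source
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    // Split each comma-separated line into (word, 1) pairs
                    String[] words = value.split(",");
                    for (String word : words) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            })
            // Key by field 0 (the word); positional keyBy is deprecated in newer Flink versions
            .keyBy(0)
            .flatMap(new RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                private ValueState<Integer> cnt; // running count per key

                @Override
                public void open(Configuration parameters) throws Exception {
                    ValueStateDescriptor<Integer> descriptor =
                        new ValueStateDescriptor<Integer>("cnt", TypeInformation.of(Integer.class));
                    cnt = getRuntimeContext().getState(descriptor);
                }

                @Override
                public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    Integer count = cnt.value();
                    if (count == null) {
                        count = 0; // first record for this key
                    }
                    Integer updateCnt = value.f1 + count;
                    // Emit only when the count is a multiple of 3
                    if (updateCnt % 3 == 0) {
                        out.collect(new Tuple2<>(value.f0, updateCnt));
                    }
                    cnt.update(updateCnt);
                }
            });
        res1.print();
        env.execute("word count");
    }
}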

State types

Keyed State: records a state value for each key. One task may hold many keys, but the same key never appears on two different tasks.

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>): deprecated since Flink 1.4; use AggregatingState instead
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
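Why the same key never lands on two tasks can be seen from how keys are routed. The sketch below assumes Flink's scheme in KeyGroupRangeAssignment: the key's hashCode() is hashed into one of maxParallelism key groups, and each subtask owns a contiguous range of key groups (keyGroup * parallelism / maxParallelism). The mix() function is a hypothetical stand-in for Flink's internal murmur hash, so concrete group numbers will differ from Flink's; 128 is used only as an example maxParallelism.

```java
// Hypothetical sketch of key -> key group -> subtask routing.
// Assumption: mix() replaces Flink's murmur hash; only the structure is meant to match.
final class KeyGroupSketch {
    // Stand-in hash mixing function (NOT Flink's MathUtils.murmurHash)
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & 0x7fffffff; // keep the result non-negative
    }

    // key -> key group in [0, maxParallelism)
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // key group -> subtask; each subtask owns a contiguous range of key groups
    static int operatorIndexForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return operatorIndexForKeyGroup(keyGroupFor(key, maxParallelism), maxParallelism, parallelism);
    }
}
```

Because the result depends only on the key, maxParallelism and parallelism, every record of a key reaches the same subtask, while one subtask serves all keys whose groups fall in its range.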

Operator State: records the state of each task. It comes in the following types:

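  1. ListState: when the parallelism changes, the List of every parallel instance is collected and merged into one new List, whose elements are then redistributed evenly across the new tasks by count;
  2. UnionListState: more flexible than ListState, because how the state is split is left to the user. When the parallelism changes, the original Lists are concatenated and handed to the user without being split, so every new task receives the complete list;
  3. BroadcastState: used, for example, when joining a large table with a small one: the small table is broadcast to every partition of the large table, so the data on every parallel instance is identical and receives the same updates. When the parallelism changes, the data is simply copied to the new tasks.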
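The two list-style redistribution modes can be compared with a small stdlib-only model. This is a hypothetical sketch, not Flink's repartitioner: it assumes the even split hands out contiguous chunks whose sizes differ by at most one, and it leaves BroadcastState out because that state is simply copied.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of ListState vs. UnionListState redistribution on rescaling.
final class ListStateRedistribution {
    // Concatenate the lists of all old parallel instances
    static <T> List<T> merge(List<List<T>> oldPartitions) {
        List<T> all = new ArrayList<>();
        for (List<T> p : oldPartitions) {
            all.addAll(p);
        }
        return all;
    }

    // ListState: split the merged list evenly (assumption: contiguous chunks)
    static <T> List<List<T>> evenSplit(List<List<T>> oldPartitions, int newParallelism) {
        List<T> all = merge(oldPartitions);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0); // first `remainder` tasks get one extra
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // UnionListState: every new task receives the complete merged list
    static <T> List<List<T>> union(List<List<T>> oldPartitions, int newParallelism) {
        List<T> all = merge(oldPartitions);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

With the even split each task sees only its share; with the union mode each task has to decide for itself which elements it keeps.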

ValueState initialization

private ValueState<Integer> cnt;
@Override
public void open(Configuration parameters) throws Exception {
    // Design pattern: decorator
    ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>("cnt", TypeInformation.of(Integer.class));
    cnt = getRuntimeContext().getState(descriptor);
}

StreamingRuntimeContext

	public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
		// Get the state store; by default this is a DefaultKeyedStateStore
		KeyedStateStore keyedStateStore = checkPreconditionsAndGetKeyedStateStore(stateProperties);
		stateProperties.initializeSerializerUnlessSet(getExecutionConfig());
		return keyedStateStore.getState(stateProperties);
	}

	private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor<?, ?> stateDescriptor) {
		Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
		KeyedStateStore keyedStateStore = operator.getKeyedStateStore();
		Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
		return keyedStateStore;
	}

	// AbstractStreamOperator#initializeState()
	public final void initializeState() throws Exception {
		final StreamOperatorStateContext context =
			streamTaskStateManager.streamOperatorStateContext(
				getOperatorID(),
				getClass().getSimpleName(),
				getProcessingTimeService(),
				this,
				keySerializer,
				streamTaskCloseableRegistry,
				metrics);

		this.operatorStateBackend = context.operatorStateBackend();
		this.keyedStateBackend = context.keyedStateBackend();

		// The KeyedStateStore is created here, while the operator's state is initialized
		if (keyedStateBackend != null) {
			this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, getExecutionConfig());
		}

		timeServiceManager = context.internalTimerServiceManager();

		CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = context.rawKeyedStateInputs();
		CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = context.rawOperatorStateInputs();

		try {
			StateInitializationContext initializationContext = new StateInitializationContextImpl(
				context.isRestored(), // information whether we restore or start for the first time
				operatorStateBackend, // access to operator state backend
				keyedStateStore, // access to keyed state backend
				keyedStateInputs, // access to keyed state stream
				operatorStateInputs); // access to operator state stream

			initializeState(initializationContext);
		} finally {
			closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
			closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
		}
	}

DefaultKeyedStateStore

	protected final KeyedStateBackend<?> keyedStateBackend;

	public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
		requireNonNull(stateProperties, "The state properties must not be null");
		try {
			stateProperties.initializeSerializerUnlessSet(executionConfig);
			return getPartitionedState(stateProperties);
		} catch (Exception e) {
			throw new RuntimeException("Error while getting state", e);
		}
	}

	protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
		// VoidNamespace is the placeholder used when there is no namespace
		return keyedStateBackend.getPartitionedState(
			VoidNamespace.INSTANCE,
			VoidNamespaceSerializer.INSTANCE,
			stateDescriptor);
	}

Keyed state is stored in a KeyedStateBackend.

AbstractKeyedStateBackend

/** So that we can give out state when the user uses the same key. */
private final HashMap<String, InternalKvState<K, ?, ?>> keyValueStatesByName;
public <N, S extends State> S getPartitionedState(
		final N namespace,
		final TypeSerializer<N> namespaceSerializer,
		final StateDescriptor<S, ?> stateDescriptor) throws Exception {

	checkNotNull(namespace, "Namespace");

	// Same state as the previous access: return it directly
	if (lastName != null && lastName.equals(stateDescriptor.getName())) {
		lastState.setCurrentNamespace(namespace);
		return (S) lastState;
	}

	// Look the state up by name and return it if it already exists
	InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
	if (previous != null) {
		lastState = previous;
		lastState.setCurrentNamespace(namespace);
		lastName = stateDescriptor.getName();
		return (S) previous;
	}

	// First access to this state: create the InternalKvState
	final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
	final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;

	lastName = stateDescriptor.getName();
	lastState = kvState;
	kvState.setCurrentNamespace(namespace);

	return state;
}

public <N, S extends State, V> S getOrCreateKeyedState(
		final TypeSerializer<N> namespaceSerializer,
		StateDescriptor<S, V> stateDescriptor) throws Exception {
	checkNotNull(namespaceSerializer, "Namespace serializer");
	checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
		"This operation cannot use partitioned state.");

	InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
	if (kvState == null) {
		if (!stateDescriptor.isSerializerInitialized()) {
			stateDescriptor.initializeSerializerUnlessSet(executionConfig);
		}
		kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
			namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
		publishQueryableStateIfEnabled(stateDescriptor, kvState);
	}
	return (S) kvState;
}
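The lookup order in getPartitionedState (last-accessed state first, then the by-name map, and only then creation) can be modelled in a few lines. The class below is a hypothetical simplification: it drops namespaces, serializers and TTL, and a Supplier stands in for getOrCreateKeyedState.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical model of the state cache in AbstractKeyedStateBackend#getPartitionedState.
final class StateRegistrySketch<S> {
    private final Map<String, S> statesByName = new HashMap<>(); // like keyValueStatesByName
    private String lastName;  // name of the most recently accessed state
    private S lastState;      // the most recently accessed state
    int creations;            // how many state objects were actually created

    S getOrCreate(String name, Supplier<S> factory) {
        // 1. Fast path: same state as the previous call
        if (lastName != null && lastName.equals(name)) {
            return lastState;
        }
        // 2. Look the state up by name; 3. create it on first access
        S state = statesByName.get(name);
        if (state == null) {
            state = factory.get();
            creations++;
            statesByName.put(name, state);
        }
        lastName = name;
        lastState = state;
        return state;
    }
}
```

The fast path matters because a user function typically calls the same state object for every record it processes.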

Each state type has its own internal state class; for ValueState it is InternalValueState, a subinterface of InternalKvState.

TtlStateFactory

	public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
		TypeSerializer<N> namespaceSerializer,
		StateDescriptor<S, SV> stateDesc,
		KeyedStateBackend<K> stateBackend,
		TtlTimeProvider timeProvider) throws Exception {
		Preconditions.checkNotNull(namespaceSerializer);
		Preconditions.checkNotNull(stateDesc);
		Preconditions.checkNotNull(stateBackend);
		Preconditions.checkNotNull(timeProvider);
		return stateDesc.getTtlConfig().isEnabled() ?
			new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
				namespaceSerializer, stateDesc, stateBackend, timeProvider).createState() :
			stateBackend.createInternalState(namespaceSerializer, stateDesc);
	}

KeyedStateFactory

	default <N, SV, S extends State, IS extends S> IS createInternalState(
		@Nonnull TypeSerializer<N> namespaceSerializer,
		@Nonnull StateDescriptor<S, SV> stateDesc) throws Exception {
		return createInternalState(namespaceSerializer, stateDesc, StateSnapshotTransformFactory.noTransform());
	}

HeapKeyedStateBackend

	public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
		@Nonnull TypeSerializer<N> namespaceSerializer,
		@Nonnull StateDescriptor<S, SV> stateDesc,
		@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
		StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
		if (stateFactory == null) {
			String message = String.format("State %s is not supported by %s",
				stateDesc.getClass(), this.getClass());
			throw new FlinkRuntimeException(message);
		}
		// 1. Create the StateTable that stores the key/value data
		StateTable<K, N, SV> stateTable = tryRegisterStateTable(
			namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
		// 2. Create the state object, which keeps the StateTable as a field
		return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
	}

	private <N, V> StateTable<K, N, V> tryRegisterStateTable(
		TypeSerializer<N> namespaceSerializer,
		StateDescriptor<?, V> stateDesc,
		@Nonnull StateSnapshotTransformFactory<V> snapshotTransformFactory) throws StateMigrationException {

		@SuppressWarnings("unchecked")
		StateTable<K, N, V> stateTable = (StateTable<K, N, V>) registeredKVStates.get(stateDesc.getName());

		TypeSerializer<V> newStateSerializer = stateDesc.getSerializer();

		if (stateTable != null) {
			// ...
		} else {
			RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo = new RegisteredKeyValueStateBackendMetaInfo<>(
				stateDesc.getType(),
				stateDesc.getName(),
				namespaceSerializer,
				newStateSerializer,
				snapshotTransformFactory);

			stateTable = snapshotStrategy.newStateTable(keyContext, newMetaInfo, keySerializer);
			registeredKVStates.put(stateDesc.getName(), stateTable);
		}

		return stateTable;
	}

HeapSnapshotStrategy–1

	public <N, V> StateTable<K, N, V> newStateTable(
		InternalKeyContext<K> keyContext,
		RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo,
		TypeSerializer<K> keySerializer) {
		return snapshotStrategySynchronicityTrait.newStateTable(keyContext, newMetaInfo, keySerializer);
	}

AsyncSnapshotStrategySynchronicityBehavior–1

	@Override
	public <N, V> StateTable<K, N, V> newStateTable(
		InternalKeyContext<K> keyContext,
		RegisteredKeyValueStateBackendMetaInfo<N, V> newMetaInfo,
		TypeSerializer<K> keySerializer) {
		return new CopyOnWriteStateTable<>(keyContext, newMetaInfo, keySerializer);
	}

CopyOnWriteStateTable–1

	CopyOnWriteStateTable(
		InternalKeyContext<K> keyContext,
		RegisteredKeyValueStateBackendMetaInfo<N, S> metaInfo,
		TypeSerializer<K> keySerializer) {
		super(keyContext, metaInfo, keySerializer);
	}

	@Override
	protected CopyOnWriteStateMap<K, N, S> createStateMap() {
		// The StateMap is the underlying storage
		return new CopyOnWriteStateMap<>(getStateSerializer());
	}

HeapKeyedStateBackend – 2

	private static final Map<Class<? extends StateDescriptor>, StateFactory> STATE_FACTORIES =
		Stream.of(
			Tuple2.of(ValueStateDescriptor.class, (StateFactory) HeapValueState::create),
			Tuple2.of(ListStateDescriptor.class, (StateFactory) HeapListState::create),
			Tuple2.of(MapStateDescriptor.class, (StateFactory) HeapMapState::create),
			Tuple2.of(AggregatingStateDescriptor.class, (StateFactory) HeapAggregatingState::create),
			Tuple2.of(ReducingStateDescriptor.class, (StateFactory) HeapReducingState::create),
			Tuple2.of(FoldingStateDescriptor.class, (StateFactory) HeapFoldingState::create)
		).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
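STATE_FACTORIES is a plain class-to-factory lookup table keyed by the exact descriptor class. A stdlib-only sketch of the same dispatch, with hypothetical descriptor classes and factories that return a label instead of building a real heap state:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-ins for ValueStateDescriptor, ListStateDescriptor, MapStateDescriptor
abstract class Descriptor {
    final String name;
    Descriptor(String name) { this.name = name; }
}

final class ValueDesc extends Descriptor { ValueDesc(String name) { super(name); } }
final class ListDesc extends Descriptor { ListDesc(String name) { super(name); } }
final class MapDesc extends Descriptor { MapDesc(String name) { super(name); } }

final class FactoryDispatchSketch {
    // Descriptor class -> factory; each factory returns a label instead of a real state
    private static final Map<Class<? extends Descriptor>, Function<Descriptor, String>> FACTORIES = new HashMap<>();

    static {
        FACTORIES.put(ValueDesc.class, d -> "HeapValueState(" + d.name + ")");
        FACTORIES.put(ListDesc.class, d -> "HeapListState(" + d.name + ")");
        // MapDesc is deliberately not registered, to show the error path
    }

    static String createState(Descriptor desc) {
        Function<Descriptor, String> factory = FACTORIES.get(desc.getClass()); // exact-class lookup
        if (factory == null) {
            throw new IllegalArgumentException("State " + desc.getClass().getSimpleName() + " is not supported");
        }
        return factory.apply(desc);
    }
}
```

As in createInternalState, an unregistered descriptor type fails fast instead of silently falling back to another state kind.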

HeapValueState – 2

/** Map containing the actual key/value pairs. */
// K key 
// N namespace
// SV value
protected final StateTable<K, N, SV> stateTable;

static <K, N, SV, S extends State, IS extends S> IS create(
	StateDescriptor<S, SV> stateDesc,
	StateTable<K, N, SV> stateTable,
	TypeSerializer<K> keySerializer) {
	// What is finally returned is a HeapValueState
	return (IS) new HeapValueState<>(
		stateTable,
		keySerializer,
		stateTable.getStateSerializer(),
		stateTable.getNamespaceSerializer(),
		stateDesc.getDefaultValue());
}

Initialization is complete.

Data structure: CopyOnWriteStateTable

Main reference: https://blog.csdn.net/u013939918/article/details/106755128

AbstractHeapState: protected final StateTable<K, N, SV> stateTable;

StateTable: protected final StateMap<K, N, S>[] keyGroupedStateMaps;

The keyGroupedStateMaps array is filled when the table is constructed: each slot, one per key group, gets an empty StateMap.

StateMap<K, N, S>[] state = (StateMap<K, N, S>[]) new StateMap[keyContext.getKeyGroupRange().getNumberOfKeyGroups()];
this.keyGroupedStateMaps = state;
for (int i = 0; i < this.keyGroupedStateMaps.length; i++) {
	this.keyGroupedStateMaps[i] = createStateMap();
}

StateMap has two implementations: CopyOnWriteStateMap and NestedStateMap.

NestedStateMap is built from two levels of HashMap (namespace, then key) and is snapshotted synchronously.
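A minimal sketch of the two-level layout, assuming the namespace is the outer key and the user key the inner key. The class is hypothetical; its snapshot copies both map levels (but not the state objects) while nothing else may touch the map, which illustrates why this structure pairs with synchronous snapshots.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a NestedStateMap-style layout: namespace -> (key -> state).
final class NestedMapSketch<K, N, S> {
    private final Map<N, Map<K, S>> namespaceMap = new HashMap<>();

    S get(K key, N namespace) {
        Map<K, S> keyMap = namespaceMap.get(namespace);
        return keyMap == null ? null : keyMap.get(key);
    }

    void put(K key, N namespace, S state) {
        namespaceMap.computeIfAbsent(namespace, n -> new HashMap<>()).put(key, state);
    }

    S remove(K key, N namespace) {
        Map<K, S> keyMap = namespaceMap.get(namespace);
        if (keyMap == null) {
            return null;
        }
        S old = keyMap.remove(key);
        if (keyMap.isEmpty()) {
            namespaceMap.remove(namespace); // drop empty inner maps
        }
        return old;
    }

    // Copies both map levels; the state objects themselves are shared, not copied
    Map<N, Map<K, S>> synchronousSnapshot() {
        Map<N, Map<K, S>> copy = new HashMap<>();
        namespaceMap.forEach((n, m) -> copy.put(n, new HashMap<>(m)));
        return copy;
    }
}
```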

CopyOnWriteStateMap is implemented as an array of buckets, each holding a linked list of entries.

CopyOnWriteStateMap sacrifices some peak performance and memory efficiency for features like incremental rehashing and asynchronous snapshots through copy-on-write.

CopyOnWrite compares version numbers so that as few entries as possible are copied.

CopyOnWriteStateMap fields

public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
	// Default capacity: the hash table has 128 buckets by default
	public static final int DEFAULT_CAPACITY = 128;
	// Minimum number of entries migrated per incremental rehash step
	private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
	// Serializer for the state
	protected final TypeSerializer<S> stateSerializer;
	// Shared empty table, created up front
	private static final StateMapEntry<?, ?, ?>[] EMPTY_TABLE = new StateMapEntry[MINIMUM_CAPACITY >>> 1];
	// Current StateMap version; incremented every time a snapshot is created
	private int stateMapVersion;
	// Versions of all in-flight snapshots; every new snapshot adds its version here
	private final TreeSet<Integer> snapshotVersions;
	// Highest version among the in-flight snapshots, i.e. the largest element of snapshotVersions
	private int highestRequiredSnapshotVersion;
	// Primary table that stores the data
	private StateMapEntry<K, N, S>[] primaryTable;
	// Target table while rehashing, twice the length of primaryTable; EMPTY_TABLE otherwise
	private StateMapEntry<K, N, S>[] incrementalRehashTable;
	// Number of entries in primaryTable
	private int primaryTableSize;
	// Number of entries in incrementalRehashTable
	private int incrementalRehashTableSize;
	// Next primaryTable bucket to migrate; every bucket before rehashIndex has already been moved
	private int rehashIndex;
	// Resize threshold, as in HashMap: rehashing starts once the size exceeds it (3/4 of the capacity)
	private int threshold;
	// Modification counter; an iterator throws if it changes during iteration
	private int modCount;
}

protected static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {
	final K key;
	final N namespace;
	S state;
	final int hash;
	StateMapEntry<K, N, S> next;
	// StateMap version at which this entry was created
	int entryVersion;
	// StateMap version at which the state (data) was last updated
	int stateVersion;
}

Incremental rehashing

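There are two hash tables in memory: primaryTable, the main bucket array, and rehashTable (the incrementalRehashTable field in the code), which is used only while resizing. Initially only primaryTable exists; once the number of entries in primaryTable exceeds the threshold, resizing begins.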

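Resizing allocates a hash table twice the size of primaryTable, stores it as rehashTable, and then gradually migrates the entries of primaryTable into it. In the source, putEntry checks whether size() > threshold and, if so, calls doubleCapacity, which allocates the new table and assigns it to rehashTable.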

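During resizing, the entries in bucket 0 of primaryTable move to buckets 0 and 4 of rehashTable, and likewise the entries in bucket 1 move to buckets 1 and 5 (this example assumes primaryTable has 4 buckets, so rehashTable has 8).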
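The 0 to {0, 4} and 1 to {1, 5} pattern comes from masking with a power-of-two capacity: the doubled mask exposes one more hash bit, so an entry in old bucket i lands in new bucket i or i + oldCapacity. A tiny check of that arithmetic, assuming oldCapacity = 4 as in the example (the class name is hypothetical):

```java
// Bucket index arithmetic for a power-of-two table that doubles in size.
final class BucketSplitSketch {
    // Bucket in the old table (capacity oldCapacity)
    static int oldIndex(int hash, int oldCapacity) {
        return hash & (oldCapacity - 1);
    }

    // Bucket in the doubled table: the extra mask bit decides between i and i + oldCapacity
    static int newIndex(int hash, int oldCapacity) {
        return hash & (2 * oldCapacity - 1);
    }
}
```

For example, hashes 0 and 4 both sit in old bucket 0 but end up in new buckets 0 and 4 respectively.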

This raises a question: once part of the data has moved from primaryTable to rehashTable, which of the two tables should a lookup search?

Selection strategy:

// Next primaryTable bucket to migrate; every bucket before rehashIndex has already been moved
private int rehashIndex;

/**
 * Select the sub-table which is responsible for entries with the given hash code.
 *
 * @param hashCode the hash code which we use to decide about the table that is responsible.
 * @return the index of the sub-table that is responsible for the entry with the given hash code.
 */
private StateMapEntry<K, N, S>[] selectActiveTable(int hashCode) {
	// Bucket index >= rehashIndex: not migrated yet, use primaryTable; otherwise use incrementalRehashTable
	return (hashCode & (primaryTable.length - 1)) >= rehashIndex ? primaryTable : incrementalRehashTable;
}

Migration process

	private int computeHashForOperationAndDoIncrementalRehash(K key, N namespace) {
		if (isRehashing()) {
			// Migrate a few buckets as part of this operation
			incrementalRehash();
		}
		// Compute the hash
		return compositeHash(key, namespace);
	}

private void incrementalRehash() {
	StateMapEntry<K, N, S>[] oldMap = primaryTable;
	StateMapEntry<K, N, S>[] newMap = incrementalRehashTable;

	int oldCapacity = oldMap.length;
	int newMask = newMap.length - 1;
	int requiredVersion = highestRequiredSnapshotVersion;
	int rhIdx = rehashIndex;
	// Number of entries migrated in this step
	int transferred = 0;

	// Migrate at least MIN_TRANSFERRED_PER_INCREMENTAL_REHASH (default 4) entries per step
	while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
		// Walk bucket rhIdx of oldMap
		StateMapEntry<K, N, S> e = oldMap[rhIdx];
		// A bucket is always migrated completely, never half-way
		while (e != null) {
			// An entry older than highestRequiredSnapshotVersion may be shared with a snapshot;
			// relinking it (changing e.next) would corrupt the snapshot's chain, so copy it first
			if (e.entryVersion < requiredVersion) {
				e = new StateMapEntry<>(e, stateMapVersion);
			}
			// Remember the next entry to migrate
			StateMapEntry<K, N, S> n = e.next;
			// Move e to the new table, inserting it at the head of the chain
			int pos = e.hash & newMask;
			e.next = newMap[pos];
			newMap[pos] = e;
			// Advance to the next entry
			e = n;
			++transferred;
		}
		oldMap[rhIdx] = null;
		// All buckets before rhIdx are migrated; rhIdx == oldCapacity means the rehash is done
		if (++rhIdx == oldCapacity) {
			// Rehash complete: primaryTable now holds all data and incrementalRehashTable is reset
			primaryTable = newMap;
			incrementalRehashTable = (StateMapEntry<K, N, S>[]) EMPTY_TABLE;
			primaryTableSize += incrementalRehashTableSize;
			incrementalRehashTableSize = 0;
			rehashIndex = 0;
			return;
		}
	}
	// Move the count of migrated entries from primaryTableSize to incrementalRehashTableSize
	primaryTableSize -= transferred;
	incrementalRehashTableSize += transferred;
	rehashIndex = rhIdx;
}
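Putting selectActiveTable and incrementalRehash together, here is a stripped-down, single-threaded map that rehashes incrementally. It is a hypothetical sketch under clear assumptions: int keys and String values, no namespaces, no snapshot versions (so no copy-on-write during migration), an initial capacity of 4 and a 3/4 threshold; only MIN_TRANSFERRED_PER_STEP = 4 mirrors the original constant.

```java
// Hypothetical single-threaded sketch of incremental rehashing (no namespaces, no snapshot versions).
final class IncrementalRehashMap {
    // Mirrors MIN_TRANSFERRED_PER_INCREMENTAL_REHASH
    private static final int MIN_TRANSFERRED_PER_STEP = 4;

    private static final class Entry {
        final int key;
        String value;
        Entry next;

        Entry(int key, String value, Entry next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    private static final Entry[] EMPTY = new Entry[0];
    private Entry[] primary = new Entry[4]; // assumption: initial capacity 4
    private Entry[] rehash = EMPTY;         // EMPTY means "not rehashing"
    private int primarySize, rehashSize, rehashIndex;
    private int threshold = 3;              // 3/4 of the capacity

    boolean isRehashing() { return rehash != EMPTY; }

    int size() { return primarySize + rehashSize; }

    private static int hash(int key) { return key ^ (key >>> 16); }

    // Buckets below rehashIndex have already been moved to the rehash table
    private Entry[] selectActiveTable(int h) {
        return (h & (primary.length - 1)) >= rehashIndex ? primary : rehash;
    }

    // Move whole buckets until at least MIN_TRANSFERRED_PER_STEP entries have moved
    private void incrementalRehash() {
        int mask = rehash.length - 1;
        int transferred = 0;
        while (transferred < MIN_TRANSFERRED_PER_STEP) {
            for (Entry e = primary[rehashIndex]; e != null; ) {
                Entry n = e.next;
                int pos = hash(e.key) & mask;
                e.next = rehash[pos]; // head insertion into the new table
                rehash[pos] = e;
                e = n;
                transferred++;
            }
            primary[rehashIndex] = null;
            if (++rehashIndex == primary.length) { // rehash finished
                primary = rehash;
                rehash = EMPTY;
                primarySize += rehashSize;
                rehashSize = 0;
                rehashIndex = 0;
                return;
            }
        }
        primarySize -= transferred;
        rehashSize += transferred;
    }

    void put(int key, String value) {
        if (isRehashing()) incrementalRehash();
        int h = hash(key);
        Entry[] tab = selectActiveTable(h);
        int idx = h & (tab.length - 1);
        for (Entry e = tab[idx]; e != null; e = e.next) {
            if (e.key == key) { e.value = value; return; }
        }
        if (!isRehashing() && size() > threshold) {
            // Start a rehash; rehashIndex is 0, so tab (the primary table) is still the right target
            rehash = new Entry[primary.length * 2];
            threshold = rehash.length / 4 * 3;
        }
        tab[idx] = new Entry(key, value, tab[idx]);
        if (tab == primary) primarySize++; else rehashSize++;
    }

    String get(int key) {
        if (isRehashing()) incrementalRehash();
        int h = hash(key);
        Entry[] tab = selectActiveTable(h);
        for (Entry e = tab[h & (tab.length - 1)]; e != null; e = e.next) {
            if (e.key == key) return e.value;
        }
        return null;
    }
}
```

Every put and get pays for a few bucket moves, so no single operation has to copy the whole table.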

Asynchronous snapshots

The StateMap snapshot strategy: to support asynchronous snapshots, the contents of the StateMap at the moment of the snapshot must be preserved.

Creating a snapshot

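The traditional approach is to deep-copy all of the StateMap's data in memory and write the snapshot slowly from that copy while the original data keeps serving requests. A deep copy, however, has to copy every piece of actual data, which is very inefficient. To be faster, Flink makes only a shallow copy.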

The stateSnapshot method of CopyOnWriteStateTable takes a snapshot of the entire StateTable.

stateSnapshot creates a CopyOnWriteStateTableSnapshot.

The CopyOnWriteStateTableSnapshot constructor calls getStateMapSnapshotList on the CopyOnWriteStateTable.

CopyOnWriteStateTable

	List<CopyOnWriteStateMapSnapshot<K, N, S>> getStateMapSnapshotList() {
		List<CopyOnWriteStateMapSnapshot<K, N, S>> snapshotList = new ArrayList<>(keyGroupedStateMaps.length);
		for (int i = 0; i < keyGroupedStateMaps.length; i++) {
			CopyOnWriteStateMap<K, N, S> stateMap = (CopyOnWriteStateMap<K, N, S>) keyGroupedStateMaps[i];
			snapshotList.add(stateMap.stateSnapshot());
		}
		return snapshotList;
	}

CopyOnWriteStateTable keeps one StateMap per key group in keyGroupedStateMaps, and getStateMapSnapshotList calls stateSnapshot on every CopyOnWriteStateMap.

public CopyOnWriteStateMapSnapshot<K, N, S> stateSnapshot() {
	return new CopyOnWriteStateMapSnapshot<>(this);
}

CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap<K, N, S> owningStateMap) {
	super(owningStateMap);
	// Shallow-copy the StateMap's buckets into snapshotData
	this.snapshotData = owningStateMap.snapshotMapArrays();
	// Record the current StateMap version as snapshotVersion
	this.snapshotVersion = owningStateMap.getStateMapVersion();
	this.numberOfEntriesInSnapshotData = owningStateMap.size();
}

// Current StateMap version
private int stateMapVersion;
// Versions of all in-flight snapshots
private final TreeSet<Integer> snapshotVersions;
// Highest version among the in-flight snapshots
private int highestRequiredSnapshotVersion;

StateMapEntry<K, N, S>[] snapshotMapArrays() {
	// 1. Increment stateMapVersion, assign it to highestRequiredSnapshotVersion
	//    and add it to snapshotVersions
	synchronized (snapshotVersions) {
		++stateMapVersion;
		highestRequiredSnapshotVersion = stateMapVersion;
		snapshotVersions.add(highestRequiredSnapshotVersion);
	}

	// 2. Shallow-copy the buckets of primaryTable and incrementalRehashTable into copy.
	//    copy holds the primary buckets not yet migrated, then the low half of the
	//    rehash table, then its high half (the only two regions that can hold entries).
	StateMapEntry<K, N, S>[] table = primaryTable;

	final int totalMapIndexSize = rehashIndex + table.length;
	final int copiedArraySize = Math.max(totalMapIndexSize, size());
	final StateMapEntry<K, N, S>[] copy = new StateMapEntry[copiedArraySize];

	if (isRehashing()) {
		final int localRehashIndex = rehashIndex;
		final int localCopyLength = table.length - localRehashIndex;
		// for the primary table, take every index >= rhIdx.
		System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);

		table = incrementalRehashTable;
		System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
		System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
	} else {
		System.arraycopy(table, 0, copy, 0, table.length);
	}

	return copy;
}
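The bucket layout that snapshotMapArrays builds during a rehash can be checked with plain arrays. In this hypothetical sketch each array slot stands for a bucket head, and a null rehash table means no rehash is in progress; unlike the original, it does not pad the copy up to size().

```java
import java.util.Arrays;

// Hypothetical sketch of the snapshot bucket layout; slots are bucket heads, copied by reference.
final class SnapshotLayoutSketch {
    // primary: current primaryTable; rehash: incrementalRehashTable (2 * primary.length) or null
    static Object[] snapshotBuckets(Object[] primary, Object[] rehash, int rehashIndex) {
        if (rehash == null) {
            // Not rehashing: a shallow copy of the primary buckets is enough
            return Arrays.copyOf(primary, primary.length);
        }
        int len = primary.length;
        int remaining = len - rehashIndex;
        Object[] copy = new Object[len + rehashIndex];
        // 1. Primary buckets that have not been migrated yet: [rehashIndex, len)
        System.arraycopy(primary, rehashIndex, copy, 0, remaining);
        // 2. Low region of the rehash table: [0, rehashIndex)
        System.arraycopy(rehash, 0, copy, remaining, rehashIndex);
        // 3. High region of the rehash table: [len, len + rehashIndex)
        System.arraycopy(rehash, len, copy, remaining + rehashIndex, rehashIndex);
        return copy;
    }
}
```

With 4 primary buckets and rehashIndex = 2, the copy holds primary buckets 2 and 3, then rehash buckets 0, 1, 4 and 5: six bucket references, and no entry is duplicated.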
Releasing a snapshot

releaseSnapshot

void releaseSnapshot(int snapshotVersion) {
	synchronized (snapshotVersions) {
		// Remove this snapshotVersion from snapshotVersions
		snapshotVersions.remove(snapshotVersion);
		// Set highestRequiredSnapshotVersion to the largest remaining version,
		// or to 0 if snapshotVersions is empty
		highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
	}
}

releaseSnapshot removes the given snapshotVersion from snapshotVersions and sets highestRequiredSnapshotVersion to the largest remaining version, or to 0 if snapshotVersions is empty.
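The version counters can be exercised on their own. This minimal sketch reuses the field names from the article but is otherwise hypothetical; mustCopy expresses the entryVersion < highestRequiredSnapshotVersion test that putEntry and incrementalRehash perform.

```java
import java.util.TreeSet;

// Hypothetical sketch of the snapshot version bookkeeping in CopyOnWriteStateMap.
final class SnapshotVersionSketch {
    private int stateMapVersion;
    private int highestRequiredSnapshotVersion;
    private final TreeSet<Integer> snapshotVersions = new TreeSet<>();

    // Taking a snapshot: bump the version and register it as in flight
    synchronized int takeSnapshot() {
        ++stateMapVersion;
        highestRequiredSnapshotVersion = stateMapVersion;
        snapshotVersions.add(highestRequiredSnapshotVersion);
        return highestRequiredSnapshotVersion;
    }

    // Releasing a snapshot: drop its version and recompute the highest one still in flight
    synchronized void releaseSnapshot(int version) {
        snapshotVersions.remove(version);
        highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ? 0 : snapshotVersions.last();
    }

    // An entry created before the newest in-flight snapshot may be shared and must be copied before writing
    synchronized boolean mustCopy(int entryVersion) {
        return entryVersion < highestRequiredSnapshotVersion;
    }

    synchronized int highestRequired() {
        return highestRequiredSnapshotVersion;
    }
}
```

Once every snapshot is released, highestRequiredSnapshotVersion falls back to 0 and no entry needs copying any more.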

CopyOnWrite

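Because a snapshot is only a shallow copy, the snapshot and the StateMap reference the same underlying data. If the StateMap modified an entry before the snapshot had flushed it to disk, the snapshot would write out the wrong data. The goal of a snapshot is to flush the original data as of the snapshot time to disk; as the name says, that data must not change.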

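So how does the StateMap avoid modifying the snapshot's data? The principle is simple: the StateMap and the snapshot share a large amount of data, and since the snapshot's data must not change, the StateMap copies an entry before modifying it. The snapshot and the StateMap then each own their own copy of that entry, so the StateMap's change does not affect the snapshot. To save memory and time, the StateMap copies only the data that is about to change and shares everything else; only data that cannot be shared is copied and then modified. This is why the class name contains CopyOnWrite.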

Modifying the head node

  • Copy Entry a into a new entry object, Entry a copy
  • Put Entry a copy into the primaryTable chain, with its next pointing to Entry b
  • The application modifies the data of Entry a copy, changing data1 to the new value data2
Modifying a middle node

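To modify Entry b, copying Entry b alone is not enough: every entry before it in the chain must be copied as well. The predecessor's next pointer has to be redirected to Entry b copy, but Entry a is still shared with the snapshot, so it must be copied too. This keeps the modification of Entry b correct, and correctness comes first.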

  • Copy Entry a and Entry b into Entry a copy and Entry b copy
  • Link Entry a copy and Entry b copy into the primaryTable chain, with the next of Entry b copy pointing to Entry c
  • The application modifies the data of Entry b copy, changing it to data2
	private StateMapEntry<K, N, S> putEntry(K key, N namespace) {
		final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
		final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
		int index = hash & (tab.length - 1);

		for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
			// An entry with this key and namespace exists, so this is an update.
			// Unlike a plain HashMap, the lookup key is the (key, namespace) pair.
			if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
				// entryVersion: version at which the entry was created.
				// highestRequiredSnapshotVersion: highest version among the in-flight snapshots.
				// entryVersion < highestRequiredSnapshotVersion means the entry is older than some
				// snapshot and may be referenced by it. To keep that snapshot correct, e and some of
				// the entries before it must be copied. handleChainedEntryCopyOnWrite does the copying
				// and returns the copy of e, which is handed back to the caller for modification.
				if (e.entryVersion < highestRequiredSnapshotVersion) {
					e = handleChainedEntryCopyOnWrite(tab, index, e);
				}
				// Otherwise entryVersion >= highestRequiredSnapshotVersion: the entry was created after
				// every in-flight snapshot, so no snapshot references it (a snapshot never references
				// newer data) and it can be modified in place.
				return e;
			}
		}

		++modCount;
		if (size() > threshold) {
			doubleCapacity();
		}

		// Insert a new entry
		return addNewStateMapEntry(tab, key, namespace, hash);
	}

	private StateMapEntry<K, N, S> handleChainedEntryCopyOnWrite(
		StateMapEntry<K, N, S>[] tab,
		int mapIdx,
		StateMapEntry<K, N, S> untilEntry) {
		// Highest required snapshot version
		final int required = highestRequiredSnapshotVersion;
		// Current entry, starting at the bucket head
		StateMapEntry<K, N, S> current = tab[mapIdx];
		// The copied entry; ends up as the new version of untilEntry
		StateMapEntry<K, N, S> copy;

		if (current.entryVersion < required) {
			// Copy the head first and install it in the bucket
			copy = new StateMapEntry<>(current, stateMapVersion);
			tab[mapIdx] = copy;
		} else {
			// nothing to do, just advance copy to current
			copy = current;
		}

		// Walk the rest of the chain up to untilEntry
		while (current != untilEntry) {
			// advance current
			current = current.next;
			if (current.entryVersion < required) {
				// copy and advance the current's copy
				copy.next = new StateMapEntry<>(current, stateMapVersion);
				copy = copy.next;
			} else {
				// nothing to do, just advance copy to current
				copy = current;
			}
		}

		return copy;
	}
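The chain copying can be tried out on a single bucket. This hypothetical sketch keeps only key, value, next and entryVersion, treats the snapshot as a saved reference to the old bucket head, and reproduces the case where a new head was inserted after the snapshot before a middle node is modified.

```java
// Hypothetical single-bucket sketch of putEntry + handleChainedEntryCopyOnWrite.
final class ChainCopyOnWriteSketch {
    static final class Entry {
        final String key;
        String value;
        Entry next;
        final int entryVersion;

        Entry(String key, String value, Entry next, int entryVersion) {
            this.key = key;
            this.value = value;
            this.next = next;
            this.entryVersion = entryVersion;
        }

        // Copy constructor: same key, value and next pointer, new entryVersion
        Entry(Entry other, int entryVersion) {
            this(other.key, other.value, other.next, entryVersion);
        }
    }

    Entry head; // the only bucket
    int stateMapVersion;
    int highestRequiredSnapshotVersion;

    // A snapshot just keeps the current head (a shallow copy of a one-bucket "table")
    Entry snapshot() {
        ++stateMapVersion;
        highestRequiredSnapshotVersion = stateMapVersion;
        return head;
    }

    void put(String key, String value) {
        for (Entry e = head; e != null; e = e.next) {
            if (e.key.equals(key)) {
                if (e.entryVersion < highestRequiredSnapshotVersion) {
                    e = copyChainUntil(e); // e may be shared with a snapshot
                }
                e.value = value;
                return;
            }
        }
        head = new Entry(key, value, head, stateMapVersion); // head insertion
    }

    // Copy every entry from the head up to untilEntry that predates the newest snapshot
    private Entry copyChainUntil(Entry untilEntry) {
        int required = highestRequiredSnapshotVersion;
        Entry current = head;
        Entry copy;
        if (current.entryVersion < required) {
            copy = new Entry(current, stateMapVersion);
            head = copy;
        } else {
            copy = current; // newer entry, not visible to any snapshot
        }
        while (current != untilEntry) {
            current = current.next; // follow the original chain
            if (current.entryVersion < required) {
                copy.next = new Entry(current, stateMapVersion);
                copy = copy.next;
            } else {
                copy = current;
            }
        }
        return copy;
    }

    static String lookup(Entry from, String key) {
        for (Entry e = from; e != null; e = e.next) {
            if (e.key.equals(key)) return e.value;
        }
        return null;
    }
}
```

After the update the live chain is d, a copy, b copy, c while the snapshot still walks a, b, c; entry c is shared because nothing after b had to change.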
Inserting new data

	private StateMapEntry<K, N, S> addNewStateMapEntry(
		StateMapEntry<K, N, S>[] table,
		K key,
		N namespace,
		int hash) {

		// small optimization that aims to avoid holding references on duplicate namespace objects
		if (namespace.equals(lastNamespace)) {
			namespace = lastNamespace;
		} else {
			lastNamespace = namespace;
		}

		// The new entry becomes the bucket head; entryVersion and stateVersion are the current stateMapVersion
		int index = hash & (table.length - 1);
		StateMapEntry<K, N, S> newEntry = new StateMapEntry<>(
			key,
			namespace,
			null,
			hash,
			table[index],
			stateMapVersion,
			stateMapVersion);
		table[index] = newEntry;

		if (table == primaryTable) {
			++primaryTableSize;
		} else {
			++incrementalRehashTableSize;
		}
		return newEntry;
	}
Modifying a middle node after a new node has been added at the head of the chain

Getting a middle node

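Because the caller is likely to modify the data of an entry it has fetched, CopyOnWriteStateMap treats get the same way as put: in both cases the entry and every entry before it are copied when they are older than the newest in-flight snapshot, and get additionally deep-copies the state object itself.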

public S get(K key, N namespace) {
	final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
	final int requiredVersion = highestRequiredSnapshotVersion;
	final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
	int index = hash & (tab.length - 1);

	for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
		final K eKey = e.key;
		final N eNamespace = e.namespace;
		if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
			// copy-on-write check for state
			if (e.stateVersion < requiredVersion) {
				// copy-on-write check for entry
				if (e.entryVersion < requiredVersion) {
					// Same chain handling as in put
					e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
				}
				e.stateVersion = stateMapVersion;
				// Deep-copy the state so that modifying it does not affect the snapshot data
				e.state = getStateSerializer().copy(e.state);
			}
			return e.state;
		}
	}
	return null;
}
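The stateVersion check in get can be isolated to a single entry. This is a hypothetical sketch: the state is a mutable List<Integer>, and new ArrayList<>(...) stands in for getStateSerializer().copy(...).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-entry sketch of the state-level copy-on-write in get().
// Assumption: new ArrayList<>(state) plays the role of the state serializer's copy().
final class CopyOnReadSketch {
    List<Integer> state = new ArrayList<>();
    int stateVersion; // version at which `state` was last replaced
    int stateMapVersion;
    int highestRequiredSnapshotVersion;

    // The snapshot keeps a reference to the current state object, not a copy
    List<Integer> snapshot() {
        ++stateMapVersion;
        highestRequiredSnapshotVersion = stateMapVersion;
        return state;
    }

    // The caller may mutate what get() returns, so copy the state first if a snapshot may share it
    List<Integer> get() {
        if (stateVersion < highestRequiredSnapshotVersion) {
            state = new ArrayList<>(state);
            stateVersion = stateMapVersion;
        }
        return state;
    }
}
```

Only the first get after a snapshot pays for the copy; later calls return the same object until the next snapshot.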
Removing the head node

Removing a middle node

private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {
	final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
	final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
	int index = hash & (tab.length - 1);

	for (StateMapEntry<K, N, S> e = tab[index], prev = null; e != null; prev = e, e = e.next) {
		if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
			if (prev == null) {
				// Removing the head: only the bucket slot changes, and the snapshot has its own copy of the array
				tab[index] = e.next;
			} else {
				// copy-on-write check for entry
				if (prev.entryVersion < highestRequiredSnapshotVersion) {
					// Same method as in put: copy the chain up to prev
					prev = handleChainedEntryCopyOnWrite(tab, index, prev);
				}
				prev.next = e.next;
			}
			++modCount;
			if (tab == primaryTable) {
				--primaryTableSize;
			} else {
				--incrementalRehashTableSize;
			}
			return e;
		}
	}
	// No matching entry found
	return null;
}

