kv put :
kv.put("sue.color", "blue".getBytes());
最终调用
private PublishAck publishSyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, boolean validateSubjectAndReplyTo) throws IOException, JetStreamApiException
subject名称是" K V . p r o f i l e . s u e . c o l o r " , 这里有一个问题:其实 " KV.profile.sue.color ", 这里有一个问题:其实" KV.profile.sue.color",这里有一个问题:其实"KV.profile"就是一个流,如果有人写错了代码,可以删除它。
get action将调用
protected MessageInfo _getLast(String subject) throws IOException, JetStreamApiException {try {return jsm.getLastMessage(streamName, subject);}catch (JetStreamApiException jsae) {if (jsae.getApiErrorCode() == JS_NO_MESSAGE_FOUND_ERR) {return null;}throw jsae;}
}
使用 subject “DIRECT.GET.KV_profiles.$KV.profiles.sue.color”
调用makeRequestResponseRequired 函数
// ----------------------------------------------------------------------------------------------------
// Request Utils
// ----------------------------------------------------------------------------------------------------
Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeout) throws IOException {try {return responseRequired(conn.request(prependPrefix(subject), bytes, timeout));} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new IOException(e);}
}
通过指令:DIRECT.GET. 告诉服务端调用 $JS.API.DIRECT.GET.
update需要设置expectedRevision,其动作可以描述为:如果键存在并且其最新版本(expectedRevision)与预期匹配,则将其设置为键的值
/*** {@inheritDoc}*/
@Override
public long update(String key, byte[] value, long expectedRevision) throws IOException, JetStreamApiException {validateNonWildcardKvKeyRequired(key);Headers h = new Headers().add(EXPECTED_LAST_SUB_SEQ_HDR, Long.toString(expectedRevision));return _write(key, value, h).getSeqno();
}
如果lastRevision参数不等于服务端key的当前revision,将抛出JetStreamApiException,其内容如下:
{"error":{"code":400,"err_code":10071,"description":"wrong last sequence: 13"}
所以应该编写如下代码:
long lastRevision = entry.getRevision();...kv.update("sue.color", "red".getBytes(), lastRevision);
接下来分析watcher,核心代码是公共类NatsWatchSubscription,它实现了AutoCloseable的finishInit函数。KV也是stream,所以watcher就是subscribe 对应的stream。
PushSubscribeOptions pso = PushSubscribeOptions.builder().stream(fb.getStreamName()).ordered(true).configuration(ConsumerConfiguration.builder().ackPolicy(AckPolicy.None).deliverPolicy(deliverPolicy).startSequence(fromRevision).headersOnly(headersOnly).filterSubjects(subscribeSubjects).build()).build();dispatcher = (NatsDispatcher) ((NatsJetStream) js).conn.createDispatcher();
sub = js.subscribe(null, dispatcher, handler, false, pso);
if (!handler.endOfDataSent) {long pending = sub.getConsumerInfo().getCalculatedPending();if (pending == 0) {handler.sendEndOfData();}
}
使用filter订阅的要注意,以下代码是有问题,不能包括"sue"
List<String> keys=new ArrayList<>();
keys.add("sue.*");
kv.watch( keys, watcher, KeyValueWatchOption.UPDATES_ONLY);
通过一些代码可以实现全部订阅 keys.add(“sue”);
List<String> keys=new ArrayList<>();
keys.add("sue");
keys.add("sue.*");
kv.watch( keys, watcher, KeyValueWatchOption.UPDATES_ONLY);