I'm trying to implement optimistic locking with the Redis Lettuce client in reactive Java, following this article: . I have two approaches, V1 and V2. V2 works but V1 does not. I don't understand why, and V2 doesn't seem very efficient to me.
This doesn't work:
public Mono<List<T>> optimisticUpdateAllV1(List<String> ids, Function<T, T> mapper) {
    // map the ids to redis keys
    List<String> keys = // ...
    String[] keysArray = keys.toArray(new String[0]);
    RedisReactiveCommands<String, String> redisReactiveCommands = redisConnection.reactive();
    return redisReactiveCommands.watch(keysArray)
        .then(redisReactiveCommands.mget(keysArray).collectList())
        .map(entries -> {
            List<T> oldItems = // ...
            return oldItems.stream().map(mapper).toList();
        })
        .flatMap(newItems -> {
            // map the entities back to <key, stringified entity>
            Map<String, String> newItemsMap = // ...
            return redisReactiveCommands.multi()
                .flatMap(multiStatus -> {
                    return redisReactiveCommands.mset(newItemsMap);
                })
                .flatMap(mSetStatus -> {
                    return redisReactiveCommands.exec();
                })
                .flatMap(executeStatus -> {
                    if (executeStatus.wasDiscarded() || executeStatus.isEmpty()) {
                        return optimisticUpdateAllV1(ids, mapper);
                    }
                    return Mono.just(newItems);
                });
        });
}
This works. Instead of chaining mset with flatMap, it subscribes to mset inside a doOnSuccess callback on the multi Mono:
public Mono<List<T>> optimisticUpdateAllV2(List<String> ids, Function<T, T> mapper) {
    // map the ids to redis keys
    List<String> keys = // ...
    String[] keysArray = keys.toArray(new String[0]);
    RedisReactiveCommands<String, String> redisReactiveCommands = redisConnection.reactive();
    return redisReactiveCommands.watch(keysArray)
        .then(redisReactiveCommands.mget(keysArray).collectList())
        .map(entries -> {
            List<T> oldItems = // ...
            return oldItems.stream().map(mapper).toList();
        })
        .flatMap(newItems -> {
            // map the entities back to <key, stringified entity>
            Map<String, String> newItemsMap = // ...
            return redisReactiveCommands.multi()
                .doOnSuccess((e) -> {
                    redisReactiveCommands.mset(newItemsMap).subscribe();
                })
                .flatMap(multiStatus -> {
                    return redisReactiveCommands.exec();
                })
                .flatMap(executeStatus -> {
                    if (executeStatus.wasDiscarded() || executeStatus.isEmpty()) {
                        return optimisticUpdateAllV2(ids, mapper);
                    }
                    return Mono.just(newItems);
                });
        });
}
First, I don't understand why the first approach doesn't work. Second, I'm not sure whether calling subscribe() and blocking the flow inside the multi Mono block is a wise idea.
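One possible reading of the difference between V1 and V2, sketched with plain CompletableFuture rather than Lettuce or Reactor. The sketch assumes that a command issued between MULTI and EXEC is only queued and that its result is not delivered until EXEC runs; I have not confirmed this is what happens in V1. FakeTransaction and both helper methods are hypothetical names invented for this illustration and are not part of Lettuce's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class QueuedTransactionDemo {

    /** Hypothetical stand-in for a connection in MULTI state; not Lettuce's API. */
    static class FakeTransaction {
        private final List<CompletableFuture<String>> queued = new ArrayList<>();

        /** Queues a command; the returned future stays pending until exec(). */
        CompletableFuture<String> mset() {
            CompletableFuture<String> result = new CompletableFuture<>();
            queued.add(result);
            return result;
        }

        /** Runs the transaction and only now completes the queued futures. */
        CompletableFuture<List<String>> exec() {
            List<CompletableFuture<String>> pending = new ArrayList<>(queued);
            queued.clear();
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> command : pending) {
                command.complete("OK");
                results.add("OK");
            }
            return CompletableFuture.completedFuture(results);
        }
    }

    /** V1 shape: exec() is chained behind mset(), so exec() is never called. */
    public static boolean chainedExecCompletes() {
        FakeTransaction tx = new FakeTransaction();
        CompletableFuture<List<String>> exec =
                tx.mset().thenCompose(ignored -> tx.exec());
        return exec.isDone();
    }

    /** V2 shape: mset() is only queued; exec() is issued without waiting for it. */
    public static boolean independentExecCompletes() {
        FakeTransaction tx = new FakeTransaction();
        CompletableFuture<String> mset = tx.mset();
        CompletableFuture<List<String>> exec = tx.exec();
        return exec.isDone() && mset.isDone();
    }

    public static void main(String[] args) {
        System.out.println("chained exec completes: " + chainedExecCompletes());
        System.out.println("independent exec completes: " + independentExecCompletes());
    }
}
```

Under that assumption, the chained variant waits on a result that can only arrive after the step it is blocking, which would match V1 never finishing. The independent variant mirrors V2, where mset is subscribed separately and exec is not made to wait for it.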