I'm using Flink's MapBundleFunction to implement mini-batch processing on the DataStream API. To do this, I key the stream by an attribute (of Flink's internal RowData type) and accumulate the incoming elements in a HashMap (of type Map<RowData, List<MyPojo>>). When finishBundle is triggered, I iterate over all the keys, setting the key on the context in every iteration with

ctx.setCurrentKey(currentKey);

then performing the transformations and updating the state.
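Roughly, the accumulation side of the bundle function looks like this (a sketch; MicroBatchAccumulator is a stand-in name, and ctx is the ExecutionContext that MapBundleFunction receives in open()):

public class MicroBatchAccumulator extends MapBundleFunction<RowData, List<MyPojo>, MyPojo, MyPojo> {

    @Override
    public List<MyPojo> addInput(@Nullable List<MyPojo> value, MyPojo input) {
        // Append each incoming record to the per-key bundle kept in the buffer map
        List<MyPojo> bundle = value != null ? value : new ArrayList<>();
        bundle.add(input);
        return bundle;
    }

    // finishBundle is shown further below
}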
Here are the key components of the code:
- Setter for the attribute the stream is keyed by:
public void setStateKey(String id) {
    this.stateKey = GenericRowData.of(id);
}
- Keying of the stream:
stream1
    .union(stream2)
    .keyBy(MyPojo::getStateKey)
    .transform("M", TypeInformation.of(MyPojo.class), microBatchAccumulator)
    .sinkTo(kafkaSink);
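microBatchAccumulator is constructed along these lines (again a sketch: the bundle function is wrapped in Flink's KeyedMapBundleOperator with a CountBundleTrigger; the bundle size of 100 is illustrative):

KeyedMapBundleOperator<RowData, List<MyPojo>, MyPojo, MyPojo> microBatchAccumulator =
    new KeyedMapBundleOperator<>(
        new MicroBatchAccumulator(),      // the bundle function sketched above
        new CountBundleTrigger<>(100L));  // flush the bundle every 100 records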
- Implementation of the finishBundle function:
@Override
public void finishBundle(Map<RowData, List<MyPojo>> buffer, Collector<MyPojo> collector) throws Exception {
    buffer.forEach((currentKey, currentValue) -> {
        log.info("Hash of the current Key -> {}", currentKey.hashCode());
        ctx.setCurrentKey(currentKey);
        // ...some code...
    });
}
This setup works fine locally, but when deployed on Kubernetes it fails with the following exception:
Caused by: java.lang.IllegalArgumentException: Key group 193 is not in KeyGroupRange{startKeyGroup=144, endKeyGroup=179}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.state.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:74) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:249) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430) ~[flink-dist-1.19.0.jar:1.19.0]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:383) ~[flink-dist-1.19.0.jar:1.19.0]
... 18 more
The hash code that I'm logging also seems to be inconsistent across restarts for the same string ids being processed.
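Since the exception points at the key-group computation, I understand Flink derives the key group from the key's hashCode (roughly murmurHash(key.hashCode()) % maxParallelism). As a sanity check, the group a key lands in can be logged with Flink's own KeyGroupRangeAssignment utility (sketch; 720 matches my max parallelism):

// Logs the key group Flink derives from the current key
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(currentKey, 720);
log.info("Key {} -> key group {}", currentKey, keyGroup);

If the logged group for the same id drifts across restarts, the key's hashCode is not stable, which matches what the exception message suggests.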
Flink version -> 1.19.0
parallelism -> 20
max parallelism -> 720