Issue while setting current context using MapBundleOperator in Flink

I'm using Flink's MapBundleFunction to achieve mini-batch processing with the DataStream API. To do this, I key the stream by an attribute (of Flink's internal RowData type) and keep accumulating the elements in a hash map (of type Map<RowData, List<MyPojo>>). When finishBundle is triggered, I iterate over all the keys, setting the key on the context in every iteration using

ctx.setCurrentKey(currentKey);

and then performing the transformations and updating the state.
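
For reference, the accumulation side of the bundle function looks roughly like the sketch below. This is a minimal reconstruction rather than my exact code: the class name MicroBatchAccumulator is an assumption; only the type parameters match the description above.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    import javax.annotation.Nullable;

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
    import org.apache.flink.util.Collector;

    // Buffers every element seen for a key until the bundle is flushed.
    public class MicroBatchAccumulator
            extends MapBundleFunction<RowData, List<MyPojo>, MyPojo, MyPojo> {

        @Override
        public List<MyPojo> addInput(@Nullable List<MyPojo> value, MyPojo input) {
            // value is null for the first element of this key in the current bundle.
            List<MyPojo> acc = (value == null) ? new ArrayList<>() : value;
            acc.add(input);
            return acc;
        }

        @Override
        public void finishBundle(Map<RowData, List<MyPojo>> buffer, Collector<MyPojo> collector) {
            // Shown in full under point 3 below.
        }
    }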

Here are the key components of the code:

  1. Setter of the key based on which the stream is being keyed
    public void setStateKey(String id) {
        this.stateKey = GenericRowData.of(id);
    }
  2. Keying of the stream (see the sketch after this list for how microBatchAccumulator might be constructed)

    stream1
       .union(stream2)
       .keyBy(MyPojo::getStateKey)
       .transform("M", TypeInformation.of(MyPojo.class), microBatchAccumulator)
       .sinkTo(kafkaSink);

  3. Implementation of the finishBundle function.

    @Override
    public void finishBundle(Map<RowData, List<MyPojo>> buffer, Collector<MyPojo> collector) throws Exception {

        buffer.forEach((currentKey, currentValue) -> {

            log.info("Hash of the current Key -> {}", currentKey.hashCode());

            ctx.setCurrentKey(currentKey);
            // ... some code ...
        });
    }
    

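Since point 2 references it, here is roughly how microBatchAccumulator is constructed. This is a sketch under assumptions: I show the KeyedMapBundleOperator variant (MapBundleOperator additionally takes an explicit KeySelector), and the bundle size of 100 is illustrative.

    import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
    import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;

    // Inside the job setup, before the transform(...) call above. The operator
    // wraps the bundle function; the trigger flushes the buffer (i.e. calls
    // finishBundle) once 100 elements have been collected.
    KeyedMapBundleOperator<RowData, List<MyPojo>, MyPojo, MyPojo> microBatchAccumulator =
            new KeyedMapBundleOperator<>(
                    new MicroBatchAccumulator(),
                    new CountBundleTrigger<>(100L));
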
This setup works fine locally, but when deployed on Kubernetes it fails with the following exception:

    Caused by: java.lang.IllegalArgumentException: Key group 193 is not in KeyGroupRange{startKeyGroup=144, endKeyGroup=179}. Unless you're directly using low level state access APIs, this is most likely caused by non-deterministic shuffle key (hashCode and equals implementation).
        at org.apache.flink.runtime.state.KeyGroupRangeOffsets.newIllegalKeyGroupException(KeyGroupRangeOffsets.java:37) ~[flink-dist-1.19.0.jar:1.19.0]
        at org.apache.flink.runtime.state.InternalKeyContextImpl.setCurrentKeyGroupIndex(InternalKeyContextImpl.java:74) ~[flink-dist-1.19.0.jar:1.19.0]
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.setCurrentKey(AbstractKeyedStateBackend.java:249) ~[flink-dist-1.19.0.jar:1.19.0]
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.setCurrentKey(RocksDBKeyedStateBackend.java:430) ~[flink-dist-1.19.0.jar:1.19.0]
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.setCurrentKey(StreamOperatorStateHandler.java:383) ~[flink-dist-1.19.0.jar:1.19.0]
        ... 18 more
The hash code that I'm logging also seems to be inconsistent across restarts for the same string IDs being processed.

Flink version -> 1.19.0
parallelism -> 20
max parallelism -> 720
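
As far as I understand, the key group in the exception is derived from the key's hashCode(), so the assignment can be replayed offline with KeyGroupRangeAssignment. A small sketch of what I'm checking (the id value is made up):

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;

    public class KeyGroupCheck {
        public static void main(String[] args) {
            // Flink assigns a key to a key group via murmurHash(key.hashCode()) % maxParallelism.
            // If the same logical key can surface with different hashCode()s (for instance a
            // GenericRowData on the write path vs. the BinaryRowData it may deserialize to),
            // the group computed at setCurrentKey() time can fall outside the subtask's range.
            int maxParallelism = 720; // matches the job config above

            GenericRowData key = GenericRowData.of(StringData.fromString("some-id")); // illustrative id
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            System.out.println("key group = " + keyGroup);
        }
    }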
