java - Error while deserializing null values in Flink application - Stack Overflow


I am testing a Flink job that reads data from Kafka and writes to files.

When running an integration test with JUnit, I get the following error: Caused by: java.lang.IllegalAccessError: class com.fasterxml.jackson.databind.node.NullNodeConstructorAccess tried to access protected method 'void com.fasterxml.jackson.databind.node.NullNode.<init>()' (com.fasterxml.jackson.databind.node.NullNodeConstructorAccess is in unnamed module of loader com.esotericsoftware.reflectasm.AccessClassLoader @7a1a4114; com.fasterxml.jackson.databind.node.NullNode is in unnamed module of loader 'app')

After running the debugger, I find that this happens when deserializing a null value in data received from Kafka, but I do not understand why this is an error. Seemingly, both classes are in the same package, so the constructor should be accessible. However, it also looks like com.fasterxml.jackson.databind.node.NullNodeConstructorAccess is not a real class, but rather a class name generated by Kryo to do some reflection.

How do I fix this?

Below is an MVCE for the error. It requires software.amazon.glue:schema-registry-flink-serde in addition to the Flink dependencies.

import com.amazonaws.services.schemaregistry.flink.avro.GlueSchemaRegistryAvroDeserializationSchema;
import com.amazonaws.services.schemaregistry.flink.avro.GlueSchemaRegistryAvroSerializationSchema;

import java.io.IOException;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.ClassRule;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers(disabledWithoutDocker = true, parallel = true)
public class IntegrationTestMvce {

  static Schema SCHEMA =
      new Schema.Parser()
          .parse(
              "{\"type\":\"record\", \"name\":  \"event\", \"fields\":[{\"name\": \"flag\", \"type\": [\"null\", \"boolean\"], \"default\": null}]}");

  @ClassRule
  public static MiniClusterWithClientResource FLINK_CLUSTER =
      new MiniClusterWithClientResource(
          new MiniClusterResourceConfiguration.Builder()
              .setNumberSlotsPerTaskManager(2)
              .setNumberTaskManagers(1)
              .build());

  @Container
  static final GenericContainer<?> KAFKA =
      new GenericContainer<>(DockerImageName.parse("apache/kafka")).withNetworkMode("host");

  @BeforeAll
  public static void setup() throws Exception {

    KAFKA.execInContainer(
        "/opt/kafka/bin/kafka-topics.sh",
        "--bootstrap-server",
        "localhost:9092",
        "--create",
        "--topic",
        "test");
  }

  @Test
  public void integrationTest() throws Exception {
    Map<String, Object> producerConfig =
        Map.of(
            "bootstrap.servers", "localhost:9092",
            "key.serializer", ".apache.kafkamon.serialization.ByteArraySerializer",
            "value.serializer", ".apache.kafkamon.serialization.ByteArraySerializer");
    GenericRecord data = new GenericData.Record(SCHEMA);
    byte[] bytes = GlueSchemaRegistryAvroSerializationSchema.forGeneric(SCHEMA).serialize(data);

    try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig)) {
      producer.send(new ProducerRecord<>("test", bytes)).get();
    }

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<KafkaSourceRecord<GenericRecord>> source =
        env.fromSource(
            KafkaSource.<KafkaSourceRecord<GenericRecord>>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("test")
                .setGroupId("test-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(new TestDeserializer())
                .build(),
            WatermarkStrategy.noWatermarks(),
            "kafka");
    source.print();
    env.execute();
  }

  static class TestDeserializer
      implements KafkaRecordDeserializationSchema<KafkaSourceRecord<GenericRecord>> {
    @Override
    public void deserialize(
            ConsumerRecord<byte[], byte[]> record, Collector<KafkaSourceRecord<GenericRecord>> out)
        throws IOException {

      AvroDeserializationSchema<GenericRecord> deserializer =
          GlueSchemaRegistryAvroDeserializationSchema.forGeneric(SCHEMA);

      out.collect(
          new KafkaSourceRecord<>(
              record.topic(), record.partition(), deserializer.deserialize(record.value())));
    }

    @Override
    public TypeInformation<KafkaSourceRecord<GenericRecord>> getProducedType() {
      return TypeInformation.of(new TypeHint<>() {});
    }
  }

  static class KafkaSourceRecord<T> {
    public final String topic;
    public final long partition;
    public final T value;

    public KafkaSourceRecord(String topic, long partition, T value) {
      this.topic = topic;
      this.partition = partition;
      this.value = value;
    }

  }
}


1 Answer

After more research, I came across https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can, which is written for an older version of Flink but has the required details.

The solution for me was to remove the generic type parameter from KafkaSourceRecord and write a custom Kryo serializer for the Avro GenericRecord class, which I registered with the environment via:

    streamEnv
        .getConfig()
        .addDefaultKryoSerializer(
            GenericRecord.class, KryoSerializers.GenericRecordKryoSerializer.class);
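
The serializer class itself is not shown here. As a rough sketch of what such a Kryo serializer for GenericRecord could look like (illustrative code, not my actual KryoSerializers.GenericRecordKryoSerializer), one option is to write the record's schema as a JSON string followed by the Avro binary encoding of the record:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class GenericRecordKryoSerializer extends Serializer<GenericRecord> {

      @Override
      public void write(Kryo kryo, Output output, GenericRecord record) {
        try {
          // Write the schema first so the record can be rebuilt on read.
          output.writeString(record.getSchema().toString());
          // Encode the record itself with Avro's binary encoding.
          ByteArrayOutputStream baos = new ByteArrayOutputStream();
          BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
          new GenericDatumWriter<GenericRecord>(record.getSchema()).write(record, encoder);
          encoder.flush();
          byte[] payload = baos.toByteArray();
          output.writeInt(payload.length);
          output.writeBytes(payload);
        } catch (IOException e) {
          throw new RuntimeException("Failed to serialize GenericRecord", e);
        }
      }

      @Override
      public GenericRecord read(Kryo kryo, Input input, Class<GenericRecord> type) {
        try {
          // Read the schema back, then decode the Avro binary payload.
          Schema schema = new Schema.Parser().parse(input.readString());
          byte[] payload = input.readBytes(input.readInt());
          BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
          return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
        } catch (IOException e) {
          throw new RuntimeException("Failed to deserialize GenericRecord", e);
        }
      }
    }

Writing the schema with every record is wasteful, but it keeps the sketch self-contained; the no-arg constructor lets Kryo instantiate the serializer from the class registered via addDefaultKryoSerializer.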

Other suggestions from the article, like enableForceAvro(), did not work for me.
