I am testing a Flink job that reads data from Kafka and writes to files.
When running an integration test with JUnit, I get the following error:
Caused by: java.lang.IllegalAccessError: class com.fasterxml.jackson.databind.node.NullNodeConstructorAccess tried to access protected method 'void com.fasterxml.jackson.databind.node.NullNode.<init>()' (com.fasterxml.jackson.databind.node.NullNodeConstructorAccess is in unnamed module of loader com.esotericsoftware.reflectasm.AccessClassLoader @7a1a4114; com.fasterxml.jackson.databind.node.NullNode is in unnamed module of loader 'app')
After running the debugger, I find that this happens when deserializing a null value in data received from Kafka. But I do not understand why this is an error: seemingly, both classes are in the same package, so the constructor should be accessible. However, it also looks like com.fasterxml.jackson.databind.node.NullNodeConstructorAccess is not a real class, but rather a class name generated by Kryo to do some reflection.
How do I fix this?
Here is an MVCE for the error. It requires software.amazon.glue:schema-registry-flink-serde along with the Flink dependencies.
import com.amazonaws.services.schemaregistry.flink.avro.GlueSchemaRegistryAvroDeserializationSchema;
import com.amazonaws.services.schemaregistry.flink.avro.GlueSchemaRegistryAvroSerializationSchema;
import java.io.IOException;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.ClassRule;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
@Testcontainers(disabledWithoutDocker = true, parallel = true)
public class IntegrationTestMvce {
static Schema SCHEMA =
new Schema.Parser()
.parse(
"{\"type\":\"record\", \"name\": \"event\", \"fields\":[{\"name\": \"flag\", \"type\": [\"null\", \"boolean\"], \"default\": null}]}");
@ClassRule
public static MiniClusterWithClientResource FLINK_CLUSTER =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
@Container
static final GenericContainer<?> KAFKA =
new GenericContainer<>(DockerImageName.parse("apache/kafka")).withNetworkMode("host");
@BeforeAll
public static void setup() throws Exception {
KAFKA.execInContainer(
"/opt/kafka/bin/kafka-topics.sh",
"--bootstrap-server",
"localhost:9092",
"--create",
"--topic",
"test");
}
@Test
public void integrationTest() throws Exception {
Map<String, Object> producerConfig =
Map.of(
"bootstrap.servers", "localhost:9092",
"key.serializer", ".apache.kafkamon.serialization.ByteArraySerializer",
"value.serializer", ".apache.kafkamon.serialization.ByteArraySerializer");
GenericRecord data = new GenericData.Record(SCHEMA);
byte[] bytes = GlueSchemaRegistryAvroSerializationSchema.forGeneric(SCHEMA).serialize(data);
try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerConfig)) {
producer.send(new ProducerRecord<>("test", bytes)).get();
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<KafkaSourceRecord<GenericRecord>> source =
env.fromSource(
KafkaSource.<KafkaSourceRecord<GenericRecord>>builder()
.setBootstrapServers("localhost:9092")
.setTopics("test")
.setGroupId("test-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(new TestDeserializer())
.build(),
WatermarkStrategy.noWatermarks(),
"kafka");
source.print();
env.execute();
}
static class TestDeserializer
implements KafkaRecordDeserializationSchema<KafkaSourceRecord<GenericRecord>> {
@Override
public void deserialize(
ConsumerRecord<byte[], byte[]> record, Collector<KafkaSourceRecord<GenericRecord>> out)
throws IOException {
AvroDeserializationSchema<GenericRecord> deserializer =
GlueSchemaRegistryAvroDeserializationSchema.forGeneric(SCHEMA);
out.collect(
new KafkaSourceRecord<>(
record.topic(), record.partition(), deserializer.deserialize(record.value())));
}
@Override
public TypeInformation<KafkaSourceRecord<GenericRecord>> getProducedType() {
return TypeInformation.of(new TypeHint<>() {});
}
}
static class KafkaSourceRecord<T> {
public final String topic;
public final long partition;
public final T value;
public KafkaSourceRecord(String topic, long partition, T value) {
this.topic = topic;
this.partition = partition;
this.value = value;
}
}
}
1 Answer
After more research, I came across https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can, which is for an older version of Flink but has the required details.
The solution for me was to remove the generic type parameter from KafkaSourceRecord and write a custom Kryo serializer for the Avro GenericRecord class, which I added to the environment with:
streamEnv
.getConfig()
.addDefaultKryoSerializer(
GenericRecord.class, KryoSerializers.GenericRecordKryoSerializer.class);
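A minimal sketch of what such a serializer could look like is shown below; it assumes the simplest approach of embedding the writer schema with every record and using Avro's binary encoding. The class name matches the registration above, but the body is only an illustration, not necessarily the exact implementation.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

// Illustrative Kryo serializer that round-trips a GenericRecord through Avro binary encoding.
public class GenericRecordKryoSerializer extends Serializer<GenericRecord> {

  @Override
  public void write(Kryo kryo, Output output, GenericRecord record) {
    try {
      // Write the schema first so the reader can rebuild the record without a registry lookup.
      output.writeString(record.getSchema().toString());
      GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
      // Kryo's Output is an OutputStream, so Avro can encode directly into it.
      BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
      writer.write(record, encoder);
      encoder.flush();
    } catch (IOException e) {
      throw new RuntimeException("Failed to serialize GenericRecord", e);
    }
  }

  @Override
  public GenericRecord read(Kryo kryo, Input input, Class<GenericRecord> type) {
    try {
      Schema schema = new Schema.Parser().parse(input.readString());
      GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
      // Kryo's Input is an InputStream; the direct decoder does not over-read past the record.
      BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new RuntimeException("Failed to deserialize GenericRecord", e);
    }
  }
}

Writing the schema alongside every record keeps the serializer self-contained at the cost of extra bytes per record; a more compact variant could cache or fingerprint schemas instead.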
Other suggestions from the article, like enableForceAvro(), did not work for me.