java - Getting Error while deserializing message from kafka using Avro Schema - Stack Overflow

I am trying to deserialize a Kafka record consumed from a Kafka topic using the code below. Even though the schema is correct, we are getting an error. Can you suggest what else could be wrong?

Code

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificDatumReader;

public class AvroUtility {

  private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(AvroUtility.class);

  public SpecificDatumReader<GenericRecord> datumReader() {
    // Placeholder: the real Avro schema, as a JSON string, goes here.
    String valueSchemaString = "my schema in form of json string";
    Schema avroValueSchema = new Schema.Parser().parse(valueSchemaString);
    return new SpecificDatumReader<>(avroValueSchema);
  }
}

We use the AvroUtility class above in the following code:

ConsumerRecord<String, byte[]> record;

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);

We are getting the following error:

org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at com.myapp.controller.Replicator.lambda$replicator$0(Replicator.java:116)
        at java.lang.Iterable.forEach(Iterable.java:75)
        at com.myapp.controller.Replicator.replicator(Replicator.java:104)
        at com.myapp.SpringBootWithKafkaApplication.main(SpringBootWithKafkaApplication.java:21)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:48)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:87)
        at org.springframework.boot.loader.Launcher.launch(Launcher.java:51)
        at org.springframework.boot.loader.PropertiesLauncher.main(PropertiesLauncher.java:597)

Asked Mar 30 at 17:34 by Akash Rai, edited Apr 1 at 4:05
  • I went through the producer code and found that the Kafka producer is configured with ("value.serializer", KafkaAvroSerializer.class.getName()), while our consumer uses ByteArrayDeserializer. Could that be the issue? If we use KafkaAvroDeserializer.class.getName(), can I expect the value to be JSON? – Akash Rai Commented Mar 31 at 2:23
1 Answer

Here you are forgetting one thing:

ConsumerRecord<String, byte[]> record;

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(record.value(), null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);

The binary payload passed into binaryDecoder includes extra header bytes:

[Magic][4-byte_schemaID][Avro bytes]

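The datumReader doesn't expect this header, so it decodes the header bytes as if they were the start of the record, which causes the failure. Your code would work with raw Avro, but as you said, the producer serializes with KafkaAvroSerializer, which prepends the header.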

So, to read the payload properly, you could strip the header first:

byte[] kafkaPayload = record.value();

int schemaRegistryHeaderLength = 5;  // 1 magic byte + 4-byte schema ID
byte[] avroData = Arrays.copyOfRange(kafkaPayload, schemaRegistryHeaderLength, kafkaPayload.length);

BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(avroData, null);
GenericRecord deserializedValue = datumReader.read(null, binaryDecoder);
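The snippet above assumes that every payload starts with the 5-byte header. A slightly more defensive variant could check the magic byte before stripping it and read the schema ID along the way. The following is only a minimal sketch using the JDK alone; the class name ConfluentWireFormat and its method names are made up for illustration and are not part of Avro or Confluent.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper (not a library class) for the header layout described above:
// [magic byte 0x0][4-byte big-endian schema ID][Avro bytes]
public class ConfluentWireFormat {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    /** Returns the schema ID stored in bytes 1..4 of the payload. */
    public static int schemaId(byte[] payload) {
        checkHeader(payload);
        // ByteBuffer reads big-endian by default, which matches the header layout.
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    /** Returns the Avro-encoded body that follows the 5-byte header. */
    public static byte[] avroBody(byte[] payload) {
        checkHeader(payload);
        return Arrays.copyOfRange(payload, HEADER_LENGTH, payload.length);
    }

    private static void checkHeader(byte[] payload) {
        if (payload == null || payload.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Payload is shorter than the 5-byte header");
        }
        if (payload[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unexpected magic byte: " + payload[0]);
        }
    }

    public static void main(String[] args) {
        // Header with schema ID 42, followed by the Avro encoding of the string "foo".
        byte[] payload = {0, 0, 0, 0, 42, 6, 102, 111, 111};
        System.out.println(schemaId(payload));        // prints 42
        System.out.println(avroBody(payload).length); // prints 4
    }
}
```

The result of avroBody(record.value()) can then be handed to DecoderFactory.get().binaryDecoder(...) exactly as in the snippet above. Failing fast on an unexpected magic byte makes it easier to spot topics that carry raw Avro or some other format.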

If you are using Confluent, you could also use the KafkaAvroDeserializer, which handles the header and fetches the schema from the Schema Registry:

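import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;

// configure() takes a Map<String, ?>, so a HashMap is used instead of Properties.
Map<String, Object> props = new HashMap<>();
// Placeholder address: replace with your Schema Registry URL, including the scheme.
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://yourRegistry:port");

KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer();
deserializer.configure(props, false);  // false: this instance deserializes values, not keys

GenericRecord deserializedRecord = (GenericRecord) deserializer.deserialize("topic", record.value());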
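Another option, closer to what the comment under the question asks about, is to set the deserializer on the consumer itself instead of calling it by hand. The sketch below only builds the consumer properties and uses plain strings for the class names so that it runs without Kafka on the classpath. The class name AvroConsumerConfig, the broker address, the group ID, and the registry URL are all placeholders assumed for illustration.

```java
import java.util.Properties;

// Hypothetical helper that assembles consumer properties for Avro values.
public class AvroConsumerConfig {

    /** Builds consumer properties that delegate Avro decoding to Confluent's deserializer. */
    public static Properties build(String bootstrapServers, String groupId, String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Replaces ByteArrayDeserializer so the consumer strips the header and decodes Avro.
        props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", registryUrl);
        // "false" (the default) yields GenericRecord values rather than generated classes.
        props.put("specific.avro.reader", "false");
        return props;
    }

    public static void main(String[] args) {
        // All three arguments are placeholder values.
        Properties props = build("localhost:9092", "replicator-group", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With properties like these, the consumer would be typed as KafkaConsumer<String, Object> and each record.value() would be a GenericRecord rather than a JSON string, although GenericRecord.toString() does render a JSON-like representation.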
