AWS Redshift VARCHAR argument inside Scalar Python UDF inconsistency - Stack Overflow

I am trying to create a scalar Python UDF to parse a VARBYTE column in Redshift.

Sample code:

CREATE OR REPLACE FUNCTION decode_kafka_event_protobuf(proto_data VARCHAR(MAX))
RETURNS VARCHAR(MAX)
VOLATILE AS $$

import base64
from com.example.events import kafka_event_pb2

binary_str = base64.b64decode(proto_data)
# ParseFromString() is a method of a message instance, not of the generated
# module; "KafkaEvent" is a placeholder for the real message class name.
proto_obj = kafka_event_pb2.KafkaEvent()
proto_obj.ParseFromString(bytes(binary_str))
# some code to convert proto_obj to a JSON string (json_str) and return it
return json_str
$$ LANGUAGE plpythonu;

Since Redshift doesn't support VARBYTE arguments in Python UDFs, I convert the VARBYTE column (kafka_value) to a base64-encoded string with the FROM_VARBYTE function, as in the query below:

SELECT FROM_VARBYTE(kafka_value, 'base64') FROM my_table;
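Inside the UDF, that base64 text has to be turned back into the original bytes before parsing. A minimal sketch of this step, assuming the argument may arrive either as a text (unicode) object or as a byte string; Redshift Python UDFs run on Python 2.7, so the sketch is written to run on both 2.7 and 3. `varchar_b64_to_bytes` is a hypothetical helper name, not part of any library:

```python
import base64


def varchar_b64_to_bytes(proto_data):
    """Hypothetical helper: decode base64 text from FROM_VARBYTE(..., 'base64').

    The argument is normalised to an ASCII byte string first, so the same
    code path is taken whether it arrives as text (unicode) or as bytes.
    """
    if proto_data is None:
        return None  # a SQL NULL argument is passed to the UDF as None
    if not isinstance(proto_data, bytes):
        proto_data = proto_data.encode("ascii")  # base64 text is pure ASCII
    # Surrounding whitespace is not part of the payload, so drop it.
    return base64.b64decode(proto_data.strip())
```

For example, both `varchar_b64_to_bytes("CJYB")` and `varchar_b64_to_bytes(u" CJYB\n")` return the three bytes 08 96 01.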

My question is: what is the difference between this:

SELECT FROM_VARBYTE(kafka_value, 'base64') FROM my_table;
-- returns the base64-encoded string, BASE64_STR = abcd....
SELECT decode_kafka_event_protobuf('abcd....') FROM my_table;
-- works and returns the JSON

VS

SELECT decode_kafka_event_protobuf(FROM_VARBYTE(kafka_value, 'base64')) FROM my_table;
-- fails with a protobuf decode error (see EDIT 1)

Also, if I copy the BASE64_STR string into the UDF (hardcoding its value), the parser works fine.
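A scalar UDF is evaluated once per row, so the column-based query feeds every row's value to the parser, while the literal-based query feeds the same string each time. One way to see which inputs actually trigger the error is to have the UDF body return a description of the failure instead of raising. This is a hypothetical debugging sketch: `decode_or_report` is an illustrative name, and `parse` stands in for the real protobuf parsing and JSON conversion, which are not shown here:

```python
import base64


def decode_or_report(proto_data, parse):
    """Hypothetical debugging wrapper for the UDF body.

    `parse` is a placeholder for the real protobuf parse + JSON conversion.
    """
    if proto_data is None:
        return None  # keep NULL inputs as NULL
    try:
        raw = base64.b64decode(proto_data)
        return parse(raw)
    except Exception as exc:
        # Report the failure for this row instead of raising, so a single
        # bad value does not abort the whole SELECT.
        return "ERROR %s: %s (input length %d)" % (
            type(exc).__name__, exc, len(proto_data))
```

Rows whose result starts with `ERROR` can then be filtered with a `LIKE 'ERROR%'` predicate and compared against the value that works when hardcoded.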

I tried comparing the hardcoded BASE64_STR and proto_data in every way I could think of (checking types, comparing the strings character by character, converting both to bytes and comparing, converting both to hex strings and comparing), and they appear identical.
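A minimal sketch of the character-by-character comparison described above, reporting where two values first diverge. `describe_difference` is a hypothetical name; both inputs are normalised to byte strings so that a text-vs-bytes type difference alone is not reported as a content difference:

```python
def describe_difference(a, b):
    """Hypothetical helper: locate the first byte where two strings differ."""
    # base64 and hex text are plain ASCII, so encoding cannot fail for them.
    if not isinstance(a, bytes):
        a = a.encode("ascii")
    if not isinstance(b, bytes):
        b = b.encode("ascii")
    # bytearray yields ints on both Python 2 and 3.
    for i, (x, y) in enumerate(zip(bytearray(a), bytearray(b))):
        if x != y:
            return "first difference at index %d: %d != %d" % (i, x, y)
    if len(a) != len(b):
        # One value is a prefix of the other (e.g. trailing newline).
        return "lengths differ: %d vs %d" % (len(a), len(b))
    return None  # identical byte for byte
```

Checking lengths after the loop matters: a trailing newline or padding difference would otherwise pass a prefix comparison unnoticed.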

So if there are any undocumented details about the VARCHAR argument that could help here, I would really appreciate hearing about them. Thank you in advance!

EDIT 1

Here is the full error message from svl_udf_log:
line 130, in decode_kafka_event_protobuf
  File "0.py", line 116, in decode_protobuf
  File "message.py", line 199, in ParseFromString
    return self.MergeFromString(serialized)
  File "python_message.py", line 1134, in MergeFromString
    if self._InternalParse(serialized, 0, length) != length:
  File "python_message.py", line 1201, in InternalParse
    pos = field_decoder(buffer, new_pos, end, self, field_dict)
  File "decoder.py", line 738, in DecodeField
    if value._InternalParse(buffer, pos, new_pos) != new_pos:
  File "python_message.py", line 1188, in InternalParse
    buffer, new_pos, wire_type)  # pylint: disable=protected-access
  File "decoder.py", line 977, in _DecodeUnknownField
    raise _DecodeError('Wrong wire type in tag.')
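For context on the error itself: in the protobuf wire format every field starts with a varint tag equal to (field_number << 3) | wire_type, and only wire types 0 to 5 are defined. As far as I can tell from the protobuf Python source, _DecodeUnknownField raises 'Wrong wire type in tag.' when the low three bits of a tag are 6 or 7, i.e. the bytes at that position do not form a valid field. The hedged sketch below decodes a tag by hand so the decoded bytes of a failing row can be inspected; the helper names are hypothetical. Note that in the traceback the failure occurs inside a nested message (DecodeField), so the offending tag is not necessarily the first one.

```python
WIRE_TYPES = {0: "varint", 1: "64-bit", 2: "length-delimited",
              3: "start group", 4: "end group", 5: "32-bit"}


def read_varint(buf, pos):
    """Decode a base-128 varint starting at buf[pos]; return (value, new_pos)."""
    data = bytearray(buf)  # ints on both Python 2 and 3
    result, shift = 0, 0
    while True:
        if pos >= len(data):
            raise ValueError("truncated varint")
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift  # low 7 bits carry the payload
        if not byte & 0x80:  # high bit clear: last byte of the varint
            return result, pos
        shift += 7


def first_tag(buf):
    """Return (field_number, wire_type) for the first tag in buf."""
    tag, _ = read_varint(buf, 0)
    return tag >> 3, tag & 0x7


def describe_first_tag(buf):
    field_number, wire_type = first_tag(buf)
    name = WIRE_TYPES.get(wire_type, "INVALID")
    return "field %d, wire type %d (%s)" % (field_number, wire_type, name)
```

For example, the bytes 08 96 01 decode to field 1 with wire type 0 (varint) and value 150, whereas a leading byte 0x0E would give wire type 6, which the decoder rejects.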

I also tried the hexadecimal format instead of base64, changing the UDF code to use binascii.unhexlify() to convert the hex string to binary:

SELECT decode_kafka_event_protobuf(FROM_VARBYTE(kafka_value, 'hex')) FROM my_table;

but I still got the same error.
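Base64 and hex are only two textual wrappers around the same bytes, so decoding either form of a value should yield an identical byte string. A minimal sketch of the hex variant alongside the base64 one, useful to confirm that both carry the same payload; `hex_to_bytes` is a hypothetical helper name:

```python
import base64
import binascii


def hex_to_bytes(hex_text):
    """Hypothetical helper: decode text from FROM_VARBYTE(..., 'hex')."""
    if not isinstance(hex_text, bytes):
        hex_text = hex_text.encode("ascii")  # hex digits are pure ASCII
    # unhexlify accepts upper- and lowercase digits.
    return binascii.unhexlify(hex_text.strip())


# The same three bytes written in both encodings decode identically.
same_payload = hex_to_bytes(u"089601") == base64.b64decode("CJYB")
```

Getting the same error with both encodings is consistent with the decoded bytes themselves, rather than the text encoding step, being what the parser rejects.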
