I am trying to create a scalar Python UDF to parse a VARBYTE column in Redshift.
Sample code:
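CREATE OR REPLACE FUNCTION decode_kafka_event_protobuf(proto_data VARCHAR(MAX))
RETURNS VARCHAR(MAX) VOLATILE AS $$
import base64
from google.protobuf.json_format import MessageToJson
from com.example.events import kafka_event_pb2
binary_str = base64.b64decode(proto_data)
# ParseFromString is a method of a message instance, not of the generated module;
# KafkaEvent is a placeholder for the actual message class name
proto_obj = kafka_event_pb2.KafkaEvent()
proto_obj.ParseFromString(binary_str)
# convert proto_obj to a JSON string and return it
return MessageToJson(proto_obj)
$$ LANGUAGE plpythonu;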
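Because a failing row aborts the whole query, one option is to catch the exception inside the UDF and return diagnostic JSON for that row instead. The sketch below is a minimal version of that idea. It assumes a hypothetical `parse` callable that stands in for the real protobuf step (build the message, call ParseFromString, convert to JSON). `decode_or_report` is also a made-up name, so the logic can be tested locally before being adapted into the UDF body.

```python
import base64
import binascii
import json


def decode_or_report(proto_data, parse):
    """Decode a base64 VARCHAR value and parse it, reporting failures per row.

    `parse` is a hypothetical stand-in for the real protobuf step.
    """
    if proto_data is None:
        return None  # SQL NULL in, SQL NULL out
    raw = b""
    try:
        raw = base64.b64decode(proto_data)
        return parse(raw)
    except Exception as exc:  # return details instead of aborting the query
        return json.dumps({
            "error": repr(exc),
            "length": len(raw),
            # first 16 bytes as hex, to inspect what the parser actually saw
            "hex_prefix": binascii.hexlify(raw[:16]).decode("ascii"),
        })
```

When run over the whole table, rows whose result contains an `"error"` key are the ones that break the real query. Their `hex_prefix` can then be compared with a row that parses correctly.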
Because Redshift doesn't support VARBYTE arguments in Python UDFs, I am converting the VARBYTE column (kafka_value) to a base64-encoded string with the FROM_VARBYTE function, as in the sample query below:
SELECT FROM_VARBYTE(kafka_value, 'base64') FROM my_table;
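To experiment locally, the conversion chain can be mimicked in plain Python. FROM_VARBYTE(kafka_value, 'base64') renders the raw bytes as base64 text, and base64.b64decode in the UDF is supposed to reverse that exactly. The sketch below does this round trip. It assumes Redshift emits standard, padded base64. The helper names are hypothetical, and the sample bytes are made up rather than taken from a real kafka_value.

```python
import base64


def to_base64_text(raw):
    """Local stand-in for FROM_VARBYTE(col, 'base64'): bytes -> base64 text."""
    return base64.b64encode(raw).decode("ascii")


def from_base64_text(text):
    """What the UDF does with its VARCHAR argument: base64 text -> bytes."""
    return base64.b64decode(text)


sample = b"\x08\x96\x01"  # hypothetical payload, not a real kafka_value
encoded = to_base64_text(sample)
assert from_base64_text(encoded) == sample
print(encoded)  # prints CJYB
```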
The question here is what's the difference between:
SELECT FROM_VARBYTE(kafka_value, 'base64') from my_table;
-- returns the base64 encoded string. BASE64_STR = abcd....
SELECT decode_kafka_event_protobuf('abcd....') from my_table;
-- works and returns the JSON
VS
SELECT decode_kafka_event_protobuf(FROM_VARBYTE(kafka_value, 'base64')) from my_table;
-- fails with a protobuf DecodeError
Also, if I copy the BASE64_STR string into the UDF (hardcoding its value), the parser works just fine.
I tried comparing the hardcoded BASE64_STR and proto_data in every way I could think of: checking their types, comparing the strings character by character, converting both to bytes and comparing, and converting both to hex strings and comparing. They appear identical.
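All of those comparisons can be run in one pass. Summarize each value (type, length, last few characters, a hash) and find the first position where the two differ. The sketch below does this with hypothetical helper names. It is written to run under both Python 2.7, which Redshift's plpythonu uses, and Python 3.

```python
import hashlib


def describe(value):
    """Summarize a str/bytes value so two 'identical' inputs can be compared."""
    data = value if isinstance(value, bytes) else value.encode("utf-8")
    return {
        "type": type(value).__name__,
        "length": len(value),
        "repr_tail": repr(value[-8:]),  # trailing whitespace or NULs show up here
        "sha256": hashlib.sha256(data).hexdigest(),
    }


def first_difference(a, b):
    """Index of the first differing element, or None if a and b are equal."""
    for i, (x, y) in enumerate(zip(a, b)):
        if x != y:
            return i
    if len(a) != len(b):
        return min(len(a), len(b))  # one is a prefix of the other
    return None
```

Equal hashes and a `None` from `first_difference` mean the two inputs really are the same bytes. In that case, the difference between the two queries has to come from something other than this particular value.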
So if there are any undocumented details about the VARCHAR argument that could help here, that would be really great. Thank you in advance!
EDIT 1
Here is the full error from svl_udf_log:
  line 130, in decode_kafka_event_protobuf
  File "0.py", line 116, in decode_protobuf
  File "message.py", line 199, in ParseFromString
    return self.MergeFromString(serialized)
  File "python_message.py", line 1134, in MergeFromString
    if self._InternalParse(serialized, 0, length) != length:
  File "python_message.py", line 1201, in InternalParse
    pos = field_decoder(buffer, new_pos, end, self, field_dict)
  File "decoder.py", line 738, in DecodeField
    if value._InternalParse(buffer, pos, new_pos) != new_pos:
  File "python_message.py", line 1188, in InternalParse
    buffer, new_pos, wire_type)  # pylint: disable=protected-access
  File "decoder.py", line 977, in _DecodeUnknownField
    raise _DecodeError('Wrong wire type in tag.')
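In the protobuf wire format, every field starts with a varint tag equal to `(field_number << 3) | wire_type`. Only wire types 0 to 5 are defined, so "Wrong wire type in tag" generally means the parser has landed on bytes that do not form a valid tag. In this traceback that happens inside a nested message (`DecodeField` calls `_InternalParse` again). The minimal sketch below decodes a tag by hand so the start of a failing buffer can be inspected. The function names are hypothetical, and the wire-type labels follow the protobuf encoding documentation.

```python
WIRE_TYPES = {0: "VARINT", 1: "I64", 2: "LEN", 3: "SGROUP", 4: "EGROUP", 5: "I32"}


def read_varint(buf, pos):
    """Decode a base-128 varint starting at pos; return (value, new_pos)."""
    data = bytearray(buf)  # indexable ints on both Python 2 and 3
    result, shift = 0, 0
    while True:
        if pos >= len(data):
            raise ValueError("truncated varint")
        byte = data[pos]
        result |= (byte & 0x7F) << shift
        pos += 1
        if not byte & 0x80:  # high bit clear: last byte of the varint
            return result, pos
        shift += 7


def first_tag(buf):
    """Split the first tag of a serialized message into (field_number, wire type)."""
    tag, _ = read_varint(buf, 0)
    wire_type = tag & 0x7
    return tag >> 3, WIRE_TYPES.get(wire_type, "INVALID(%d)" % wire_type)
```

Running `first_tag` on the leading bytes of a failing row shows whether the buffer even begins with a plausible field tag for the expected message.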
I also tried using the hexadecimal string format instead of base64, like this (and changed the UDF code to use binascii.unhexlify() to convert the hex string to binary):
SELECT decode_kafka_event_protobuf(FROM_VARBYTE(kafka_value, 'hex')) from my_table;
but I still got the same error.
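Hex and base64 are only two text representations of the same bytes. Switching between them should therefore not change what reaches ParseFromString, which fits with seeing the same error both ways. The minimal local check below uses made-up sample bytes and a hypothetical helper name.

```python
import base64
import binascii


def decode_hex_text(text):
    """UDF-side decode for FROM_VARBYTE(col, 'hex') output."""
    return binascii.unhexlify(text)


raw = b"\x08\x96\x01"  # hypothetical sample payload
hex_text = binascii.hexlify(raw).decode("ascii")
b64_text = base64.b64encode(raw).decode("ascii")

# both text forms decode back to exactly the same bytes
assert decode_hex_text(hex_text) == base64.b64decode(b64_text) == raw
print(hex_text, b64_text)  # prints 089601 CJYB
```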
Posted by: admin. When reposting, please credit the source: http://www.yc00.com/questions/1744731386a4590488.html