I found an issue when using a user-defined aggregate function (UDAF)
in the same .select
together with a built-in aggregate function. I get a cryptic NullPointerException
as a result.
Could this be a bug or am I understanding something fundamentally wrong? I could not find anything in the documentation around this. Please find a minimal (not) working example below:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
DataTypes,
EnvironmentSettings,
StreamTableEnvironment,
)
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble
from pyflink.table.udf import udaf
import pandas as pd
@udaf(result_type=DataTypes.INT(), func_type='pandas')
def last_value_agg(v: pd.Series):
    """Return the last element of the given series.

    Registered as a pandas (vectorized) aggregate function whose declared
    result type is INT.

    :param v: the values to aggregate, as a pandas Series.
    :return: the element at the last position of ``v``.
    """
    # Positional (not label-based) access to the final element.
    return v.iloc[-1]
@udaf(result_type=DataTypes.INT(), func_type='pandas')
def first_value_agg(v: pd.Series):
    """Return the first element of the given series.

    Registered as a pandas (vectorized) aggregate function whose declared
    result type is INT.

    :param v: the values to aggregate, as a pandas Series.
    :return: the element at the first position of ``v``.
    """
    # Positional index 0 is the first element; the previous ``iloc[-1]``
    # returned the last element, which contradicted the function's name.
    return v.iloc[0]
if __name__ == "__main__":
    # DDL for a generated source table:
    #   * `unix_time` is a sequence from 0 to 1000,
    #   * `price` is a random value between 0 and 10,
    #   * `id` is always 0 (min == max),
    #   * `time` is a computed column derived from `unix_time` and also
    #     serves as the watermark column,
    #   * one row is produced per second.
    source = """
CREATE TABLE src_trade (
`id` INT,
`unix_time` INT,
`price` INT,
`time` AS TO_TIMESTAMP_LTZ(`unix_time` * 1000, 3),
WATERMARK FOR `time` AS `time`
) WITH (
'connector' = 'datagen',
'fields.unix_time.start' = '0',
'fields.unix_time.end' = '1000',
'fields.unix_time.kind' = 'sequence',
'fields.price.min' = '0',
'fields.price.max' = '10',
'fields.price.kind' = 'random',
'fields.id.min' = '0',
'fields.id.max' = '0',
'rows-per-second' = '1'
)
"""

    # Single-parallelism streaming environment.
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    env_settings = (
        EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .build()
    )
    t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)

    # Register the source table.
    t_env.execute_sql(source)

    # Aggregate per `id` over 1-second tumbling windows on `time` and print
    # the results. The select deliberately mixes the two pandas UDAFs with
    # the built-in `max` aggregate: this combination is what reproduces the
    # reported NullPointerException.
    (
        t_env.from_path('src_trade')
        .window(Tumble.over(lit(1).seconds).on(col('time')).alias('w'))
        .group_by(col('id'), col('w'))
        .select(
            col('id'),
            col('w').end,
            last_value_agg(col('price')),
            first_value_agg(col('price')),
            col('price').max,  # when commenting this line, everything works as expected
        )
        .execute().print()
    )
发布者:admin,转载请注明出处:http://www.yc00.com/questions/1744662266a4586532.html
评论列表(0条)