apache flink - Potential Bug? UDAF together with internal aggregate function -> NullPointerException - Stack Overflow


I ran into an issue when using a user-defined aggregate function (UDAF) in the same .select as a built-in aggregate function (here, max): the job fails with a cryptic NullPointerException.

Could this be a bug, or am I misunderstanding something fundamental? I could not find anything about this in the documentation. Below is a minimal (non-)working example:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import (
    DataTypes,
    EnvironmentSettings,
    StreamTableEnvironment,
)
from pyflink.table.expressions import col, lit
from pyflink.table.window import Tumble
from pyflink.table.udf import udaf

import pandas as pd


@udaf(result_type=DataTypes.INT(), func_type='pandas')
def last_value_agg(v: pd.Series):
    return v.iloc[-1]


@udaf(result_type=DataTypes.INT(), func_type='pandas')
def first_value_agg(v: pd.Series):
    return v.iloc[0]  # first value in the window


if __name__ == "__main__":
    source = """
        CREATE TABLE src_trade (
            `id` INT,
            `unix_time` INT,
            `price` INT,
            `time` AS TO_TIMESTAMP_LTZ(`unix_time` * 1000, 3),
            WATERMARK FOR `time` AS `time`
        ) WITH (
            'connector' = 'datagen',
            'fields.unix_time.start' = '0',
            'fields.unix_time.end' = '1000',
            'fields.unix_time.kind' = 'sequence',
            'fields.price.min' = '0',
            'fields.price.max' = '10',
            'fields.price.kind' = 'random',
            'fields.id.min' = '0',
            'fields.id.max' = '0',
            'rows-per-second' = '1'
        )
    """

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    env_settings = (
        EnvironmentSettings.new_instance()
        .in_streaming_mode()
        .build()
    )
    t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
    t_env.execute_sql(source)

    (
        t_env.from_path('src_trade')
        .window(Tumble.over(lit(1).seconds).on(col('time')).alias('w'))
        .group_by(col('id'), col('w'))
        .select(
            col('id'),
            col('w').end,
            last_value_agg(col('price')),
            first_value_agg(col('price')),
            col('price').max,  # with this line commented out, everything works as expected
        )
        .execute().print()
    )
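For reference, the result the query is expected to produce once it runs can be modeled in plain Python, without Flink. The sketch below is a hypothetical model, not PyFlink code. It assumes unix_time is seconds since the epoch (as the `* 1000` in the computed column implies), uses epoch-aligned one-second tumbling windows, renders window ends in UTC (TO_TIMESTAMP_LTZ actually displays in the session time zone), assumes rows reach each window in event-time order, and takes the "first" value to be the window's first element (iloc[0]). The helpers event_time_ms, tumbling_window and simulate, and the sample rows, are made up for illustration.

```python
from datetime import datetime, timezone
from typing import Dict, List, Tuple

Row = Tuple[int, int, int]  # (id, unix_time in seconds, price)


def event_time_ms(unix_time: int) -> int:
    # Same arithmetic as the computed column TO_TIMESTAMP_LTZ(`unix_time` * 1000, 3)
    return unix_time * 1000


def tumbling_window(ts_ms: int, size_ms: int = 1_000) -> Tuple[int, int]:
    # Epoch-aligned tumbling window: start is ts rounded down to a multiple of size,
    # end is the exclusive upper bound.
    start = ts_ms - ts_ms % size_ms
    return start, start + size_ms


def simulate(rows: List[Row], size_ms: int = 1_000) -> List[Tuple[int, str, int, int, int]]:
    # Group prices by (id, window end), keeping arrival order within each group.
    groups: Dict[Tuple[int, int], List[int]] = {}
    for row_id, unix_time, price in rows:
        _, end_ms = tumbling_window(event_time_ms(unix_time), size_ms)
        groups.setdefault((row_id, end_ms), []).append(price)
    # One output row per group: id, window end (UTC), last, first, max
    return [
        (
            row_id,
            datetime.fromtimestamp(end_ms / 1000, tz=timezone.utc).isoformat(),
            prices[-1],
            prices[0],
            max(prices),
        )
        for (row_id, end_ms), prices in groups.items()
    ]


sample = [(0, 0, 4), (0, 1, 7), (0, 2, 1)]  # made-up rows, not real datagen output
for result in simulate(sample):
    print(result)
```

Because the datagen sequence advances unix_time by one per row, every one-second window holds exactly one row, so last, first and max coincide in each output row. Calling simulate(sample, size_ms=3000) instead puts all three sample rows into one window and returns (0, '1970-01-01T00:00:03+00:00', 1, 4, 7).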
