python - Apache_beam write to big query json payload error - Stack Overflow


I'm writing a Dataflow pipeline that splits a stream into tables dynamically named by the event_name and event_date in the data.

The tables are created with the correct names, but the data fails to be written, with the formatting error below:

"Unknown name "json" at 'rows[0]': Proto field is not repeating, cannot start list"

The print-record stage produces this log immediately before WriteToBigQuery is called, which looks correct to me:

About to write to BigQuery - Table: PROJECT_ID:DATASET_NAME.TABLE_NAME, Record: [{'event_name': 'scroll', 'event_date': '20241118', 'user_id': '', 'platform': 'WEB'}]

(For clarity: I have also tried removing the square brackets, with the same result.)

Here is the pipeline code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.window import FixedWindows
import logging

def log_before_write(element):
    table_name, record = element
    logging.info(f"About to write to BigQuery - Table: {table_name}, Record: {record}")
    return element 

class SplitByParameter(beam.DoFn):
    def process(self, element):
        event_name = element['event_name']
        event_date = element['event_date']
        yield (event_name, event_date, element)

def format_table_name(element):
    event_name, event_date, record = element
    sanitized_event_name = event_name.replace(' ', '_')
    sanitized_event_date = event_date.replace(' ', '_')
    table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
    return table_name, record


def split_records(element):
    table_name, record = element

    json_record = [{
        'event_name': str(record.get('event_name', '')) if record.get('event_name') is not None else '',
        'event_date': str(record.get('event_date', '')) if record.get('event_date') is not None else '',
        'user_id': str(record.get('user_id', '')) if record.get('user_id') is not None else '',
        'platform': str(record.get('platform', '')) if record.get('platform') is not None else ''
    }]

    yield (table_name, json_record)

def print_record(record):
    logging.info(f"Record before WriteToBigQuery: {record}")
    return record

def run(argv=None):
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    # Define schema for BigQuery (this needs to match your record structure)
    schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

    # Read from BigQuery, apply windowing, and process records
    (p
     | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=f'''
        SELECT *
        FROM `PROJECT_ID.DATASET.TABLE`
        WHERE _TABLE_SUFFIX = FORMAT_TIMESTAMP('%Y%m%d', CURRENT_TIMESTAMP())
       ''', use_standard_sql=True)
     | 'ApplyWindowing' >> beam.WindowInto(FixedWindows(60))  # 60-second window
     | 'SplitByParameter' >> beam.ParDo(SplitByParameter())  # Split by event_name and event_date
     | 'FormatTableName' >> beam.Map(format_table_name)  # Format the table name
     | 'LogBeforeFlatMap' >> beam.Map(lambda x: logging.info(f'Before FlatMap: {x}') or x)
     | 'SplitRecords' >> beam.FlatMap(split_records)  # Convert record to desired format
     | 'LogBeforeWrite' >> beam.Map(log_before_write)
     | 'PrintRecord' >> beam.Map(print_record)  # Print records before writing to BigQuery
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table=lambda x: x[0],  # Table name is the first element of the tuple
           schema=schema,  # Use the schema defined above
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  # Append data to existing tables
       )
    )

    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Not sure where to go from here...

I've tried a bunch of different formats, but there are only so many configurations that have a chance of working.

Asked Nov 18, 2024 at 10:54 by Rick Rickles; edited Nov 18, 2024 at 21:26.
  • I do not think you can pass (table_name, json_record) to WriteToBigQuery. If your table name does not change, follow this example: github/GoogleCloudPlatform/dataflow-cookbook/blob/main/…. If your table name is dynamic based on the input data, check this: github/GoogleCloudPlatform/dataflow-cookbook/blob/main/… – XQ Hu, Nov 18, 2024 at 16:08

1 Answer


WriteToBigQuery expects each element of its input PCollection to be a row dictionary of JSON fields, but in this pipeline it is receiving a tuple of (table_name, json_record).

The PCollection passed to WriteToBigQuery should contain only the row dictionaries, and the table callable should generate the table name from each record, like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import logging

def format_table_name(element):
    event_name = element['event_name']
    event_date = element['event_date']
    sanitized_event_name = event_name.replace(' ', '_')
    sanitized_event_date = event_date.replace(' ', '_')
    table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
    return table_name


def split_records(element):
    json_record = {
        'event_name': str(element.get('event_name', '')) if element.get('event_name') is not None else '',
        'event_date': str(element.get('event_date', '')) if element.get('event_date') is not None else '',
        'user_id': str(element.get('user_id', '')) if element.get('user_id') is not None else '',
        'platform': str(element.get('platform', '')) if element.get('platform') is not None else ''
    }

    yield json_record

def run(argv=None):
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=options)

    # Define schema for BigQuery (this needs to match your record structure)
    schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

    (p
     | beam.Create([
         {
            'event_name': 'scroll',
            'event_date': '20241118',
            'user_id': 'user1',
            'platform': 'WEB'
        },
        {
            'event_name': 'click',
            'event_date': '20241117',
            'user_id': 'user2',
            'platform': 'WEB'
        }
     ])
     | 'SplitRecords' >> beam.FlatMap(split_records)  # Convert record to desired format
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table=format_table_name,  # Callable that derives the destination table name from each record
           schema=schema,  # Use the schema defined above
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  # Append data to existing tables
       )
    )

    p.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
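
For context, here is a minimal sketch (not part of the original answer) of how the same write stage could be wired back onto the question's ReadFromBigQuery source instead of beam.Create. The query is a placeholder and the CREATE_IF_NEEDED create disposition is an assumption for illustration; format_table_name, split_records, and schema refer to the definitions above.

    # Sketch only: the placeholder query and CREATE_IF_NEEDED are assumptions,
    # not taken from the original answer.
    (p
     | 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(
           query='SELECT * FROM `PROJECT_ID.DATASET.TABLE`',
           use_standard_sql=True)
     | 'SplitRecords' >> beam.FlatMap(split_records)  # emit one plain dict per row
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           table=format_table_name,  # called per element to pick its destination table
           schema=schema,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )

Because table= is a callable, WriteToBigQuery evaluates the destination for each element, so rows with different event_name and event_date values are routed to their own tables.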
