pyspark - Parallelism for many dataframes with sparksqlcompare - Stack Overflow

I have up to 10 data frames that I need to compare against another 10 data frames; the comparisons are independent of each other. Each data frame has specific key columns to compare on. The datacompy library provides SparkSQLCompare, which compares two Spark data frames.
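
For context, a single comparison looks roughly like this; the data frames and the "id" join column below are just placeholders, and matches()/report() are the datacompy methods I use to inspect the result (a sketch only, not my exact code):

from datacompy import SparkSQLCompare
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder data; in the real job these are two of the parsed data frames
df1 = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
df2 = spark.createDataFrame([(1, "a"), (2, "c")], ["id", "value"])

# Compare the two data frames on the "id" key
compare_obj = SparkSQLCompare(spark, df1, df2, join_columns=["id"],
                              ignore_case=True, ignore_spaces=True, abs_tol=1.1)

print(compare_obj.matches())  # True only if all joined rows match within tolerance
print(compare_obj.report())   # human-readable summary of the differences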

This works well even for large data; each data frame has up to 1 million records. Comparing the data frames one after another takes a long time, so I want to compare them all at once. Here is the code:

import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List

from datacompy import SparkSQLCompare
from pyspark.sql import DataFrame, SparkSession

# ProcessParameters is a config-holder class from our own codebase
def compare(spark: SparkSession, process_parameters: ProcessParameters, parsed_dfs_1: Dict[str, DataFrame],
            parsed_dfs_2: Dict[str, DataFrame], selected_ids: List[str]) -> List[Dict[str, Any]]:
    error_list = []
    comparison_config = process_parameters.config['comparison']

    def process_record_type(record_type, value, df2_value):
        thread_name = threading.current_thread().name
        logging.info(f"{thread_name} starting comparison for {record_type}")
        errors = []  # local to each task so threads do not share state
        if (value is not None or df2_value is not None) and record_type in comparison_config['comparisonKey']:
            # Perform the comparison for this record_type
            errors.extend(compare_dataframes(spark, value, df2_value, comparison_config, record_type))
            logging.info(f"Completed comparison for record_type: {record_type}")
        return errors

    # Run comparisons in parallel, one task per record_type
    with ThreadPoolExecutor(max_workers=6) as executor:
        future_to_record_type = {
            executor.submit(process_record_type, record_type, value, parsed_dfs_2.get(record_type)): record_type
            for record_type, value in parsed_dfs_1.items()
        }
        for future in as_completed(future_to_record_type):
            record_type = future_to_record_type[future]
            try:
                error_list.extend(future.result())
                logging.info(f"Successfully  completed test for {record_type}")
            except Exception as e:
                logging.error(f"Error processing record_type {record_type}: {e}")
    logging.info("All comparisons completed")
    return error_list

def compare_dataframes(spark: SparkSession, df1: DataFrame, df2: DataFrame, parameters: Dict[str, Any], record_type: str) -> List[Dict[str, Any]]:
    compare_obj = SparkSQLCompare(spark, df1, df2, join_columns=parameters['comparisonKey'][record_type],
                                  ignore_case=True, ignore_spaces=True, abs_tol=1.1)
    # find_mismatches_and_matches is our own helper that turns the comparison result into a list of error dicts
    mismatches_list = find_mismatches_and_matches(compare_obj, parameters, record_type)
    return mismatches_list

The code uses a thread pool for parallelism. It works fine when max_workers is 1, but not when max_workers is 6: as soon as the worker count is increased, one of the data frames is not compared properly and the comparison fails with a "column not found" error.

The error looks like: Error processing record_type tax_monthly: ('Column not found: %s', 'taxmonthlydetail_totaltaxyamt_df1'). A different data frame or record type fails on every run. Could someone suggest how to run all the data frame comparisons at once, or an alternative approach to this kind of parallelism?
