I have up to 10 data frames that need to be compared against another 10 data frames, and each pair is independent of the others. Each data frame has a specific set of keys to compare on. I'm using SparkSQLCompare (from the datacompy library) to compare two data frames at a time.
It works well even for huge data frames of up to 1 million records, but comparing the data frames sequentially takes a long time, so I want to run all the comparisons at once. Here is the code:
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List

from datacompy import SparkSQLCompare  # import path may vary with datacompy version
from pyspark.sql import DataFrame, SparkSession


def compare(spark: SparkSession, process_parameters: ProcessParameters, parsed_dfs_1: Dict[str, DataFrame],
            parsed_dfs_2: Dict[str, DataFrame], selected_ids: List[str]) -> List[Dict[str, Any]]:
    error_list = []
    comparison_config = process_parameters.config['comparison']

    def process_record_type(record_type, value, df2_value):
        thread_name = threading.current_thread().name
        logging.info(f"{thread_name} starting comparison for {record_type}")
        errors = []
        if (value is not None or df2_value is not None) and record_type in comparison_config['comparisonKey']:
            # Perform the comparison for this record_type
            errors.extend(compare_dataframes(spark, value, df2_value, comparison_config, record_type))
            logging.info(f"Completed comparison for record_type: {record_type}")
        return errors
    # Run comparisons in parallel, one task per record_type
    with ThreadPoolExecutor(max_workers=6) as executor:
        future_to_record_type = {
            executor.submit(process_record_type, record_type, value, parsed_dfs_2.get(record_type)): record_type
            for record_type, value in parsed_dfs_1.items()
        }
        for future in as_completed(future_to_record_type):
            record_type = future_to_record_type[future]
            try:
                error_list.extend(future.result())
                logging.info(f"Successfully completed test for {record_type}")
            except Exception as e:
                logging.error(f"Error processing record_type {record_type}: {e}")
    logging.info("All comparisons completed")
    return error_list
def compare_dataframes(spark: SparkSession, df1: DataFrame, df2: DataFrame,
                       parameters: Dict[str, Any], record_type: str) -> List[Dict[str, Any]]:
    compare_obj = SparkSQLCompare(spark, df1, df2, join_columns=parameters['comparisonKey'][record_type],
                                  ignore_case=True, ignore_spaces=True, abs_tol=1.1)
    mismatches_list = find_mismatches_and_matches(compare_obj, parameters, record_type)
    return mismatches_list
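For reference, SparkSQLCompare is part of the datacompy library rather than PySpark itself, and if I understand its API correctly, a minimal standalone comparison looks roughly like the sketch below (the data and column names here are made up):

# Hedged sketch: minimal datacompy SparkSQLCompare usage on two toy data frames.
# The data and column names are illustrative only.
from datacompy import SparkSQLCompare
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compare-sketch").getOrCreate()

df1 = spark.createDataFrame([(1, 100.0), (2, 200.0)], ["id", "amount"])
df2 = spark.createDataFrame([(1, 100.5), (2, 250.0)], ["id", "amount"])

compare_obj = SparkSQLCompare(spark, df1, df2, join_columns=["id"], abs_tol=1.1)
print(compare_obj.matches())  # False here: the id=2 row differs by more than abs_tol
print(compare_obj.report())   # human-readable summary of matches and mismatches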
The code uses a thread pool for parallelism. It works fine with max_workers=1, but with max_workers=6 one of the data frames is not compared properly and fails with a "key not found"-style error.
The error looks like: Error processing record_type tax_monthly: ('Column not found: %s', 'taxmonthlydetail_totaltaxyamt_df1'). On every run a different data frame (record type) fails. Could someone suggest how to run all the data frame comparisons at once, or any approach other than thread-based parallelism?
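To clarify the kind of concurrency I'm after: as far as I know, Spark can run independent jobs submitted from multiple threads concurrently when the FAIR scheduler is enabled and each thread sets its own pool. Below is a rough sketch of that idea; the pool names and the run_one helper are hypothetical, and this alone may not explain the "Column not found" error:

# Hedged sketch: one FAIR scheduler pool per worker thread so concurrent Spark
# jobs share the cluster instead of queueing FIFO. run_one() and the pool
# names are hypothetical placeholders, not from my actual code.
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("parallel-compare")
         .config("spark.scheduler.mode", "FAIR")
         .getOrCreate())

def run_one(record_type: str) -> None:
    # setLocalProperty is thread-local, so each task's jobs land in its own pool
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", f"pool_{record_type}")
    # ... run the datacompy comparison for this record_type here ...

with ThreadPoolExecutor(max_workers=6) as executor:
    list(executor.map(run_one, ["tax_monthly", "tax_yearly"]))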