python - Why are date and time data not mapping/uploading to PostgreSQL with the correct data type? - Stack Overflow

I'm working with a PostgreSQL database in Python using psycopg2 and pandas. My workflow involves reading data into Python, uploading it to the database, retrieving it back into Python, and then updating the database. To ensure data integrity, I aim to create a comprehensive mapping between pandas data types and PostgreSQL data types. Currently, I'm encountering an issue where date and time data (including both day and hour) are not being uploaded to PostgreSQL with the correct data type.
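To illustrate, this is roughly what the DataFrame looks like right after I read the source data (a simplified sketch; the file name and column names are made up):

import pandas as pd

# Simplified stand-in for my real read step; 'data.csv' and the columns are placeholders.
df = pd.read_csv("data.csv")

print(df.dtypes)
# value         float64
# event_time     object   <- pandas leaves the date/time column as 'object'
#                            unless it is explicitly parsed (e.g. parse_dates)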

I'm utilizing the following function to map pandas data types to PostgreSQL data types:


import numpy as np


def map_dtype(dtype):
    """Map a pandas/NumPy dtype to a PostgreSQL column type."""
    if np.issubdtype(dtype, np.integer):
        return 'BIGINT'  # Maps to PostgreSQL's BIGINT for large integers
    elif np.issubdtype(dtype, np.floating):
        return 'DOUBLE PRECISION'  # Maps to PostgreSQL's DOUBLE PRECISION for floating-point numbers
    elif np.issubdtype(dtype, np.datetime64):
        return 'TIMESTAMP'  # Maps to PostgreSQL's TIMESTAMP for date and time
    elif np.issubdtype(dtype, np.bool_):
        return 'BOOLEAN'  # Maps to PostgreSQL's BOOLEAN for boolean values
    else:
        return 'TEXT'  # Default mapping to PostgreSQL's TEXT for other types
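For completeness, this is roughly how I build the col_str argument that the upload function below receives. It uses map_dtype from above; build_col_str and the example frame are simplified stand-ins for my real code:

import pandas as pd

def build_col_str(dataframe):
    """Assemble the column-definition string used in CREATE TABLE."""
    return ", ".join(
        f"{col} {map_dtype(dtype)}" for col, dtype in dataframe.dtypes.items()
    )

# Example: a date/time column that is still 'object' falls through to TEXT.
df = pd.DataFrame({"event_time": ["2024-01-01 10:00", "2024-01-02 11:30"]})
print(build_col_str(df))  # -> event_time TEXT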

This is the function I'm using to upload the data to the database:

import os

def upload_to_db(path_to_config, tbl_name, col_str, dataframe):
    """Upload a DataFrame to the specified database."""
    conn = connect_to_db(path_to_config)
    if not conn:
        return

    cursor = conn.cursor()
    csv_file_path = f"{tbl_name}.csv"  # temporary file used for COPY

    try:
        # Get the current user
        cursor.execute("SELECT current_user;")
        current_user = cursor.fetchone()[0]

        # Define schema based on user
        schema = "developer_schema" if current_user == "dev1" else "public"
        full_table_name = f"{schema}.{tbl_name}"

        cursor.execute(f"DROP TABLE IF EXISTS {full_table_name};")
                # Generate the CREATE TABLE statement with explicit data types

        cursor.execute(f"CREATE TABLE {full_table_name} ({col_str});")
        print(f'Table {full_table_name} was created successfully.')

        csv_file_path = f"{tbl_name}.csv"
        dataframe.to_csv(csv_file_path, header=True, index=False, encoding='utf-8')

        with open(csv_file_path, 'r') as my_file:
            print('File opened in memory.')
            sql_statement = f"""
            COPY {full_table_name} FROM STDIN WITH
                CSV
                HEADER
                DELIMITER ',' 
            """
            cursor.copy_expert(sql=sql_statement, file=my_file)
            print('File copied to the database.')

        cursor.execute(f"GRANT SELECT ON TABLE {full_table_name} TO public;")
        conn.commit()

        print(f'Table {full_table_name} imported to the database successfully.')
    except Exception as e:
        print(f"Error during upload: {e}")
        conn.rollback()
    finally:
        cursor.close()
        conn.close()

        if os.path.exists(csv_file_path):
            try:
                os.remove(csv_file_path)
                print(f"Temporary file {csv_file_path} has been deleted.")
            except Exception as e:
                print(f"Error deleting file {csv_file_path}: {e}")

Is this the best approach? Do you have any suggestions? P.S. I don't want to use SQLAlchemy because it is too slow for large datasets, which is my case.
