python - azure-kusto-data insert duplicated and corrupted data when some pattern '2_' in it - Stack Overflow

Using [email protected], when running an ingest query through either the URI or the Ingest URI, a null is ingested into a column whenever the previous field contains the characters 2_.

I can run the same query in the Azure Data Explorer web UI and it works fine, but using this method it ingests null in the "value" field.

Azure Data Explorer table schema:

{
    "Name": "kpi_table",
    "OrderedColumns":
    [{
        "Name": "server_timestamp",
        "Type": "System.DateTime",
        "CslType": "datetime"
    },
    {
        "Name": "name",
        "Type": "System.String",
        "CslType": "string"
    },
    {
        "Name": "plant_id",
        "Type": "System.String",
        "CslType": "string"
    },
    {
        "Name": "device_key",
        "Type": "System.String",
        "CslType": "string"
    },
    {
        "Name": "value",
        "Type": "System.Double",
        "CslType": "real"
    }]
}

Data to ingest:
2025-03-13T22:06:00Z, kpi_inverter_production, #ESSSAB2, #ESSSAB2_T010JPK01KKP03, 0.0201

Data ingested:
2025-03-13T22:06:00Z, kpi_inverter_production, #ESSSAB2, #ESSSAB2_T010JPK01KKP03, 0.0201
2025-03-13T22:06:00Z, kpi_inverter_production, #ESSSAB2, #ESSSAB2_T010JPK01KKP03, null

It performs a successful ingest and then, one second later according to ingestion_time(), performs another ingest with value set to null.

If, in the field corresponding to device_key, you replace the character 2 with 3, it ingests correctly. Could it be interpreting the pattern 2_ as special characters internally?
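For what it's worth, a quick local check (not from the original post) suggests the 2_ sequence is not special at the CSV layer, so any corruption would have to happen later in the ingestion pipeline:

```python
# Hypothetical check: round-trip the problematic row through Python's csv
# module to confirm that "2_" is not treated specially when serializing.
import csv
import io

row = ["2025-03-13T22:06:00Z", "kpi_inverter_production",
       "#ESSSAB2", "#ESSSAB2_T010JPK01KKP03", "0.0201"]

buf = io.StringIO()
csv.writer(buf).writerow(row)
parsed = next(csv.reader(io.StringIO(buf.getvalue())))

assert parsed == row  # "2_" survives the round trip unchanged
```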

Code:

import logging

import pandas as pd
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table

logger = logging.getLogger(__name__)

# Enclosing class assumed; the original snippet shows only its methods.
class KustoRepository:
    def __init__(self, url, database, client_id, client_secret, tenant_id):
        self.database = database
        kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
            url, client_id, client_secret, tenant_id
        )
        self.conn = KustoClient(kcsb)

    def execute_kql_query(self, query: str):
        try:
            logger.debug("Executing KQL query: %s", query)
            response = self.conn.execute(self.database, query)
            if response.get_exceptions():
                raise Exception(response.get_exceptions())
            return dataframe_from_result_table(response.primary_results[0])
        except Exception as error:
            logger.error("Error executing KQL query: %s", error)
            return pd.DataFrame()

The query is a string containing an .ingest inline command, and although I know that is not recommended for production environments, the error also reproduces with streaming ingest.
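As a point of reference, here is a minimal sketch (not from the post) of how such an .ingest inline command string might be built. Because every physical line after the <| delimiter becomes one record, a stray newline slipping into the data would produce an extra, partially-null record, which matches the duplicate-row symptom described above:

```python
# Sketch: assemble an ".ingest inline" management command. The table name
# "kpi_table" comes from the question's schema; the row is the sample data.
rows = [
    ("2025-03-13T22:06:00Z", "kpi_inverter_production",
     "#ESSSAB2", "#ESSSAB2_T010JPK01KKP03", "0.0201"),
]

lines = [",".join(r) for r in rows]
command = ".ingest inline into table kpi_table <|\n" + "\n".join(lines)

# Every physical line after "<|" becomes one record, so the newline count
# in the command must equal the intended row count.
assert command.count("\n") == len(rows)
```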

Asked Mar 14 at 13:39 by cperez-m
  • Ensure all fields are properly formatted as strings before ingestion to avoid misinterpretation of "2_". – Vinay B, Mar 17 at 5:27
  • @cperez-m Where is the data coming from to Azure Data Explorer? – Sampath, Mar 17 at 9:12

1 Answer (score 2)

ingests a column with value null as soon as the previous field contains the characters 2_ in it.

Duplicate records were being added, which is what caused the null values that appear when '2_' occurs in the device key. This only happened when using the azure-kusto-data library, not in the Azure Data Explorer UI.

The code below checks whether the '2_' pattern in device_key causes any ingestion issue, ingests new data while avoiding duplicates, and looks up existing data in the Azure Data Explorer database first.

import os
import pandas as pd
import logging
import tempfile
from azure.kusto.data import KustoConnectionStringBuilder, KustoClient
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor
from azure.kusto.ingest.ingestion_properties import DataFormat
from azure.kusto.data.helpers import dataframe_from_result_table
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)
CLUSTER_URL = "<Your URL>"
DATABASE = "<Your database Name>"
TABLE_NAME = "<Your table Name>"
CLIENT_ID = "<Your clientID>"
CLIENT_SECRET = "<Your clientSecret>"
TENANT_ID = "<Your tenantID>"
if not CLIENT_ID or not CLIENT_SECRET or not TENANT_ID:
    logger.error("Missing Azure credentials.")
    exit(1)
try:
    kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        CLUSTER_URL, CLIENT_ID, CLIENT_SECRET, TENANT_ID
    )
    kusto_client = KustoClient(kcsb)
    ingest_client = QueuedIngestClient(kcsb)
    logger.info("Connected to Azure Data Explorer.")
except Exception as e:
    logger.error(f"Failed to connect to Kusto: {e}")
    exit(1)
create_table_query = f"""
.create table {TABLE_NAME} (
    server_timestamp: datetime,
    name: string,
    plant_id: string,
    device_key: string,
    value: real
)
"""
try:
    kusto_client.execute_mgmt(DATABASE, create_table_query) 
    logger.info("Table check completed (created if not exists).")
except Exception as e:
    logger.error(f"Error creating table: {e}")
    exit(1)
data = [
    ("2025-03-13T22:06:00Z", "kpi_inverter_production", "#ESSSAB2", "#ESSSAB2_T010JPK01KKP03", 0.0201),
    ("2025-03-13T22:07:00Z", "kpi_inverter_production", "#ESSSAB3", "#ESSSAB3_T010JPK01KKP03", 0.0301),
    ("2025-03-13T22:07:00Z", "kpi_inverter_production", "#ESSSAB3", "#ESSSAB3_T010JPK01KKP03", 0.0401),
]
df = pd.DataFrame(data, columns=["server_timestamp", "name", "plant_id", "device_key", "value"])
df["server_timestamp"] = pd.to_datetime(df["server_timestamp"], utc=True)  # Ensure correct format
logger.info("Checking for existing data...")
existing_data_query = f"""
{TABLE_NAME}
| where server_timestamp in ({', '.join(['datetime("' + ts.strftime('%Y-%m-%dT%H:%M:%S') + '")' for ts in df["server_timestamp"]])})
| project server_timestamp, name, plant_id, device_key, value
"""
try:
    response = kusto_client.execute(DATABASE, existing_data_query)
    df_existing = dataframe_from_result_table(response.primary_results[0]) if response.primary_results else pd.DataFrame()
except Exception as e:
    logger.error(f"Error querying existing data: {e}")
    df_existing = pd.DataFrame()

if not df_existing.empty:
    df_existing["server_timestamp"] = pd.to_datetime(df_existing["server_timestamp"], utc=True)
df_to_ingest = df.copy()
if not df_existing.empty:
    df_to_ingest = df.merge(df_existing, on=["server_timestamp", "name", "plant_id", "device_key", "value"], how="left", indicator=True)
    df_to_ingest = df_to_ingest[df_to_ingest["_merge"] == "left_only"].drop(columns=["_merge"])
if df_to_ingest.empty:
    logger.info("All data already exists. Skipping ingestion.")
else:
    logger.info(f"{len(df_to_ingest)} new rows to ingest.")
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp_file:
        df_to_ingest.to_csv(tmp_file.name, index=False, header=False, encoding="utf-8", sep=",")
        tmp_filename = tmp_file.name
    ingestion_props = IngestionProperties(
        database=DATABASE,
        table=TABLE_NAME,
        data_format=DataFormat.CSV,
        flush_immediately=True
    )
    try:
        file_desc = FileDescriptor(tmp_filename)
        ingest_client.ingest_from_file(file_desc, ingestion_props)
        logger.info("Data ingestion successful!")
    except Exception as e:
        logger.error(f"Error during ingestion: {e}")

    os.remove(tmp_filename)
logger.info("Validating for ingestion issues related to '2_' pattern...")

validation_query = f"""
{TABLE_NAME}
| where device_key contains "2_" and isnull(value)
| project server_timestamp, name, plant_id, device_key, value
"""
try:
    response = kusto_client.execute(DATABASE, validation_query)
    df_validation = dataframe_from_result_table(response.primary_results[0]) if response.primary_results else pd.DataFrame()
    if not df_validation.empty:
        logger.warning("Warning: Found ingested rows with null values in 'value' field and device_key containing '2_'.")
        logger.warning(df_validation)
    else:
        logger.info("No null values detected in 'value' field for device_key containing '2_'.")
except Exception as e:
    logger.error(f"Error running validation query: {e}")
query = f"{TABLE_NAME} | take 10"
try:
    response = kusto_client.execute(DATABASE, query)
    df_result = dataframe_from_result_table(response.primary_results[0]) if response.primary_results else pd.DataFrame()

    logger.info("\nIngested Data:")
    logger.info(df_result)
except Exception as e:
    logger.error(f"Error fetching ingested data: {e}")

I resolved it by verifying that the timestamps were accurate, checking whether the data already existed before adding new rows, and running validation queries after ingestion to detect any problems.

Outputs:

The 'value' field appears to have no null values where the 'device_key' contains '2_'.
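If duplicated null rows have already landed in the table, they could be cleaned up with a soft delete. This is a sketch that is not from the original answer; it assumes the Azure Data Explorer `.delete table ... records` management command and the question's table name, and it would be executed with `kusto_client.execute_mgmt(DATABASE, cleanup_command)`:

```python
# Sketch (assumption, not from the answer): build a management command that
# soft-deletes the duplicated rows whose "value" is null and whose
# device_key contains "2_".
TABLE_NAME = "kpi_table"  # table name taken from the question's schema

cleanup_command = f"""
.delete table {TABLE_NAME} records <|
    {TABLE_NAME}
    | where device_key contains "2_" and isnull(value)
"""

# The command targets only the corrupted duplicates, not the valid rows.
assert ".delete table kpi_table records" in cleanup_command
```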
