Using [email protected], when running an ingest query through either the URI or the Ingest URI, a column is ingested with a null value whenever the previous field contains the characters 2_ in it.
I can run the same query in the Azure Data Explorer UI and it works fine, but using this method it ingests null in the "value" field.
Azure Data Explorer table schema:
{
  "Name": "kpi_table",
  "OrderedColumns": [
    { "Name": "server_timestamp", "Type": "System.DateTime", "CslType": "datetime" },
    { "Name": "name",             "Type": "System.String",   "CslType": "string" },
    { "Name": "plant_id",         "Type": "System.String",   "CslType": "string" },
    { "Name": "device_key",       "Type": "System.String",   "CslType": "string" },
    { "Name": "value",            "Type": "System.Double",   "CslType": "real" }
  ]
}
Data to ingest:
2025-03-13T22:06:00Z, kpi_inverter_production, #ESSSAB2, #ESSSAB2_T010JPK01KKP03, 0.0201
Data ingested:
2025-03-13T22:06:00Z, kpi_inverter_production, #ESSSAB2, #ESSSAB2_T010JPK01KKP03, 0.0201
2025-03-13T22:06:00Z, kpi_inverter_production, #ESSSAB2, #ESSSAB2_T010JPK01KKP03, null
The ingest succeeds, and one second later (according to ingestion_time()) a second ingest appears with value set to null.
If the character 2 in the device_key field is replaced with a 3, the data ingests correctly. Could the pattern 2_ be interpreted as special characters internally?
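For what it's worth, neither `#` nor `_` nor `2_` is special in Kusto CSV parsing, but quoting every string field in the inline data removes any ambiguity. A minimal sketch of building such a command (the `quote` helper and the exact command text are illustrative assumptions, not the code the question actually ran):

```python
# Sketch: build a .ingest inline command with every string field quoted,
# so characters like '#' and '_' in device_key cannot be misparsed.
row = ("2025-03-13T22:06:00Z", "kpi_inverter_production",
       "#ESSSAB2", "#ESSSAB2_T010JPK01KKP03", 0.0201)

def quote(v):
    # CSV-style escaping: double any embedded quotes, then wrap in quotes.
    return '"' + str(v).replace('"', '""') + '"'

fields = ", ".join(quote(v) for v in row[:4]) + f", {row[4]}"
command = f".ingest inline into table kpi_table <|\n{fields}"
print(command)
```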
Code:
import logging

import pandas as pd
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table

logger = logging.getLogger(__name__)

class KustoService:  # wrapper class added for completeness; the original snippet showed only the methods
    def __init__(self, url, database, client_id, client_secret, tenant_id):
        kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
            url, client_id, client_secret, tenant_id
        )
        self.database = database  # was missing: execute_kql_query() relies on it
        self.conn = KustoClient(kcsb)

    def execute_kql_query(self, query: str):
        try:
            logger.debug("Executing KQL query: %s", query)
            response = self.conn.execute(self.database, query)
            if response.get_exceptions():
                raise Exception(response.get_exceptions())
            return dataframe_from_result_table(response.primary_results[0])
        except Exception as error:
            logger.error("Error executing KQL query: %s", error)
            return pd.DataFrame()
The query is a string containing an .ingest inline command, and although I know that is not recommended for production environments, the error also reproduces with streaming ingest.
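One thing worth checking, independent of the 2_ hypothesis: .ingest inline is a management (control) command, and KustoClient exposes execute_mgmt for those, while the helper above sends everything through execute. A minimal routing sketch (the route_command helper is illustrative, not part of the original code):

```python
def route_command(client, database, command: str):
    # Dot-prefixed commands are control commands and should go through
    # execute_mgmt; plain KQL queries go through execute.
    if command.lstrip().startswith("."):
        return client.execute_mgmt(database, command)
    return client.execute(database, command)
```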
- Ensure all fields are properly formatted as strings before ingestion to avoid misinterpretation of "2_" @cperez-m – Vinay B Commented Mar 17 at 5:27
- @cperez-m Where is the data coming from to Azure Data Explorer? – Sampath Commented Mar 17 at 9:12
1 Answer

> ingests a column with value null as soon as the previous field contains the characters 2_ in it.
The issue was caused by duplicate records being added, which is why null values appeared whenever '2_' occurred in the device key. It only happened when using the azure-kusto-data library, not in the Azure UI.
The code below verifies whether the '2_' pattern in device_key creates any ingestion issues, ingests new data while avoiding duplicates, and looks up existing data in the Azure Data Explorer database.
import os
import logging
import tempfile

import pandas as pd
from azure.kusto.data import KustoConnectionStringBuilder, KustoClient
from azure.kusto.data.data_format import DataFormat
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.kusto.ingest import QueuedIngestClient, IngestionProperties, FileDescriptor

logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
logger = logging.getLogger(__name__)

CLUSTER_URL = "<Your URL>"
DATABASE = "<Your database Name>"
TABLE_NAME = "<Your table Name>"
CLIENT_ID = "<Your clientID>"
CLIENT_SECRET = "<Your clientSecret>"
TENANT_ID = "<Your tenantID>"

if not CLIENT_ID or not CLIENT_SECRET or not TENANT_ID:
    logger.error("Missing Azure credentials.")
    exit(1)

try:
    kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(
        CLUSTER_URL, CLIENT_ID, CLIENT_SECRET, TENANT_ID
    )
    kusto_client = KustoClient(kcsb)
    ingest_client = QueuedIngestClient(kcsb)
    logger.info("Connected to Azure Data Explorer.")
except Exception as e:
    logger.error(f"Failed to connect to Kusto: {e}")
    exit(1)

create_table_query = f"""
.create table {TABLE_NAME} (
    server_timestamp: datetime,
    name: string,
    plant_id: string,
    device_key: string,
    value: real
)
"""

try:
    kusto_client.execute_mgmt(DATABASE, create_table_query)
    logger.info("Table check completed (created if not exists).")
except Exception as e:
    logger.error(f"Error creating table: {e}")
    exit(1)

data = [
    ("2025-03-13T22:06:00Z", "kpi_inverter_production", "#ESSSAB2", "#ESSSAB2_T010JPK01KKP03", 0.0201),
    ("2025-03-13T22:07:00Z", "kpi_inverter_production", "#ESSSAB3", "#ESSSAB3_T010JPK01KKP03", 0.0301),
    ("2025-03-13T22:07:00Z", "kpi_inverter_production", "#ESSSAB3", "#ESSSAB3_T010JPK01KKP03", 0.0401),
]
df = pd.DataFrame(data, columns=["server_timestamp", "name", "plant_id", "device_key", "value"])
df["server_timestamp"] = pd.to_datetime(df["server_timestamp"], utc=True)  # ensure correct format

logger.info("Checking for existing data...")
# Note: project device_key directly; projecting tostring(device_key) without
# naming it would emit an auto-named column and break the merge below.
existing_data_query = f"""
{TABLE_NAME}
| where server_timestamp in ({', '.join(['datetime("' + ts.strftime('%Y-%m-%dT%H:%M:%S') + '")' for ts in df["server_timestamp"]])})
| project server_timestamp, name, plant_id, device_key, value
"""

try:
    response = kusto_client.execute(DATABASE, existing_data_query)
    df_existing = dataframe_from_result_table(response.primary_results[0]) if response.primary_results else pd.DataFrame()
except Exception as e:
    logger.error(f"Error querying existing data: {e}")
    df_existing = pd.DataFrame()

if not df_existing.empty:
    df_existing["server_timestamp"] = pd.to_datetime(df_existing["server_timestamp"], utc=True)

df_to_ingest = df.copy()
if not df_existing.empty:
    df_to_ingest = df.merge(df_existing, on=["server_timestamp", "name", "plant_id", "device_key", "value"], how="left", indicator=True)
    df_to_ingest = df_to_ingest[df_to_ingest["_merge"] == "left_only"].drop(columns=["_merge"])

if df_to_ingest.empty:
    logger.info("All data already exists. Skipping ingestion.")
else:
    logger.info(f"{len(df_to_ingest)} new rows to ingest.")
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp_file:
        df_to_ingest.to_csv(tmp_file.name, index=False, header=False, encoding="utf-8", sep=",")
        tmp_filename = tmp_file.name

    ingestion_props = IngestionProperties(
        database=DATABASE,
        table=TABLE_NAME,
        data_format=DataFormat.CSV,
        flush_immediately=True,
    )

    try:
        file_desc = FileDescriptor(tmp_filename)
        ingest_client.ingest_from_file(file_desc, ingestion_props)
        logger.info("Data ingestion successful!")
    except Exception as e:
        logger.error(f"Error during ingestion: {e}")

    os.remove(tmp_filename)

logger.info("Validating for ingestion issues related to '2_' pattern...")
validation_query = f"""
{TABLE_NAME}
| where device_key contains "2_" and isnull(value)
| project server_timestamp, name, plant_id, device_key, value
"""

try:
    response = kusto_client.execute(DATABASE, validation_query)
    df_validation = dataframe_from_result_table(response.primary_results[0]) if response.primary_results else pd.DataFrame()
    if not df_validation.empty:
        logger.warning("Warning: Found ingested rows with null values in 'value' field and device_key containing '2_'.")
        logger.warning(df_validation)
    else:
        logger.info("No null values detected in 'value' field for device_key containing '2_'.")
except Exception as e:
    logger.error(f"Error running validation query: {e}")

query = f"{TABLE_NAME} | take 10"
try:
    response = kusto_client.execute(DATABASE, query)
    df_result = dataframe_from_result_table(response.primary_results[0]) if response.primary_results else pd.DataFrame()
    logger.info("\nIngested Data:")
    logger.info(df_result)
except Exception as e:
    logger.error(f"Error fetching ingested data: {e}")
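The deduplication step above is a pandas anti-join: a left merge with indicator=True keeps only the rows absent from the existing data. A standalone illustration with made-up rows (column values here are sample data, not the question's):

```python
import pandas as pd

# Rows we want to ingest, and rows already present in the table.
df_new = pd.DataFrame({"device_key": ["#ESSSAB2_T010JPK01KKP03", "#ESSSAB3_X"],
                       "value": [0.0201, 0.0301]})
df_existing = pd.DataFrame({"device_key": ["#ESSSAB2_T010JPK01KKP03"],
                            "value": [0.0201]})

# Anti-join: keep only rows that did not match anything in df_existing.
merged = df_new.merge(df_existing, on=["device_key", "value"],
                      how="left", indicator=True)
to_ingest = merged[merged["_merge"] == "left_only"].drop(columns=["_merge"])
print(to_ingest["device_key"].tolist())  # → ['#ESSSAB3_X']
```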
I resolved it by verifying that the timestamps were accurate, checking whether the data already existed before adding new rows, and running validation queries after ingestion to identify any problems.
Output:
No null values were found in the 'value' field where device_key contains '2_'.