I have been reading CSV files with Databricks Auto Loader for a while now. On the earlier DBR 11.3 LTS runtime, my CSV files were read easily without any significant issues.
Basically, I have a bunch of CSV files that need to be ingested, and the data is delimited by ";". The problem is that certain rows in some files contain a huge number of semicolons, so after parsing with this delimiter those rows end up with around 2 billion columns. Earlier DBR versions used to just skip these extra columns, but DBR 14.3 LTS, which we need as an upgrade, does not skip them.
We have custom logic to segregate such rows into bad tables, so this case was handled well on the old DBR 11.3 LTS.
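In simplified form, that segregation step looks roughly like this (a sketch only; the bad-row check on _rescued_data and the literal paths are illustrative placeholders, not our exact framework code):
from pyspark.sql import functions as F

# Simplified sketch of the bad-row segregation (illustrative only:
# the _rescued_data check and the literal paths stand in for the real logic).
def segregate_batch(batch_df, batch_id):
    is_bad = F.col("_rescued_data").isNotNull()   # rows with overflow/extra fields
    batch_df.filter(is_bad).write.format("delta").mode("append").save("some_bad_table_path")
    batch_df.filter(~is_bad).write.format("delta").mode("append").save("some_path")

# Wired into the stream as: writeStream.foreachBatch(segregate_batch)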
The new version requires me to raise the maxColumns read option, so I hardcoded it to 2147483646 (roughly 2.1 billion) columns so that the files can be read, but now it fails with "Requested array size exceeds VM limit". I have tried 128 GB RAM clusters as well as multi-node clusters, and it always ends in an executor-lost failure with no particularly useful information in the Spark UI.
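My rough understanding of why no amount of cluster memory helps (assuming the underlying CSV parser preallocates per-row structures sized to maxColumns) is that the requested array length itself is above what the JVM allows:
# Back-of-the-envelope check (assumption: the parser allocates an array of
# length maxColumns per row). HotSpot caps array length at roughly
# Integer.MAX_VALUE - 2 elements, regardless of available heap.
requested_max_columns = 2_147_483_646
jvm_max_array_length = 2**31 - 1 - 2                   # ~Integer.MAX_VALUE - 2
print(requested_max_columns > jvm_max_array_length)    # True -> "Requested array size exceeds VM limit"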
I have tried various other cloudFiles options to make it work, but I run into similar problems.
Is there any way to replicate the behaviour of DBR 11.3 LTS on DBR 14.3 LTS without significant changes?
This is what the configuration file looks like:
{
  "Name": "eventlog_n4_wlf_csv",
  "DataProductName": "some_data_prod",
  "LookbackRange": 21,
  "Database": "some_db",
  "DatasetPath": "some_path",
  "CheckpointPath": "some_checkpoint",
  "BadPath": "some_bad_path",
  "BadTablePath": "some_bad_table_path",
  "PartitionColumns": [
    "Year",
    "Month",
    "Day"
  ],
  "SchemaColumns": [
    {
      "colName": "Year",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "Month",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "Day",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "SerialNumber",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "MaterialNumber",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "EventSourceHash",
      "nullable": false,
      "dataType": "string"
    },
    {
      "colName": "Category",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "CategoryString",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "EventCode",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "EventIdentifier",
      "nullable": true,
      "dataType": "bigint"
    },
    {
      "colName": "EventType",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "Logfile",
      "nullable": false,
      "dataType": "string"
    },
    {
      "colName": "Message",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "RecordNumber",
      "nullable": false,
      "dataType": "bigint"
    },
    {
      "colName": "SourceName",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "TimeGenerated",
      "nullable": false,
      "dataType": "timestamp"
    },
    {
      "colName": "TimeZoneOffset",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "Type",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "User",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "Data",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "FileName",
      "nullable": false,
      "dataType": "string"
    },
    {
      "colName": "FileUploadDate",
      "nullable": false,
      "dataType": "timestamp"
    }
  ],
  "Properties": {
    "delta.enableChangeDataFeed": "true",
    "delta.checkpoint.writeStatsAsStruct": "true",
    "delta.targetFileSize": "32mb",
    "delta.autoOptimize.autoCompact": "true"
  },
  "WriteOptions": {},
  "Sources": {
    "csv_source": {
      "Path": "/mnt/raw/some_data_product_path/exp/day=*/materialnum=*/serialnum=*/",
      "ReadOptions": {
        "useStrictGlobber": "true",
        "header": "true",
        "sep": ";",
        "cloudFiles.partitionColumns": "day,materialnum,serialnum",
        "cloudFiles.schemaEvolutionMode": "none",
        "cloudFiles.schemaLocation": "/mnt/gold/tables/data_product/base_table/_schemaLocation",
        "cloudFiles.useNotifications": "false",
        "cloudFiles.maxFileAge": "28 days",
        "cloudFiles.maxBytesPerTrigger": "8589934592",
        "cloudFiles.maxFilesPerTrigger": "100",
        "maxColumns": 2147483646
      }
    }
  },
  "Monitoring": {
    "TableName": "monitoring.processing_statistics",
    "TablePath": "/mnt/gold/tables/monitoring/processing_statistics"
  }
}
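For reference, the csv_source block above is consumed by our ingestion framework roughly like this (a minimal sketch; config, target_schema, and the checkpoint/trigger wiring are simplified placeholders, not the actual framework code):
# Minimal sketch of how the ReadOptions above reach Auto Loader
# (assumes `config` is the parsed JSON shown above and `target_schema`
# is a StructType built from the SchemaColumns list).
source = config["Sources"]["csv_source"]

stream_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .options(**source["ReadOptions"])  # header, sep, cloudFiles.*, maxColumns, ...
    .schema(target_schema)
    .load(source["Path"])
)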
Here is another code sample where I am trying to read one particular file:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType

# Define the schema for the columns we care about
schema = StructType(
    [
        StructField("Category", IntegerType(), True),
        StructField("CategoryString", StringType(), True),
        StructField("EventCode", IntegerType(), True),
        StructField("EventIdentifier", LongType(), True),
        StructField("EventType", IntegerType(), True),
        StructField("Logfile", StringType(), True),
        StructField("Message", StringType(), True),
        StructField("RecordNumber", LongType(), True),
        StructField("SourceName", StringType(), True),
        StructField("TimeGenerated", StringType(), True),
        StructField("Type", StringType(), True),
        StructField("User", StringType(), True),
        StructField("Data", StringType(), True),
    ]
)

# Read the CSV file with the explicit schema
# df = spark.read.csv("dbfs:/FileStore/asarkar/sample.csv", schema=schema, header=True)
# df = spark.read.option("mode", "DROPMALFORMED").csv("dbfs:/FileStore/asarkar/sample.csv", schema=schema, header=True)
df = (
    spark.read.option("header", True)
    .option("delimiter", ";")
    .option("mode", "PERMISSIVE")
    .option("encoding", "UTF-8")
    .option("maxColumns", 2147483647)
    .csv("dbfs:/FileStore/asarkar/sample.csv", schema=schema)
)
display(df)
When I set maxColumns to that particular value, it gives me "Requested array size exceeds VM limit".
If I use a smaller value, around 50000 or so, I get this instead:
Caused by: com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - 20480 Hint: Number of columns processed may have exceeded limit of 20480 columns. Use settings.setMaxColumns(int) to define the maximum number of columns your input can have. Ensure your configuration is correct, with delimiters, quotes and escape sequences that match the input format you are trying to parse
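For what it is worth, this is roughly how I confirm which rows overflow before parsing (a diagnostic sketch; it just counts separators per raw line of the same sample file):
from pyspark.sql import functions as F

# Count ";" separators per raw line to spot the rows that explode into
# millions of columns (diagnostic sketch on the same sample file).
raw = spark.read.text("dbfs:/FileStore/asarkar/sample.csv")
sep_counts = raw.select((F.size(F.split(F.col("value"), ";")) - 1).alias("num_semicolons"))
display(sep_counts.orderBy(F.col("num_semicolons").desc()).limit(10))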