Databricks Runtime Autoloader Behaviour with csv - Stack Overflow


I have been trying to read CSV files with Databricks Autoloader for a while now. On the DBR 11.3 LTS version, my CSV files used to be read pretty easily, without any significant issues.

Basically, I have a bunch of CSV files that need to be ingested, and their rows are delimited by ";". The problem is that some files contain rows with a huge number of semicolons, so after reading with this delimiter a row can end up with around 2 billion columns. Earlier DBR versions used to just skip these extra columns, but DBR 14.3 LTS, which we need as an upgrade, no longer skips them.

We have custom logic to segregate such rows into bad-record tables, so this was handled well on the old DBR 11.3 LTS version.
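For context, that segregation pattern can be illustrated in plain Spark. This is only a minimal sketch of the idea, assuming PERMISSIVE mode with a _corrupt_record column (paths and column names are placeholders); it does not by itself lift the parser's column limit discussed below.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import col

# Schema with an extra _corrupt_record column: rows that fail to parse keep
# their raw text there instead of breaking the read.
schema = StructType([
    StructField("Category", IntegerType(), True),
    StructField("CategoryString", StringType(), True),
    # ... remaining business columns ...
    StructField("_corrupt_record", StringType(), True),
])

df = (
    spark.read.option("header", True)
    .option("sep", ";")
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(schema)
    .csv("dbfs:/FileStore/asarkar/sample.csv")
    .cache()  # cache so the corrupt-record column can be filtered on its own
)

good_df = df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
bad_df = df.filter(col("_corrupt_record").isNotNull())  # routed to the bad table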

The new version requires me to raise the maxColumns option, so I hardcoded it to roughly 2.1 billion columns (2147483646) so that the files can be read, but now it fails with "Requested array size exceeds VM Limit". I have tried clusters with 128 GB of RAM as well as multi-node clusters, and it always ends in an executor-lost failure with no particularly useful information in the Spark UI.

I have tried various other cloudFiles options to make it work, but I run into similar problems.

Is there any way to replicate the DBR 11.3 LTS behaviour on 14.3 LTS without any significant changes?

This is what the configuration file looks like:

{
  "Name": "eventlog_n4_wlf_csv",
  "DataProductName": "some_data_prod",
  "LookbackRange": 21,
  "Database": "some_db",
  "DatasetPath": "some_path",
  "CheckpointPath": "some_checkpoint",
  "BadPath": "some_bad_path",
  "BadTablePath": "some_bad_table_path",
  "PartitionColumns": [
    "Year",
    "Month",
    "Day"
  ],
  "SchemaColumns": [
    {
      "colName": "Year",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "Month",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "Day",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "SerialNumber",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "MaterialNumber",
      "nullable": false,
      "dataType": "int"
    },
    {
      "colName": "EventSourceHash",
      "nullable": false,
      "dataType": "string"
    },
    {
      "colName": "Category",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "CategoryString",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "EventCode",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "EventIdentifier",
      "nullable": true,
      "dataType": "bigint"
    },
    {
      "colName": "EventType",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "Logfile",
      "nullable": false,
      "dataType": "string"
    },
    {
      "colName": "Message",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "RecordNumber",
      "nullable": false,
      "dataType": "bigint"
    },
    {
      "colName": "SourceName",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "TimeGenerated",
      "nullable": false,
      "dataType": "timestamp"
    },
    {
      "colName": "TimeZoneOffset",
      "nullable": true,
      "dataType": "int"
    },
    {
      "colName": "Type",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "User",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "Data",
      "nullable": true,
      "dataType": "string"
    },
    {
      "colName": "FileName",
      "nullable": false,
      "dataType": "string"
    },
    {
      "colName": "FileUploadDate",
      "nullable": false,
      "dataType": "timestamp"
    }
  ],
  "Properties": {
    "delta.enableChangeDataFeed": "true",
    "delta.checkpoint.writeStatsAsStruct": "true",
    "delta.targetFileSize": "32mb",
    "delta.autoOptimize.autoCompact": "true"
  },
  "WriteOptions": {
    
  },
  "Sources": {
    "csv_source": {
      "Path": "/mnt/raw/some_data_product_path/exp/day=*/materialnum=*/serialnum=*/",
      "ReadOptions": {
        "useStrictGlobber": "true",
        "header": "true",
        "sep": ";",
        "cloudFiles.partitionColumns": "day,materialnum,serialnum",
        "cloudFiles.schemaEvolutionMode": "none",
        "cloudFiles.schemaLocation": "/mnt/gold/tables/data_product/base_table/_schemaLocation",
        "cloudFiles.useNotifications": "false",
        "cloudFiles.maxFileAge": "28 days",
        "cloudFiles.maxBytesPerTrigger": "8589934592",
        "cloudFiles.maxFilesPerTrigger": "100",
        "maxColumns": 2147483646
      }
    }
  },
  "Monitoring": {
    "TableName": "monitoring.processing_statistics",
    "TablePath": "/mnt/gold/tables/monitoring/processing_statistics"
  }
}
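
For reference, this is roughly how those ReadOptions end up in the Auto Loader read in our loader. This is a sketch only: the real code builds the options from the JSON above, and target_schema stands in for a schema constructed from the SchemaColumns section.

# Sketch: pass the ReadOptions above to a cloudFiles readStream.
read_options = {
    "cloudFiles.format": "csv",
    "useStrictGlobber": "true",
    "header": "true",
    "sep": ";",
    "cloudFiles.partitionColumns": "day,materialnum,serialnum",
    "cloudFiles.schemaEvolutionMode": "none",
    "cloudFiles.schemaLocation": "/mnt/gold/tables/data_product/base_table/_schemaLocation",
    "cloudFiles.useNotifications": "false",
    "cloudFiles.maxFileAge": "28 days",
    "cloudFiles.maxBytesPerTrigger": "8589934592",
    "cloudFiles.maxFilesPerTrigger": "100",
    "maxColumns": "2147483646",
}

df = (
    spark.readStream.format("cloudFiles")
    .options(**read_options)
    .schema(target_schema)  # built from SchemaColumns, placeholder here
    .load("/mnt/raw/some_data_product_path/exp/day=*/materialnum=*/serialnum=*/")
)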

Here is another code sample where I am trying to read one particular file:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType


# Define schema correctly
schema = StructType(
    [
        StructField("Category", IntegerType(), True),
        StructField("CategoryString", StringType(), True),
        StructField("EventCode", IntegerType(), True),
        StructField("EventIdentifier", LongType(), True),
        StructField("EventType", IntegerType(), True),
        StructField("Logfile", StringType(), True),
        StructField("Message", StringType(), True),
        StructField("RecordNumber", LongType(), True),
        StructField("SourceName", StringType(), True),
        StructField("TimeGenerated", StringType(), True),
        StructField("Type", StringType(), True),
        StructField("User", StringType(), True),
        StructField("Data", StringType(), True),  # Fixed the incorrect schema syntax
    ]
)

# Read CSV file with schema
# df = spark.read.csv("dbfs:/FileStore/asarkar/sample.csv", schema=schema, header=True)

# df = spark.read.option("mode", "DROPMALFORMED").csv("dbfs:/FileStore/asarkar/sample.csv", schema=schema, header=True)


df = (
    spark.read.option("header", True)
    .option("delimiter", ";")
    .option("mode", "PERMISSIVE")
    .option("encoding", "UTF-8")
    .option("maxColumns", 2147483647)
    .csv("dbfs:/FileStore/asarkar/sample.csv", schema=schema)
)


display(df)

When I set maxColumns to this particular value (2147483647), it gives me "Requested array size exceeds VM limit".

If I give a smaller value, around 50,000 or so, it gives me this:

Caused by: com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - 20480 Hint: Number of columns processed may have exceeded limit of 20480 columns. Use settings.setMaxColumns(int) to define the maximum number of columns your input can have. Ensure your configuration is correct, with delimiters, quotes and escape sequences that match the input format you are trying to parse
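
One workaround I am considering, sketched below and not validated at scale: bypass the CSV parser's column limit entirely by reading the files as plain text and splitting on ";" myself, keeping only the fields the schema expects. It reuses the schema and sample path from the snippet above and does not handle quoted fields that contain semicolons.

# Workaround sketch: read as text and split manually, so univocity's
# maxColumns limit never applies; rows with extra semicolons simply lose
# their trailing fragments, similar to the old "skip extra columns" behaviour.
from pyspark.sql import functions as F

col_names = [f.name for f in schema.fields]  # schema defined in the snippet above

raw = spark.read.text("dbfs:/FileStore/asarkar/sample.csv")
parts = F.split(F.col("value"), ";")

df = raw.select(*[parts.getItem(i).alias(name) for i, name in enumerate(col_names)])

# The header row still has to be dropped and columns cast to their real types;
# quoting is not respected, so this is only a rough quarantine/inspection path.
display(df)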
