apache spark - Pyspark java UDF java.lang.OutOfMemoryError: Requested array size exceeds VM limit. SQLSTATE: 39000 - Stack Overflow

I am running PySpark code with a Java UDF in Databricks. I have an r6id.xlarge (32 GB) driver and r6id.4xlarge (128 GB) worker nodes. I am reading only one file, and my Java UDF just calls an open-source X12 Java library to parse the file as a whole. Sample code is below; it works for files that are smaller than 100 MB.

from pyspark.sql.functions import expr
import pyspark.sql.types as pst

df = spark.read.format('text').option('wholetext', True).load("s3://xxxx/xxxxxxx")
spark.udf.registerJavaFunction("x12_parser", "com.abc", pst.StringType())
df.select(expr("x12_parser(value)"))  # 'value' is the single column produced by the text reader

Whenever I parse a big file (just one file), I get the error java.lang.OutOfMemoryError: Requested array size exceeds VM limit. When I parse this file locally, it works if I increase my heap size to 20 GB; otherwise I get the same error. But my worker node is far larger than that. (I am on Databricks, so there is no need to configure executor memory, and setting -Xmx is not permitted.) I also tried calling my function directly, like below:

import boto3
s3 = boto3.client('s3')
bucket_name = 'xxxx'
key = 'xxxxxxx'
response = s3.get_object(Bucket=bucket_name, Key=key)
contents = response['Body'].read().decode('utf-8')

# Instantiate the parser directly in the driver JVM via Py4J and parse on the driver
parser_class = spark._jvm.abc.x12_parser()
output = parser_class.call(contents)

This works fine even though my driver is four times smaller than the worker, without touching the Java heap size. I tried playing with some Spark settings such as the network timeout and spark.executor.extraJavaOptions -Xms20g -XX:+UseCompressedOops, but none of them worked.

I can't explain why, with a huge worker, I can't process the same file that I can process on a much smaller driver or on my local machine.

asked Feb 4 at 18:09 by milton

2 Answers


The problem is that the Java UDF is executed on the executors, not the driver. Executors process data in parallel, and despite the large total memory of your workers, the heap available to each executor may not be enough to handle such a large file. Furthermore, Spark's wholetext option loads a whole file as one row, which makes the in-memory footprint even worse when the UDF operates on it.
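
A quick way to see this (a sketch only, reusing the placeholder S3 path from the question): with wholetext=True each file becomes exactly one row, so the Java UDF receives the entire file as a single string value.

# Illustration only: with wholetext=True every file is read as a single row,
# so the UDF gets the whole file content in one string.
df = spark.read.format("text").option("wholetext", True).load("s3://xxxx/xxxxxxx")
print(df.count())                                   # number of files, not number of lines
df.selectExpr("length(value) AS chars_per_file").show()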

Also, if you use boto3, the read happens directly on the driver, so it won't give you distributed execution. I would suggest you:

  • use a broadcast variable for the file content, e.g. file_content = spark.sparkContext.broadcast(contents), then create a DataFrame from it and use that in your UDF (see the sketch after this list)

  • set spark executor memory and memory overhead appropriately, with 4-5 cores per executor
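
A minimal PySpark sketch of the broadcast idea, using the placeholder bucket/key and the x12_parser registration from the question; it mirrors the suggestion above rather than being a guaranteed fix for the array-size limit:

import boto3
from pyspark.sql.functions import expr

# Read the raw file once on the driver (placeholders from the question).
s3 = boto3.client('s3')
contents = s3.get_object(Bucket='xxxx', Key='xxxxxxx')['Body'].read().decode('utf-8')

# Broadcast the content so it is shipped to executors once rather than
# shuffled around as a huge single row.
file_content = spark.sparkContext.broadcast(contents)

# Build a one-row DataFrame from the broadcast value and apply the Java UDF
# (assumes "x12_parser" was registered with registerJavaFunction as in the question).
df = spark.createDataFrame([(file_content.value,)], ["value"])
result = df.select(expr("x12_parser(value)"))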

It seems like there is some internal UDF memory limitation. I stopped using the UDF and changed the code to do the map from the Java side instead.
