I am running PySpark code with a Java UDF in Databricks. I have an r6id.xlarge (32 GB) driver and one r6id.4xlarge (128 GB) worker node. I am reading only one file, and my Java UDF just calls an open-source X12 Java library to parse the file as a whole. Sample code is below; this works for files smaller than 100 MB.
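import pyspark.sql.types as pst
from pyspark.sql.functions import expr
# wholetext=True loads the entire file as a single row in a column named "value"
df = spark.read.format('text').option('wholetext', True).load("s3://xxxx/xxxxxxx")
spark.udf.registerJavaFunction("x12_parser", "com.abc", pst.StringType())
df.select(expr("x12_parser(value)"))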
Whenever I parse a big file (just one file), I get the error java.lang.OutOfMemoryError: Requested array size exceeds VM limit. When I parse this file locally, it works if I increase my heap size to 20g; otherwise I get the same error.
But my worker node is far larger than that. (I am in Databricks, so there should be no need to configure executor memory, and setting -Xmx is not permitted.)
I also tried to call my function directly like below
import boto3
s3 = boto3.client('s3')
bucket_name = 'xxxx'
key = 'xxxxxxx'
response = s3.get_object(Bucket=bucket_name, Key=key)
contents = response['Body'].read().decode('utf-8')
parser_class = spark._jvm.abc.x12_parser()
output = parser_class.call(contents)
This works fine without touching the Java heap size, even though my driver is 4 times smaller than the worker. I tried playing with some Spark settings, such as the network timeout and spark.executor.extraJavaOptions -Xms20g -XX:+UseCompressedOops, but none of them worked.
I can't explain why, with a huge worker, I can't process the same file that I can process on a much smaller driver or on my local machine.
asked Feb 4 at 18:09 by milton
2 Answers
The problem is that the Java UDF is executed on the executors, not on the driver. Executors process data in parallel. Despite the large memory of your worker, the heap size per executor may not be enough to handle such a large file. Furthermore, Spark's wholetext option loads the whole file as a single row, which makes the in-memory footprint even worse when the UDF operates on it.
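One way to sanity-check the point about per-executor heap is to compare the configured executor memory with the size of the input file. The sketch below is only an illustration: `parse_jvm_size` and `heap_per_file_byte` are hypothetical helper names, and the 20g heap and 500 MiB file are made-up values, not figures from the question.

```python
import re

# Binary multipliers for the unit suffixes used in JVM-style size strings.
_UNIT_BYTES = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}


def parse_jvm_size(text):
    """Convert a size string such as '20g' or '512m' into a number of bytes."""
    match = re.fullmatch(r"(\d+)([kmgt])b?", text.strip().lower())
    if match is None:
        raise ValueError(f"unrecognised size string: {text!r}")
    number, unit = match.groups()
    return int(number) * _UNIT_BYTES[unit]


def heap_per_file_byte(executor_memory, file_size_bytes):
    """Return how many bytes of executor heap exist per byte of input file."""
    return parse_jvm_size(executor_memory) / file_size_bytes


# Hypothetical values for illustration: a 20g heap and a 500 MiB input file.
ratio = heap_per_file_byte("20g", 500 * 1024 ** 2)
print(f"{ratio:.1f} bytes of heap per byte of file")
```

On a cluster, the size string could be read with `spark.conf.get("spark.executor.memory")`, assuming that property is set. A low ratio only hints at a problem, since the same content can be held several times at once (raw bytes, the decoded string, and whatever the parser builds).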
If you use boto3, the code runs directly on the driver and does not allow distributed execution. I suggest using a broadcast variable for the file content and then processing it:
val fileContent = spark.sparkContext.broadcast(content)
Then create a DataFrame with it and use that in your UDF. Also set the Spark executor memory and memoryOverhead appropriately, with the number of cores at 4-5.
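As a rough sketch of the memory settings mentioned above, the properties could be placed in the cluster's Spark config, one key and value per line. The property names are standard Spark settings; the values are placeholders chosen only for illustration and would need tuning for the actual node size and workload.

```text
spark.executor.memory 80g
spark.executor.memoryOverhead 8g
spark.executor.cores 4
```

Maximum heap is controlled through spark.executor.memory; Spark does not accept -Xmx inside spark.executor.extraJavaOptions.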
It seems there is some internal UDF memory limitation. I stopped using the UDF and changed the code to do the map from the Java side.