We want to copy a large Spark dataframe into Oracle, but I am finding the tuning options a bit limited. Looking at the Spark documentation, the only related tuning property I could find for a JDBC write is numPartitions.
However, the dataframe we want to write is 700,000,000 records and Oracle has only 32 cores, so I don't want to overload the database with too many threads. My understanding is that if I set numPartitions to 32, it will effectively do a .repartition(32) on the large dataset and then write each partition into Oracle. 32 partitions in Spark is not enough for a dataset this size and will cause memory issues.
Is there a way to break the job into further pieces so it doesn't try to do everything at once, but instead does 50,000,000 (or something) at a time?
I was thinking of something like this, but I was hoping there's something more efficient:
// Imagine "df" is the incoming dataframe we want to write.
val threads = 32
val recordsPerThread = 500000
val chunkSize = threads * recordsPerThread
val total = df.count
val chunks = (total/chunkSize).ceil.toInt
val chunkDf = df.withColumn("CHUNK_NUM", rand.multiply(chunks).cast(IntegerType))
for (chunkNum <- 0 to chunks) {
chunkDf.filter(s"CHUNK_NUM = ${chunkNum}")
.drop("CHUNK_NUM")
.write
.format("jdbc")
.options(...) // DB info + numPartitions = 32
.save
}
Basically, I'm dividing the dataset into "chunks", each of which can be written with 32 threads (numPartitions). I feel like there should be a more efficient way of doing this, but I can't seem to find it in the documentation.
I'm also using batchSize set to 10000 to reduce round trips, but I'm still limited by how many threads I want making round trips to Oracle and by how large the partitions in Spark can be.
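For reference, a minimal sketch of the kind of JDBC write involved (the URL, table name, credentials, and driver class are placeholder values, not the real settings):

val jdbcWrite = df.write
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/SERVICE")  // placeholder
  .option("dbtable", "TARGET_SCHEMA.TARGET_TABLE")           // placeholder
  .option("user", "app_user")                                // placeholder
  .option("password", "app_password")                        // placeholder
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("batchsize", "10000")      // rows per INSERT batch, fewer round trips
  .option("numPartitions", "32")     // caps the number of concurrent JDBC connections
jdbcWrite.save()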
2 Answers
I was overthinking it. We can constrain how much Spark writes at once simply by constraining the resources we give Spark. If I set numPartitions to 500 but only give Spark a single 32-core worker, it will only write 32 partitions at a time, which limits how hard we hammer Oracle. This effectively "chunks" the job.
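A minimal sketch of that idea, assuming a YARN- or Kubernetes-style deployment (the property names, executor sizing, and app name below are illustrative, not taken from the answer itself):

// Illustrative sketch: cap executor resources so at most 32 tasks (and
// therefore at most 32 JDBC connections) write to Oracle at any one time,
// even if the dataframe has far more partitions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("oracle-bulk-load")               // hypothetical app name
  .config("spark.executor.instances", "1")   // one executor...
  .config("spark.executor.cores", "32")      // ...with 32 cores => 32 concurrent tasks
  .getOrCreate()

On a standalone cluster the equivalent knob would be spark.cores.max; either way, the dataframe can keep many small partitions without ever opening more than 32 simultaneous connections.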
In your case I would simply repartition:
val threads = 32
df.repartition(threads).write.format("jdbc").options(...).save()