apache spark - Insert overwrite multiple partitions in an external Hive table - Stack Overflow

I am trying to overwrite multiple partitions in a large table. Basically I have my main external S3 table sandbox, partitioned by part:

scala> q("select * from sandbox")
+---+------------------------------------+----+
|id |val                                 |part|
+---+------------------------------------+----+
|0  |5d8bad52-5373-4147-8cea-9492bb0d86b9|p0  |
|1  |28ed4d74-43ac-453b-bec9-651337bc18fc|p1  |
|4  |3f958c0f-88a8-4afa-bcf5-f89bcead3712|p4  |
|2  |cb596b60-a12a-4a19-9c37-6a12a036a71d|p2  |
|3  |b69f53d6-6de4-495f-881c-9259204e4a30|p3  |
+---+------------------------------------+----+

and data that represents results of a query:

scala> q("select * from data")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_9521f4d0-0717-4025-b0a2-1237bf1b3b34|p0  |
|1  |updated_d4987777-97f5-4676-a464-bd45877868fc|p1  |
+---+--------------------------------------------+----+

I want to merge data into sandbox and overwrite only the partitions that exist in data, without touching the rest of them, so basically:

q("insert overwrite sandbox partition (part) select * from data"),

but that query drops p2, p3 and p4, resulting in:

scala> q("select * from sandbox")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_9521f4d0-0717-4025-b0a2-1237bf1b3b34|p0  |
|1  |updated_d4987777-97f5-4676-a464-bd45877868fc|p1  |
+---+--------------------------------------------+----+

Alternatively, I can run q("insert overwrite sandbox partition(part='p1') select * from data where part = 'p1'"), which achieves my desired result, but only for one partition, and I would strongly prefer to avoid running many queries for this merge.

I could also left join and union the two datasets, but I suspect that would defeat the whole purpose of partitioning sandbox: data is expected to be a very small portion of sandbox, which is the reason for partitioning in the first place.

Any ideas how to best overwrite multiple, but not all partitions?
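The result of the first query above is consistent with what the Spark documentation calls static partition overwrite mode, where an overwrite with no fixed partition value clears every existing partition before writing. The following is a toy model in plain Scala, with no Spark involved, that contrasts this with the behaviour being asked for. The names OverwriteModes, Table, overwriteStatic and overwriteDynamic are hypothetical, and the column val is renamed to value because val is a Scala keyword.

```scala
// Toy model only, no Spark involved: a table maps a partition value to its rows.
object OverwriteModes {
  final case class Record(id: Int, value: String, part: String)
  type Table = Map[String, Seq[Record]]

  // Static mode with a fully dynamic partition spec, `partition (part)`:
  // every existing partition is cleared before the incoming rows are written.
  def overwriteStatic(target: Table, incoming: Seq[Record]): Table =
    incoming.groupBy(_.part)

  // Dynamic mode: only partitions that receive incoming rows are replaced.
  def overwriteDynamic(target: Table, incoming: Seq[Record]): Table =
    target ++ incoming.groupBy(_.part)

  def main(args: Array[String]): Unit = {
    val sandbox: Table = (0 until 5).map(i => Record(i, s"old_$i", s"p$i")).groupBy(_.part)
    val data = (0 until 2).map(i => Record(i, s"updated_$i", s"p$i"))

    println(overwriteStatic(sandbox, data).keys.toSeq.sorted.mkString(","))  // p0,p1
    println(overwriteDynamic(sandbox, data).keys.toSeq.sorted.mkString(",")) // p0,p1,p2,p3,p4
  }
}
```

The second function is the merge described in the question: p0 and p1 are replaced, while p2, p3 and p4 are left as they were.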

EDIT:

The problem I'm trying to solve is that I need to overwrite only the partitions present in the source table, and leave the ones that exist in the destination (but not in the source) as is. Full illustration of the issue from the ground up:

scala> def q(s: String) = spark.sql(s).show(false)
q: (s: String)Unit

scala> case class Record(id: Int, `val`: String, part: String)
defined class Record

scala> q("""
     |    create external table sandbox (
     |         id int,
     |         val string,
     |         part string
     |    ) using parquet
     |    partitioned by (
     |         part
     |    )
     |    location 's3://...../sandbox/'"""
     | )
25/03/21 19:22:28 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
++
||
++
++


scala> (0 until 10).map(i => {Record(i, java.util.UUID.randomUUID().toString(), "p" + (i % 5))}).toSeq.toDS.write.format("parquet").partitionBy("part").mode("append").saveAsTable("sandbox")

scala>

scala> q("""
     |     create table data (
     |         id int,
     |         val string,
     |         part string
     |     ) using parquet
     |     partitioned by (
     |          part
     |     )
     |     location 'hdfs:///data'""")
++
||
++
++


scala> (0 until 2).map(i => {Record(i, "updated_" + java.util.UUID.randomUUID().toString(), "p" + (i % 5))}).toSeq.toDS.write.format("parquet").mode("append").partitionBy("part").saveAsTable("data")

scala>

scala> q("select * from data")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_ca718ad1-319d-4bbf-9469-6539db7918f2|p0  |
|1  |updated_a3e80d62-c10b-4b05-ac1e-d054f121909c|p1  |
+---+--------------------------------------------+----+


scala> q("select * from sandbox")
+---+------------------------------------+----+
|id |val                                 |part|
+---+------------------------------------+----+
|4  |5707218e-d717-432e-b810-a20c5037de30|p4  |
|9  |5e98e6b4-474f-4674-93a0-76d26ded0679|p4  |
|0  |0dd8e874-9de8-47bb-be83-59c4e6e0f021|p0  |
|2  |d0ffb48e-0dca-493d-b4cb-4417d956145c|p2  |
|7  |7fc72d81-7124-4ce1-94ba-da9c93233f66|p2  |
|1  |53b3776c-89ce-4511-9958-9184a2a99cbf|p1  |
|6  |4dcb8a74-2cb7-4b01-87e6-713d54f79b7d|p1  |
|8  |312053a7-01bd-41a2-b4a6-0200a87f77b6|p3  |
|5  |e6198dc2-2997-4988-99da-1fb2fa525ba7|p0  |
|3  |e45489cc-bdb4-409c-972a-3229df89af73|p3  |
+---+------------------------------------+----+


scala> q("insert overwrite sandbox partition (part) select * from data")
++
||
++
++


scala> q("select * from sandbox")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_ca718ad1-319d-4bbf-9469-6539db7918f2|p0  |
|1  |updated_a3e80d62-c10b-4b05-ac1e-d054f121909c|p1  |
+---+--------------------------------------------+----+


scala> q("insert overwrite sandbox partition (part) select * from data where part = 'p0'")
++
||
++
++


scala> q("select * from sandbox")
+---+--------------------------------------------+----+
|id |val                                         |part|
+---+--------------------------------------------+----+
|0  |updated_ca718ad1-319d-4bbf-9469-6539db7918f2|p0  |
+---+--------------------------------------------+----+
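One setting that may be worth trying here is spark.sql.sources.partitionOverwriteMode, which Spark has supported since version 2.3 and which defaults to static. The sketch below is meant for the same spark-shell session, where spark is the predefined SparkSession. It assumes sandbox is a datasource table (created with "using parquet", as above) and has not been run against this exact setup, so treat it as a starting point rather than a confirmed fix.

```scala
// Assumption: Spark 2.3 or later; `spark` is the SparkSession provided by spark-shell.
// Switch partition overwrites from the default "static" mode to "dynamic".
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Same statement as in the transcript. In dynamic mode, only the partitions
// that appear in the SELECT output (p0 and p1 for this data) are expected
// to be replaced; p2, p3 and p4 should be left alone.
spark.sql("insert overwrite sandbox partition (part) select * from data")

// Check which partitions are still present.
spark.sql("select part, count(*) as n from sandbox group by part order by part").show(false)
```

With the setting left at static, a "partition (part)" clause with no fixed value covers every partition, which matches the data loss shown above.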
