I am trying to overwrite multiple partitions in a large table. Basically, I have my main external S3 table sandbox, partitioned by part:
scala> q("select * from sandbox")
+---+------------------------------------+----+
|id |val |part|
+---+------------------------------------+----+
|0 |5d8bad52-5373-4147-8cea-9492bb0d86b9|p0 |
|1 |28ed4d74-43ac-453b-bec9-651337bc18fc|p1 |
|4 |3f958c0f-88a8-4afa-bcf5-f89bcead3712|p4 |
|2 |cb596b60-a12a-4a19-9c37-6a12a036a71d|p2 |
|3 |b69f53d6-6de4-495f-881c-9259204e4a30|p3 |
+---+------------------------------------+----+
and data, which represents the results of a query:
scala> q("select * from data")
+---+--------------------------------------------+----+
|id |val |part|
+---+--------------------------------------------+----+
|0 |updated_9521f4d0-0717-4025-b0a2-1237bf1b3b34|p0 |
|1 |updated_d4987777-97f5-4676-a464-bd45877868fc|p1 |
+---+--------------------------------------------+----+
I want to merge data into sandbox and overwrite only the partitions that are present in data, without touching the rest, so basically:
q("insert overwrite sandbox partition (part) select * from data")
But that query drops p2, p3 and p4, resulting in:
scala> q("select * from sandbox")
+---+--------------------------------------------+----+
|id |val |part|
+---+--------------------------------------------+----+
|0 |updated_9521f4d0-0717-4025-b0a2-1237bf1b3b34|p0 |
|1 |updated_d4987777-97f5-4676-a464-bd45877868fc|p1 |
+---+--------------------------------------------+----+
Alternatively, I can run q("insert overwrite sandbox partition(part='p1') select * from data where part = 'p1'"), which achieves my desired result, but only for one partition, and I would strongly prefer to avoid running many queries for this merge.
I could also left join and union the two datasets, but I suspect that would defeat the whole purpose of partitioning sandbox in the first place: data is expected to be a very small portion of sandbox, which is the reason for partitioning at all.
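For concreteness, the join-and-union idea can be sketched as a plain-Scala model with no Spark involved. The names MergeSketch, Record and mergeByPartition are made up for illustration, and the filter on partition membership is an anti-join rather than a literal left join:

```scala
object MergeSketch {
  // Hypothetical stand-in for a table row; mirrors the id/val/part columns.
  final case class Record(id: Int, value: String, part: String)

  // Keep sandbox rows whose partition does not occur in data (the anti-join),
  // then append every row of data (the union).
  def mergeByPartition(sandbox: Seq[Record], data: Seq[Record]): Seq[Record] = {
    val replaced = data.map(_.part).toSet
    sandbox.filterNot(r => replaced.contains(r.part)) ++ data
  }

  def main(args: Array[String]): Unit = {
    // Five single-row partitions p0..p4, with updates for p0 and p1 only.
    val sandbox = (0 until 5).map(i => Record(i, s"old_$i", s"p$i"))
    val data    = (0 until 2).map(i => Record(i, s"updated_$i", s"p$i"))
    mergeByPartition(sandbox, data).sortBy(_.id).foreach(println)
  }
}
```

This produces the desired contents, but it reads and rewrites every row of sandbox, which is exactly the cost a partition-level overwrite is meant to avoid.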
Any ideas on how best to overwrite multiple, but not all, partitions?
EDIT:
The problem I'm trying to solve is that I need to overwrite only the partitions present in the source table, and leave the ones that exist in the destination (but not in the source) as they are. A full illustration of the issue from the ground up:
scala> def q(s: String) = spark.sql(s).show(false)
q: (s: String)Unit
scala> case class Record(id: Int, `val`: String, part: String)
defined class Record
scala> q("""
| create external table sandbox (
| id int,
| val string,
| part string
| ) using parquet
| partitioned by (
| part
| )
| location 's3://...../sandbox/'"""
| )
25/03/21 19:22:28 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
++
||
++
++
scala> (0 until 10).map(i => {Record(i, java.util.UUID.randomUUID().toString(), "p" + (i % 5))}).toSeq.toDS.write.format("parquet").partitionBy("part").mode("append").saveAsTable("sandbox")
scala>
scala> q("""
| create table data (
| id int,
| val string,
| part string
| ) using parquet
| partitioned by (
| part
| )
| location 'hdfs:///data'""")
++
||
++
++
scala> (0 until 2).map(i => {Record(i, "updated_" + java.util.UUID.randomUUID().toString(), "p" + (i % 5))}).toSeq.toDS.write.format("parquet").mode("append").partitionBy("part").saveAsTable("data")
scala>
scala> q("select * from data")
+---+--------------------------------------------+----+
|id |val |part|
+---+--------------------------------------------+----+
|0 |updated_ca718ad1-319d-4bbf-9469-6539db7918f2|p0 |
|1 |updated_a3e80d62-c10b-4b05-ac1e-d054f121909c|p1 |
+---+--------------------------------------------+----+
scala> q("select * from sandbox")
+---+------------------------------------+----+
|id |val |part|
+---+------------------------------------+----+
|4 |5707218e-d717-432e-b810-a20c5037de30|p4 |
|9 |5e98e6b4-474f-4674-93a0-76d26ded0679|p4 |
|0 |0dd8e874-9de8-47bb-be83-59c4e6e0f021|p0 |
|2 |d0ffb48e-0dca-493d-b4cb-4417d956145c|p2 |
|7 |7fc72d81-7124-4ce1-94ba-da9c93233f66|p2 |
|1 |53b3776c-89ce-4511-9958-9184a2a99cbf|p1 |
|6 |4dcb8a74-2cb7-4b01-87e6-713d54f79b7d|p1 |
|8 |312053a7-01bd-41a2-b4a6-0200a87f77b6|p3 |
|5 |e6198dc2-2997-4988-99da-1fb2fa525ba7|p0 |
|3 |e45489cc-bdb4-409c-972a-3229df89af73|p3 |
+---+------------------------------------+----+
scala> q("insert overwrite sandbox partition (part) select * from data")
++
||
++
++
scala> q("select * from sandbox")
+---+--------------------------------------------+----+
|id |val |part|
+---+--------------------------------------------+----+
|0 |updated_ca718ad1-319d-4bbf-9469-6539db7918f2|p0 |
|1 |updated_a3e80d62-c10b-4b05-ac1e-d054f121909c|p1 |
+---+--------------------------------------------+----+
scala> q("insert overwrite sandbox partition (part) select * from data where part = 'p0'")
++
||
++
++
scala> q("select * from sandbox")
+---+--------------------------------------------+----+
|id |val |part|
+---+--------------------------------------------+----+
|0 |updated_ca718ad1-319d-4bbf-9469-6539db7918f2|p0 |
+---+--------------------------------------------+----+
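One setting that looks relevant here, offered as an unverified sketch rather than a confirmed fix: since Spark 2.3 the configuration key spark.sql.sources.partitionOverwriteMode accepts static (the default) or dynamic, and the Spark documentation describes dynamic mode as overwriting only the partitions that actually receive rows at runtime. The lines below assume the same spark-shell session, the q helper, and the sandbox and data tables from the transcript above; they have not been run against this S3 table.

```scala
// Assumes an existing spark-shell session with `spark` and the `q` helper in scope.
// "static" (the default) first deletes every partition matching the PARTITION spec;
// "dynamic" is documented to replace only partitions that receive rows.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Same statement as before; under dynamic mode p2, p3 and p4 should be left alone.
q("insert overwrite sandbox partition (part) select * from data")
q("select * from sandbox")
```

The setting is session-scoped when applied through spark.conf.set, so other jobs that rely on the default static behaviour would not be affected.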