python - How to join 2 DataFrames on really specific condition? - Stack Overflow


I have those 2 DataFrames:

df1:
+---+----------+----------+
|id |id_special|date_1    |
+---+----------+----------+
|1  |101       |2024-11-01|
|2  |102       |2024-11-03|
|3  |103       |2024-11-04|
|4  |null      |2024-11-05|
+---+----------+----------+
df2:
+----------+----------+------+
|id_special|date_2    |type  |
+----------+----------+------+
|101       |2024-10-30|Type_1|
|101       |2024-10-31|Type_2|
|101       |2024-11-01|Type_3|
|102       |2024-11-03|Type_4|
+----------+----------+------+

My goal is to create a new column named df2_type in df1. To do so, I need a special join between df1 and df2. Here are the rules for creating the df2_type column.

  1. If df1.id_special is null, set df1.df2_type to "Unknown".
  2. If df1.id_special is not in df2.id_special, set df1.df2_type to "Unknown".
  3. If df1.id_special is in df2.id_special:
    1. Get the df2 record where df2.date_2 < df1.date_1 and df2.date_2 is closest to df1.date_1.
    2. From that record, use df2.type to set df2_type.

So, from the previous DataFrames, this is the result I am expecting:

+---+----------+----------+--------+
|id |id_special|date_1    |df2_type|
+---+----------+----------+--------+
|1  |101       |2024-11-01|Type_2  |
|2  |102       |2024-11-03|Unknown |
|3  |103       |2024-11-04|Unknown |
|4  |null      |2024-11-05|Unknown |
+---+----------+----------+--------+
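Note that for id 1 the expected value is Type_2 rather than Type_3: the df2 entry dated 2024-11-01 is excluded because the comparison date_2 < date_1 is strict, so the closest earlier date is 2024-10-31.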

I tried a join between my two DataFrames, but I was never able to get it right. Here is the code I have:

from awsglue.context import GlueContext
from datetime import date
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from pyspark.sql.types import DateType, IntegerType, StringType, StructField, StructType

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session

data1 = [
    (1, 101, date.fromisoformat("2024-11-01")),
    (2, 102, date.fromisoformat("2024-11-03")),
    (3, 103, date.fromisoformat("2024-11-04")),
    (4, None, date.fromisoformat("2024-11-05")),
]

data2 = [
    (101, date.fromisoformat("2024-10-30"), "Type_1"),
    (101, date.fromisoformat("2024-10-31"), "Type_2"),
    (101, date.fromisoformat("2024-11-01"), "Type_3"),
    (102, date.fromisoformat("2024-11-03"), "Type_4"),
]

schema1 = StructType([
   StructField("id", IntegerType(), True), # Unique key
   StructField("id_special", IntegerType(), True),
   StructField("date_1", DateType(), True),
])

schema2 = StructType([
   StructField("id_special", IntegerType(), True),
   StructField("date_2", DateType(), True),
   StructField("type", StringType(), True),
])


df1 = spark.createDataFrame(data1, schema1)
df2 = spark.createDataFrame(data2, schema2)


# Step 1 - Add the df2_type column
df1 = df1.withColumn("df2_type", lit(None))


# The final DataFrame need to be like this
# +---+----------+----------+--------+
# |id |id_special|date_1    |df2_type|
# +---+----------+----------+--------+
# |1  |101       |2024-11-01|Type_2  |
# |2  |102       |2024-11-03|Unknown |
# |3  |103       |2024-11-04|Unknown |
# |4  |null      |2024-11-05|Unknown |
# +---+----------+----------+--------+

asked Nov 20, 2024 at 19:32 by jeremie bergeron

1 Answer


Transform the task into a simple (outer) join and run some post-processing afterwards:

Step 1: group df2 by id_special and collect all rows into a list of structs.

from pyspark.sql import functions as F

df2grouped = df2.groupBy('id_special') \
    .agg(F.collect_list(F.struct('date_2', 'type')).alias('data'))
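
To sanity-check the intermediate result you can print it. Assuming the sample data from the question, df2grouped should contain one row for 101 holding three (date_2, type) structs and one row for 102 holding a single struct (collect_list does not guarantee the order of the elements, which is why the next step sorts the array explicitly):

df2grouped.show(truncate=False)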

Step 2: run a left outer join between df1 and df2grouped. Sort the array from df2grouped by date_2 in descending order, filter out the entries whose date_2 is not strictly before date_1, and then take the type from the first entry of the remaining array. Finally, replace all null values with Unknown.

result = (
    df1.join(df2grouped, on='id_special', how='left')
    # sort each array so the newest date_2 comes first
    .withColumn('data', F.expr('array_sort(data, (l, r) -> datediff(r.date_2, l.date_2))'))
    # keep only the entries strictly before date_1
    .withColumn('data', F.filter(F.col('data'), lambda x: x.date_2 < F.col('date_1')))
    # the first remaining entry is the closest preceding record
    .withColumn('df2_type', F.expr('data[0].type'))
    .drop('data')
    .na.fill('Unknown')  # rows without any match get "Unknown"
)
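
If you prefer to avoid the array functions, the same rules can usually be expressed as a conditional left join followed by a window function. This is only a sketch, and it assumes that id is a unique key in df1 (as the comment in the question's schema suggests); the variable names joined, w and result_window are mine:

from pyspark.sql import Window
from pyspark.sql import functions as F

# Keep every df1 row; attach only the df2 rows whose date_2 is strictly before date_1.
joined = df1.alias('a').join(
    df2.alias('b'),
    on=(F.col('a.id_special') == F.col('b.id_special')) & (F.col('b.date_2') < F.col('a.date_1')),
    how='left',
)

# For each df1 row, rank the attached df2 rows so the closest preceding date_2 comes first.
w = Window.partitionBy(F.col('a.id')).orderBy(F.col('b.date_2').desc_nulls_last())

result_window = (
    joined
    .withColumn('rn', F.row_number().over(w))
    .where(F.col('rn') == 1)
    .select(
        F.col('a.id').alias('id'),
        F.col('a.id_special').alias('id_special'),
        F.col('a.date_1').alias('date_1'),
        # Rows with no qualifying match (null id_special, an id_special that is
        # not in df2, or no date_2 before date_1) fall back to "Unknown".
        F.coalesce(F.col('b.type'), F.lit('Unknown')).alias('df2_type'),
    )
)

result_window.show() should then give the same table as the expected output in the question.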
