Dataframe Schema & Datatypes

df.schema         // StructType([StructField('_id', StringType(), True), StructField('created_at', StringType(), True), ...])
df.printSchema()  // prints the same schema to stdout as an indented tree; returns None

// Iterate over the fields in the DataFrame's schema
for field in df.schema.fields:    
		field_name = field.name    //company
		field_type = field.dataType      
				// StringType()
				// StructType([StructField('name', StringType(), True), StructField('revenue', IntegerType(), True)])
		field_type_name = field.dataType.typeName()  
				// string
				// struct

for column in dataframe.columns:
// dataframe.columns -> ['_id', 'created_at', 'company', .....]

// checking field type
isinstance(field_type, T.StringType)

Data 자료화

delete_ids = [row._id for row in new_delete_df.select("_id").collect()]

Data View

new_insert_df.show(5, False)

StructType Column

key_cols = [F.col(f"{'fullDocument'}.{field}").alias(field) for field in nested_fields]
df = df.select(*key_cols)

for field in df.schema.fields:
		field_type = field.dataType
		field_type_names = field.dataType.fieldNames()  //struct type에서만 사용가능
			// ['name', 'revenue']

		for nested_field in field_type.fields:
				field_name = nested_field.name
				field_type = nested_field.dataType

field_type_names = field.dataType.fieldNames()
coalesce_columns = [df[column_name].getField(field_type_name).cast(T.StringType()) 
										for field_type_name in field_type_names]

If / Else

df = df.withColumn(
    field_name,
    F.when(F.col(field_name) == "", None)
    .otherwise(F.col(field_name))
)

UDF (user defined function) 만드는 법

def is_json_object(content: str) -> bool:
    """Return True when *content* parses as JSON with a dict or list at the top level.

    Despite the name, a top-level JSON array also counts as True; scalars
    (numbers, strings, booleans, null) do not.

    Args:
        content: Raw JSON text. ``None`` is accepted and yields ``False``.

    Returns:
        ``True`` if parsing succeeds and the parsed value is a ``dict`` or
        ``list``; ``False`` for ``None``, for unparsable input, or when the
        pairs hook raises.
    """
    if content is None:
        return False
    try:
        # check_duplicates is defined elsewhere; presumably it raises on
        # repeated keys and returns a dict otherwise -- TODO confirm.
        parsed = json.loads(content, object_pairs_hook=check_duplicates)
    except Exception:
        # Best-effort validation: any parse or hook failure means "not valid".
        # Unlike a bare ``except:``, this lets KeyboardInterrupt and SystemExit
        # propagate.
        return False
    return isinstance(parsed, (dict, list))
// F.udf(function, returnType): the second argument declares the UDF's return type
is_json_object_udf = F.udf(is_json_object, T.BooleanType())

UDF 사용법

dataframe.filter(is_json_object_udf(F.col(column)))

Column 형변환

df = df.withColumn(field_name, F.col(field_name).cast(redshift_field_type))

// Struct Type일 때
// Cast each sub-field of the struct to redshift_field_type, then coalesce them into one column (first non-null value wins)
coalesce_columns = [df[column_name].getField(field_type_name).cast(redshift_field_type)
										for field_type_name in field_type_names]
coalesce_func = F.coalesce(*coalesce_columns)
df = df.withColumn(column_name, coalesce_func)

SQL 문법들

// Is Null
null_count = df.filter(df[column_name].isNull()).count()

// F.lit
df = df.withColumn('__hevo__marked_deleted', F.when(F.lit(True), F.lit(False)))

// Window Function
windowSpec = Window.partitionBy("documentKey._id").orderBy(F.desc("clusterTime"))

// Row Number
df_with_rn = df.withColumn("rn", F.row_number().over(windowSpec))

// Column 이름변경
df = latest_rn_df.withColumnRenamed("operationType", "_operation_type")

// Alias
key_cols = [F.col("documentKey._id").alias("_id"), F.col("_operation_type")]
df = df.select(*key_cols)

// Union
union_df = new_insert_df.union(new_update_df)

// column 삭제
union_df = union_df.drop("_operation_type")