Dataframe Schema & Datatypes
df.schema // [StructField('_id', StringType(), True), StructField('created_at', StringType(), True), .....]
df.printSchema() // pretty-prints the same schema as a tree
// Iterate over the fields in the DataFrame's schema
for field in df.schema.fields:
field_name = field.name //company
field_type = field.dataType
// StringType()
// StructType([StructField('name', StringType(), True), StructField('revenue', IntegerType(), True)])
field_type_name = field.dataType.typeName()
// string
// struct
for column in dataframe.columns:
// ['_id', 'created_at', 'company', .....]
// checking field type
isinstance(field_type, T.StringType)
Collecting Data into Python Objects
delete_ids = [row._id for row in new_delete_df.select("_id").collect()]
Data View
new_insert_df.show(5, False)
StructType Column
key_cols = [F.col(f"fullDocument.{field}").alias(field) for field in nested_fields]
df = df.select(*key_cols)
for field in df.schema.fields:
field_type = field.dataType
field_type_names = field.dataType.fieldNames() // only available on StructType fields
// ['name', 'revenue']
for nested_field in field_type.fields:
field_name = nested_field.name
field_type = nested_field.dataType
field_type_names = field.dataType.fieldNames()
coalesce_columns = [df[column_name].getField(field_type_name).cast(T.StringType())
for field_type_name in field_type_names]
If / Else
df = df.withColumn(
field_name,
F.when(F.col(field_name) == "", None)
.otherwise(F.col(field_name))
)
How to Create a UDF (user defined function)
def is_json_object(content: str) -> bool:
    """Return True if *content* parses as a JSON object or array.

    ``None`` input returns ``False``. Parse failures — invalid JSON, or a
    rejection raised by the project's ``check_duplicates`` hook (presumably
    it raises on duplicate keys — TODO confirm against its definition) —
    also return ``False``.
    """
    if content is None:
        return False
    try:
        parsed = json.loads(content, object_pairs_hook=check_duplicates)
    except Exception:
        # Narrowed from a bare ``except``: that also swallowed
        # SystemExit/KeyboardInterrupt. We keep Exception (not just
        # ValueError) because check_duplicates may raise a custom type.
        return False
    return isinstance(parsed, (dict, list))
// return type
is_json_object_udf = F.udf(is_json_object, T.BooleanType())
How to Use a UDF
dataframe.filter(is_json_object_udf(F.col(column)))
Column Type Casting
df = df.withColumn(field_name, F.col(field_name).cast(redshift_field_type))
// When the column is a StructType
// Coalesce function to merge multiple type columns into string type
coalesce_columns = [df[column_name].getField(field_type_name).cast(redshift_field_type)
for field_type_name in field_type_names]
coalesce_func = F.coalesce(*coalesce_columns)
df = df.withColumn(column_name, coalesce_func)
Common SQL-style Operations
// Is Null
null_count = df.filter(df[column_name].isNull()).count()
// F.lit
df = df.withColumn('__hevo__marked_deleted', F.when(F.lit(True), F.lit(False)))
// Window Function
windowSpec = Window.partitionBy("documentKey._id").orderBy(F.desc("clusterTime"))
// Row Number
df_with_rn = df.withColumn("rn", F.row_number().over(windowSpec))
// Renaming a column
df = latest_rn_df.withColumnRenamed("operationType", "_operation_type")
// Alias
key_cols = [F.col("documentKey._id").alias("_id"), F.col("_operation_type")]
df = df.select(*key_cols)
// Union
union_df = new_insert_df.union(new_update_df)
// Dropping a column
union_df = union_df.drop("_operation_type")