Converting the type of a DataFrame column
from pyspark.sql.types import FloatType

def convertColumn(df, name, newType):
    return df.withColumn(name, df[name].cast(newType))  # cast to the new type

df = convertColumn(df, "column", FloatType())
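When the data is read from CSV without a schema, every column typically arrives as a string, so the helper is usually applied in a loop. A minimal sketch; the column list here is an assumption and should be adjusted to the actual schema:

# Assumed column names; adjust to the real schema
columns = ["totalBedRooms", "population", "households",
           "medianIncome", "totalRooms", "medianHouseValue"]
for name in columns:
    df = convertColumn(df, name, FloatType())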
Preprocessing
from pyspark.sql.functions import col

# Scale medianHouseValue down to units of 100,000
df = df.withColumn("medianHouseValue", col("medianHouseValue") / 100000)
# Derive new ratio columns
df = (df.withColumn("roomsPerHousehold", col("totalRooms") / col("households"))
        .withColumn("populationPerHousehold", col("population") / col("households"))
        .withColumn("bedroomsPerRoom", col("totalBedRooms") / col("totalRooms")))
Selecting a subset of the columns
df = df.select("medianHouseValue",
               "totalBedRooms",
               "population",
               "households",
               "medianIncome",
               "roomsPerHousehold",
               "populationPerHousehold",
               "bedroomsPerRoom")
Splitting the data into label and features
from pyspark.ml.linalg import DenseVector

# First column (medianHouseValue) becomes the label, the rest the feature vector
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df = spark.createDataFrame(input_data, ["label", "features"])
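The RDD round-trip above works, but the same (label, features) layout can also be produced without leaving the DataFrame API, using VectorAssembler. A sketch, assuming the column order from the select above:

from pyspark.ml.feature import VectorAssembler

feature_cols = ["totalBedRooms", "population", "households", "medianIncome",
                "roomsPerHousehold", "populationPerHousehold", "bedroomsPerRoom"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_assembled = (assembler.transform(df)
                .withColumnRenamed("medianHouseValue", "label")
                .select("label", "features"))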
Standardizing the data
from pyspark.ml.feature import StandardScaler

standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaler = standardScaler.fit(df)     # computes per-feature statistics
scaled_df = scaler.transform(df)    # appends the scaled feature column
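By default StandardScaler only scales to unit standard deviation (withStd=True, withMean=False). Since the features here were built as dense vectors, centering can be switched on as well:

standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled",
                                withMean=True, withStd=True)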
Building a simple linear regression model, predicting, and evaluating
train_data, test_data = scaled_df.randomSplit([0.8, 0.2], seed=123)
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="features_scaled", labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)
linearModel = lr.fit(train_data)
predicted = linearModel.transform(test_data)
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])
predictionAndLabel = predictions.zip(labels).collect()  # (prediction, label) pairs
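The zipped pairs can be inspected by hand, but pyspark.ml also ships a RegressionEvaluator, and the fitted model carries a training summary. A sketch of the evaluation step the heading promises:

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction",
                                metricName="rmse")
print("Test RMSE:", evaluator.evaluate(predicted))

# Training-side diagnostics from the fitted model
print("Train RMSE:", linearModel.summary.rootMeanSquaredError)
print("Train R^2:", linearModel.summary.r2)
print("Coefficients:", linearModel.coefficients)
print("Intercept:", linearModel.intercept)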