Converting DataFrame column types

from pyspark.sql.types import FloatType

def convertColumn(df, name, newType):
    # Replace the column with a copy cast to the new type
    return df.withColumn(name, df[name].cast(newType))

df = convertColumn(df, "column", FloatType())
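
Since columns read from a CSV typically all come in as strings, the helper can be applied to every numeric column in a loop. A minimal sketch; the column list is assumed from the fields used later in this section:

columns = ["medianHouseValue", "totalBedRooms", "population",
           "households", "medianIncome", "totalRooms"]  # assumed column list
for name in columns:
    df = convertColumn(df, name, FloatType())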

Preprocessing

from pyspark.sql.functions import col

# Divide medianHouseValue by 100,000 so the label is expressed in units of $100,000
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)

# Derive new ratio columns
df = (df.withColumn("roomsPerHousehold", col("totalRooms")/col("households"))
        .withColumn("populationPerHousehold", col("population")/col("households"))
        .withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms")))

Selecting a subset of the columns

df = df.select("medianHouseValue",
               "totalBedRooms",
               "population",
               "households",
               "medianIncome",
               "roomsPerHousehold",
               "populationPerHousehold",
               "bedroomsPerRoom")

Splitting the data into label and features

from pyspark.ml.linalg import DenseVector

# The first column (medianHouseValue) becomes the label;
# the remaining columns are packed into a dense feature vector
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df = spark.createDataFrame(input_data, ["label", "features"])
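
The result is a two-column DataFrame, which printSchema() can confirm:

df.printSchema()  # label: double, features: vector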

Standardizing the data

from pyspark.ml.feature import StandardScaler

standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaler = standardScaler.fit(df)       # compute the per-feature statistics
scaled_df = scaler.transform(df)      # apply the scaling to every row
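
By default StandardScaler uses withStd=True and withMean=False, i.e. each feature is scaled to unit standard deviation but not centered. Comparing the raw and scaled vectors of one row shows the effect:

scaled_df.select("features", "features_scaled").show(1, truncate=False)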

Building a simple linear regression model, making predictions, and evaluating

train_data, test_data = scaled_df.randomSplit([.8, .2], seed=123)  # 80/20 split, seeded for reproducibility

from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol="features_scaled", labelCol="label",
                      maxIter=10, regParam=0.3, elasticNetParam=0.8)
linearModel = lr.fit(train_data)
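
The fitted model exposes its learned parameters and a training summary directly:

print(linearModel.coefficients)                   # one weight per scaled feature
print(linearModel.intercept)
print(linearModel.summary.rootMeanSquaredError)   # RMSE on the training data
print(linearModel.summary.r2)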

predicted = linearModel.transform(test_data)
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])
predictionAndLabel = predictions.zip(labels).collect()  # (prediction, label) pairs; the first element is the predicted value
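
To actually evaluate, as the heading promises, one option is RegressionEvaluator from pyspark.ml.evaluation, scoring the test-set predictions on RMSE and R^2; a minimal sketch:

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predicted)                    # root mean squared error on the test set
r2 = evaluator.setMetricName("r2").evaluate(predicted)  # coefficient of determination
print(rmse, r2)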
