当pyspark 中往rdd 传递的函数中,如果包含对对象的字段的引用, 可能会把整个对象都传递给spark application 节点.
同样的, 如果含有python无法序列化的对象 ( 如 protobuf 对象 ), 同样也会导致程序失败.
class SearchFunctions(object):
def __init__(self, query):
self.query = query
def isMatch(self, s):
return self.query in s
def getMatchesFunctionReference(self, rdd):
# 问题:在"self.isMatch"中引用了整个self
return rdd.filter(self.isMatch)
def getMatchesMemberReference(self, rdd):
# 问题:在"self.query"中引用了整个self
return rdd.filter(lambda x: self.query in x)
def getMatchesNoReference(self, rdd):
# 安全:只把需要的字段提取到局部变量中
query = self.query
return rdd.filter(lambda x: query in x)
同样在scala 中也要注意这个问题, 不注意的引用可能会导致传递整个对象.
max(initialExecuor = 3, –num-executors = 10)
其中 initialExecuor
是 参数 spark.dynamicAllocation.initialExecutors
,表示要运行 executor 的初始数量
groupByKey 和 reduceByKey 的最大区别在于, groupByKey 直接对数据进行 shuffle ,reduceByKey 先在每个分区上进行合并之后再进行 shuffle, 所以如果在有大量 shuffle 的情况下, 使用 reduceByKey 更好
https://spark-packages.org/ 提供一系列spark 执行操作的相关包,以 spark-csv 举例,
下载地址为 https://spark-packages.org/package/databricks/spark-csv
下载之后将包解压缩到 spark 的安装目录下,然后执行以下命令加载相应的包
./bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0