pyspark 中传递rdd 函数

当pyspark 中往rdd 传递的函数中,如果包含对对象的字段的引用, 可能会把整个对象都传递给spark application 节点.

同样的, 如果含有python无法序列化的对象 ( 如 protobuf 对象 ), 同样也会导致程序失败.

class SearchFunctions(object):
  def __init__(self, query):
      self.query = query
  def isMatch(self, s):
      return self.query in s
  def getMatchesFunctionReference(self, rdd):
      # 问题:在"self.isMatch"中引用了整个self
      return rdd.filter(self.isMatch)
  def getMatchesMemberReference(self, rdd):
      # 问题:在"self.query"中引用了整个self
      return rdd.filter(lambda x: self.query in x)
	def getMatchesNoReference(self, rdd):
      # 安全:只把需要的字段提取到局部变量中
      query = self.query
      return rdd.filter(lambda x: query in x)

同样在scala 中也要注意这个问题, 不注意的引用可能会导致传递整个对象.

开启动态资源调整之后 executor 的初始数量将会由下面公式决定

max(initialExecuor = 3, –num-executors = 10)

其中 initialExecuor 是 参数 spark.dynamicAllocation.initialExecutors ,表示要运行 executor 的初始数量

GroupByKey 和 reduceByKey

groupByKey 和 reduceByKey 的最大区别在于, groupByKey 直接对数据进行 shuffle ,reduceByKey 先在每个分区上进行合并之后再进行 shuffle, 所以如果在有大量 shuffle 的情况下, 使用 reduceByKey 更好

Spark 中的包

https://spark-packages.org/ 提供一系列spark 执行操作的相关包,以 spark-csv 举例,

下载地址为 https://spark-packages.org/package/databricks/spark-csv

下载之后将包解压缩到 spark 的安装目录下,然后执行以下命令加载相应的包

./bin/pyspark --packages com.databricks:spark-csv_2.10:1.3.0