PySpark performance problem, Spark 1.5.2 (Cloudera distribution)



I am running into performance problems when executing the following PySpark script:

import os
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext, HiveContext
from pyspark.sql.types import Row

def append_columns(row, dct):
    """
    :param row:
    :param dct:
    :type row:Row
    :type dct:dict
    :return:
    """
    schema = row.asDict()
    schema.update(dct)
    return Row(**schema)

def append_column(row, key, value):
    """
    :param row:
    :param key:
    :param value:
    :type row:Row
    :type key: str
    :return:
    """
    schema = row.asDict()
    if key not in schema:
        schema[key] = value
    res = Row(**schema)
    return res

class Components(object):
    def __init__(self):
        pass
    def first_components(self, row, key):
        """
        :param row:
        :param key:
        :type row: Row
        :type key: lambda, for example lambda x: x.description
        :return:
        """
        pass
    def join(self, row, dict_other, key):
        """
        some logic with join
        :param row:
        :param dict_other:
        :param key:
        :return:
        :rtype: list
        implements a "one to many" join; each row is multiplied roughly 150 -> 1500
        """
        pass
    def some_action(self, x, key):
        pass

def append_category(row, key, is_exists_category, type_category):
    comp = Components()
    if int(is_exists_category) == 1:
        type_category = int(type_category)
        if type_category == 1:
            return append_column(row, "component", comp.first_components(row, key))
        elif type_category == 2:
            """
            copy paste
            """
            return append_column(row, "component", comp.first_components(row, key))
    else:
        return row

comp = Components()
conf = SparkConf()
sc = SparkContext(conf=conf)
sql = SQLContext(sparkContext=sc)
query = HiveContext(sparkContext=sc)
first = sql.parquetFile("some/path/to/parquetfile").rdd.collectAsMap()
first = sc.broadcast(first)
key = lambda x: x.description
"""sec has from select 2k rows"""
sec = query.sql("select bla, bla1, description from some_one").rdd 
    .filter(lambda x: x.bla1 > 10) 
    .map(lambda x: append_category(x, key, 1, 1)) 
    .map(lambda x: append_column(x, "hole_size", comp.some_action(x, key))) 
    .flatMap(lambda x: comp.join(x, first.value, key)) 
    .filter(lambda x: x)
table = 'db.some_one'
query.sql("DROP TABLE IF EXISTS {tbl}".format(tbl=table + "_test"))
query.createDataFrame(sec, samplingRatio=10).saveAsTable("{tbl}".format(tbl=table + "_dcka"), mode='overwrite',
                                                         path=os.path.join("some/path/to/",
                                                                           table.split('.')[1] + "_test"))

Spark configuration:

  • 6 executors
  • 2 GB of memory per executor

The script runs for almost 5 hours, and the Spark history server shows load on only one executor. Repartitioning has had no effect.

You could try simplifying the logic first:

rdd1 = query.sql("select bla, bla1, description from some_one").rdd
rdd2 = sql.parquetFile("some/path/to/parquetfile").rdd
rdd1.join(rdd2)
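
Note that RDD.join works on (key, value) pairs, so both RDDs have to be keyed before joining. A minimal sketch, assuming description is the join field on both sides (the parquet schema is not shown, so that key is an assumption):

# key both RDDs by the join column before calling join (assumed key)
keyed1 = rdd1.map(lambda row: (row.description, row))
keyed2 = rdd2.map(lambda row: (row.description, row))

# yields (description, (row_from_some_one, row_from_parquet)) for every matching pair
joined = keyed1.join(keyed2)
result = joined.map(lambda kv: append_columns(kv[1][0], kv[1][1].asDict()))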

Then add the filtering back, and switch to a broadcast join if the performance is still poor.
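
One way to sketch a broadcast (map-side) join in this script's terms, assuming the parquet side is small enough to collect to the driver and that description is the join key on both sides (both assumptions), while keeping the one-to-many behaviour by grouping matches into lists:

# collect the small side as {description: [rows]} and ship it to every executor
small = (sql.parquetFile("some/path/to/parquetfile").rdd
         .map(lambda row: (row.description, row))
         .groupByKey()
         .mapValues(list)
         .collectAsMap())
small_bc = sc.broadcast(small)

# filter early, then expand each row against its matches without any shuffle
joined = (query.sql("select bla, bla1, description from some_one").rdd
          .filter(lambda row: row.bla1 > 10)
          .flatMap(lambda row: [append_columns(row, m.asDict())
                                for m in small_bc.value.get(row.description, [])]))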

You can monitor the number of partitions via rdd.partitions.size (rdd.getNumPartitions() in PySpark); the partition count should roughly match the total number of cores across the cluster so that all executors take part in the processing.
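
For example, in PySpark (a sketch; the target partition count below is only a placeholder to be matched to your cluster's total core count):

rdd = query.sql("select bla, bla1, description from some_one").rdd
print(rdd.getNumPartitions())   # how many partitions the source query produced

# repartition so that work is spread across all executors; 12 is a placeholder,
# roughly 6 executors x 2 cores under the configuration above (an assumption)
rdd = rdd.repartition(12)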
