如何使用DataFrame和JDBC连接来提高慢速Spark作业的性能



我试图通过JDBC在单个节点(本地[*])上以独立模式访问一个中等大小的Teradata表(约1亿行)。

我正在使用Spark 1.4.1。并且安装在一个非常强大的机器上(2个cpu, 24核,126G RAM)。

我已经尝试了几个内存设置和调优选项来使它工作得更快,但它们都没有产生很大的影响。

我确信我错过了一些东西,下面是我最后的尝试,花了大约11分钟来获得这个简单的计数,而使用JDBC连接通过R只花了40秒来获得计数。

bin/pyspark --driver-memory 40g --executor-memory 40g
df = sqlContext.read.jdbc("jdbc:teradata://......)
df.count()

当我尝试使用大表(5B条记录)时,查询完成后没有返回结果。

所有的聚合操作都是在将整个数据集检索到内存中的DataFrame集合后执行的。因此,在Spark中进行计数永远不会像直接在TeraData中那样高效。有时,通过创建视图,然后使用JDBC API映射这些视图,将一些计算推入数据库是值得的。

每次使用JDBC驱动程序访问大型表时,都应该指定分区策略,否则您将创建具有单个分区的DataFrame/RDD,并且您将重载单个JDBC连接。

相反,你想尝试以下AI(自Spark 1.4.0+):

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>", 
  lowerBound = minValue,
  upperBound = maxValue,
  numPartitions = 20,
  connectionProperties = new java.util.Properties()
)

还有一个选项可以下推一些过滤

如果你没有一个均匀分布的积分列,你想通过指定自定义谓词(where语句)来创建一些自定义分区。例如,假设您有一个时间戳列,并希望按日期范围进行分区:

    val predicates = 
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  )
  .map {
    case (start, end) => 
      s"cast(DAT_TME as date) >= date '$start'  AND cast(DAT_TME as date) <= date '$end'"
  }
 predicates.foreach(println) 
// Below is the result of how predicates were formed 
//cast(DAT_TME as date) >= date '2015-06-20'  AND cast(DAT_TME as date) <= date '2015-06-30'
//cast(DAT_TME as date) >= date '2015-07-01'  AND cast(DAT_TME as date) <= date '2015-07-10'
//cast(DAT_TME as date) >= date '2015-07-11'  AND cast(DAT_TME as date) <= date //'2015-07-20'
//cast(DAT_TME as date) >= date '2015-07-21'  AND cast(DAT_TME as date) <= date '2015-07-31'

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

它将生成一个DataFrame,其中每个分区将包含与不同谓词关联的每个子查询的记录。

查看源代码:dataframerreader .scala

未序列化的表是否适合40gb ?如果它开始交换磁盘,性能将急剧下降。

无论如何,当您使用带有ansi SQL语法的标准JDBC时,您利用了DB引擎,因此,如果teradata(我不知道teradata)保存有关表的统计信息,那么经典的"select count(*) from table"将非常快。相反,spark用"select * from table"之类的东西在内存中加载1亿行,然后对RDD行执行计数。这是一个完全不同的工作量。

与其他解决方案不同的是,将oracle表中的数据保存在保存在hadoop上的avro文件中(在许多文件中分区)。这样,用spark读取avro文件将是一件很容易的事情,因为您将不再调用db。

相关内容

  • 没有找到相关文章

最新更新