有没有办法在 Kiba 作业结束时返回一些数据?



如果有一种方法可以从 Kiba ETL 运行中获取某种返回对象,以便我可以使用其中的数据来返回有关管道运行情况的报告,那就太好了。

我们有一个每 10 分钟运行一次的作业,平均处理 20 - 50k 条记录,并将它们压缩为摘要记录,其中一些是创建的,其中一些是更新的。问题是,如果不拖网大量日志文件,就很难知道发生了什么,显然,日志对最终用户也很有用。

有没有办法在管道运行时用任意数据填充某种结果对象? 例如

  • 在源中找到 25.7k 行
  • 此转换器丢弃了 782 条记录
  • 插入 100 条记录
  • 已更新 150 条记录
  • 20 条记录有错误(在这里(
  • 该记录具有最高的统计数据
  • 1200 条记录属于此 VIP 客户
  • 等。

最后,使用该数据发送电子邮件摘要、填充网页、呈现一些控制台输出等。

目前,我现在可以看到它工作的唯一方法是在设置期间发送一个对象,并在它流经源、转换器和目的地时对其进行变异。运行完成后,然后检查变量并对现在所在的数据执行一些操作。

这是应该做的,还是有更好的方法?

编辑

只是想补充一点,我不想在post_process块中处理这个问题,因为管道是通过许多不同的媒介使用的,我希望每个用例都处理自己的反馈机制。对于 ETL 管道来说,它更干净 (imo(,不必担心它的使用位置,以及该使用场景的反馈期望是什么......

答案高度依赖于上下文,但这里有一些准则。

如果结果对象不是太大,实际上我建议您传递一个空的结果对象(通常是Hash(,然后在运行期间填充它(您也可以使用某种形式的中间件来跟踪异常本身(。

如何填充它将取决于上下文和您的实际需求,但这可以通过与作业无关的方式完成(也许使用 DSL 扩展 https://github.com/thbar/kiba/wiki/How-to-extend-the-Kiba-DSL,您可以实现一些相当高级的扩展,这些扩展将注册所需的转换或块以实现您需要的(。

该对象可以按原样使用,也可以序列化为 JSON 或类似内容,如果您稍后需要提供一些丰富的输出(或者您可以使用它来准备其他内容(,甚至可以存储到数据库中。

如果需要,您甚至可以在特定数据库中拥有一些结构合理的内容,用于此目的(例如,如果您需要一种简单的方法来向客户公开它(。

请注意,您可以通过编程方式定义post_process,而无需作业意识到这一点(无需耦合(。这是一个非常简单的例子:

module ETL
module DSLExtensions
module EmailReport
def setup_email_report
pre_process do
@email_report_stats = Hash.new(0)
end
post_process do
# Do the actual email sending
end
end
def track_event!(event:)
@email_report_stats[event] += 1
end
end
end
end
Kiba.parse do
extend ETL::DSLExtensions::EmailReport
# this will register the pre/post process
setup_email_report
source ...
track_event!(event: 'row_read')
transform
transform
transform
track_event!(event: 'row_written')
destination ...
end

如果执行此操作,请确保使用非常好的命名空间变量,以避免任何冲突。

请注意,如前所述,这不包括失败的情况,但你明白了!

最新更新