我有一个Hive表,用于保存客户呼叫的数据。为简单起见,考虑它有 2 列,第一列保存客户 ID,第二列保存调用的时间戳(unix 时间戳)。
我可以查询此表以查找每个客户的所有呼叫:
SELECT * FROM mytable SORT BY customer_id, call_time;
结果是:
Customer1 timestamp11
Customer1 timestamp12
Customer1 timestamp13
Customer2 timestamp21
Customer3 timestamp31
Customer3 timestamp32
...
是否可以创建一个 Hive 查询,从第二次调用开始为每个客户返回两个成功调用之间的时间间隔?对于上面的示例,查询应返回:
Customer1 timestamp12-timestamp11
Customer1 timestamp13-timestamp12
Customer3 timestamp32-timestamp31
...
我试图从 sql 解决方案中调整解决方案,但我坚持使用 Hive 的限制:它只接受 FROM 中的子查询,并且连接必须只包含相等。
谢谢。
编辑1:
我尝试使用Hive UDF函数:
public class DeltaComputerUDF extends UDF {
private String previousCustomerId;
private long previousCallTime;
public String evaluate(String customerId, LongWritable callTime) {
long callTimeValue = callTime.get();
String timeDifference = null;
if (customerId.equals(previousCustomerId)) {
timeDifference = new Long(callTimeValue - previousCallTime).toString();
}
previousCustomerId = customerId;
previousCallTime = callTimeValue;
return timeDifference;
}}
并将其与名称"delta"一起使用。
但似乎(从日志和结果来看)它正在 MAP 时使用。 由此产生了 2 个问题:
首先:在使用此功能之前,必须按客户 ID 和时间戳对表数据进行排序。查询:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time;
不起作用,因为排序部分是在 REDUCE 时间执行的,在我的函数被使用很久之后。
我可以在使用函数之前对表数据进行排序,但我对此不满意,因为这是我希望避免的开销。
第二:在分布式Hadoop配置的情况下,数据在可用的作业跟踪器之间分配。所以我相信这个函数会有多个实例,每个映射器一个,所以可以在 2 个映射器之间拆分相同的客户数据。在这种情况下,我将丢失客户电话,这是不可接受的。
我不知道如何解决这个问题。我知道 DISTRIBUTED BY 确保所有具有特定值的数据都被发送到同一个化简器(从而确保 SORT 按预期工作),有谁知道映射器是否有类似的东西?
接下来,我计划遵循 libjack 的建议,使用 reduce 脚本。在其他一些 Hive 查询之间需要这种"计算",所以我想尝试 Hive 提供的所有内容,然后再移动到另一个工具,正如 Balaswamy vaddeman 所建议的那样。
编辑2:
我开始研究自定义脚本解决方案。但是,在编程Hive书第14章的第一页(本章介绍了自定义脚本),我找到了以下段落:
流式处理通常不如编码类似的 UDF 或 输入格式对象。序列化和反序列化数据以将其传入和 出管效率相对低下。调试整体也更难 统一编程。但是,它对于快速原型设计很有用 以及利用不是用 Java 编写的现有代码。对于蜂巢 不想写Java代码的用户,可以非常有效 方法。
因此,很明显,就效率而言,自定义脚本不是最佳解决方案。
但是我应该如何保留我的UDF功能,但确保它在分布式Hadoop配置中按预期工作?我在语言手册UDF wiki页面的UDF内部部分找到了这个问题的答案。如果我编写查询:
SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
它在 REDUCE 时间执行,DISTRIBUTIONBY 和 SORT BY 构造保证来自同一客户的所有记录都由同一个化简器按调用顺序处理。
所以上面的UDF和这个查询结构解决了我的问题。
(很抱歉没有添加链接,但我不允许这样做,因为我没有足够的声誉点)
这是一个古老的问题,但为了将来的参考,我在这里写另一个命题:
Hive 窗口函数允许在查询中使用上一个/下一个值。
类似的代码查询可能是:
SELECT customer_id, call_time - LAG(call_time, 1, 0) OVER (PARTITION BY customer_id ORDER BY call_time) FROM mytable;
显式MAP-REDUCE
与其他编程语言(如Java或Python)一起使用。从映射{cutomer_id,call_time}
和化简器发出的地方,你会得到{customer_id,list{time_stamp}}
,在化简器中,你可以对这些时间戳进行排序并可以处理数据。
也许有人遇到类似的要求,我找到的解决方案如下:
1) 创建自定义函数:
package com.example;
// imports (they depend on the hive version)
@Description(name = "delta", value = "_FUNC_(customer id column, call time column) "
+ "- computes the time passed between two succesive records from the same customer. "
+ "It generates 3 columns: first contains the customer id, second contains call time "
+ "and third contains the time passed from the previous call. This function returns only "
+ "the records that have a previous call from the same customer (requirements are not applicable "
+ "to the first call)", extended = "Example:n> SELECT _FUNC_(customer_id, call_time) AS"
+ "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable "
+ "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;")
public class DeltaComputerUDTF extends GenericUDTF {
private static final int NUM_COLS = 3;
private Text[] retCols; // array of returned column values
private ObjectInspector[] inputOIs; // input ObjectInspectors
private String prevCustomerId;
private Long prevCallTime;
@Override
public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException {
if (ois.length != 2) {
throw new UDFArgumentException(
"There must be 2 arguments: customer Id column name and call time column name");
}
inputOIs = ois;
// construct the output column data holders
retCols = new Text[NUM_COLS];
for (int i = 0; i < NUM_COLS; ++i) {
retCols[i] = new Text();
}
// construct output object inspector
List<String> fieldNames = new ArrayList<String>(NUM_COLS);
List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS);
for (int i = 0; i < NUM_COLS; ++i) {
// column name can be anything since it will be named by UDTF as clause
fieldNames.add("c" + i);
// all returned type will be Text
fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
}
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}
@Override
public void process(Object[] args) throws HiveException {
String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]);
Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]);
if (customerId.equals(prevCustomerId)) {
retCols[0].set(customerId);
retCols[1].set(callTime.toString());
retCols[2].set(new Long(callTime - prevCallTime).toString());
forward(retCols);
}
// Store the current customer data, for the next line
prevCustomerId = customerId;
prevCallTime = callTime;
}
@Override
public void close() throws HiveException {
// TODO Auto-generated method stub
}
}
2) 创建一个包含此函数的 jar。假设jarname是myjar.jar。
3)将罐子复制到带有Hive的机器上。假设它被放置在/tmp 中
4) 在 Hive 中定义自定义函数:
ADD JAR /tmp/myjar.jar;
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF';
5) 执行查询:
SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM
(SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;
言论:
一个。我假设call_time列将数据存储为 bigint。如果它是字符串,在进程函数中,我们将其检索为字符串(就像我们对 customerId 所做的那样),然后将其解析为 Long
二.我决定使用 UDTF 而不是 UDF,因为这样它会生成所需的所有数据。否则(使用 UDF)需要过滤生成的数据以跳过 NULL 值。因此,使用原始帖子的第一次编辑中描述的UDF函数(DeltaComputerUDF),查询将是:
SELECT customer_id, call_time, time_difference
FROM
(
SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference)
FROM
(
SELECT customer_id, call_time FROM mytable
DISTRIBUTE BY customer_id
SORT BY customer_id, call_time
) t
) u
WHERE time_difference IS NOT NULL;
三.无论表中行的顺序如何,这两个函数(UDF 和 UDTF)都按预期工作(因此在使用增量函数之前,不需要按客户 ID 和调用时间对表数据进行排序)