Riak Java无法在Scala中反序列化MapReduce查询中的域对象



在一个简单的bucket上执行MapReduce查询。出于某种原因,我从杰克逊那里得到了一个例外:

Caused by: org.codehaus.jackson.map.JsonMappingException: Can not deserialize instance of com.threetierlogic.AccountService.models.User out of START_ARRAY token
 at [Source: java.io.StringReader@39494ff0; line: 1, column: 2]
        at org.codehaus.jackson.map.JsonMappingException.from(JsonMappingException.java:163)
        at org.codehaus.jackson.map.deser.StdDeserializationContext.mappingException(StdDeserializationContext.java:219)
        at org.codehaus.jackson.map.deser.StdDeserializationContext.mappingException(StdDeserializationContext.java:212)
        ... 179 more

以下是我正在执行的MapReduce查询:

DB.client.mapReduce(bucketName)
    .addReducePhase(NamedErlangFunction.REDUCE_IDENTITY)
    .execute().getResult(classOf[User])

现在,如果我使用getResultRaw()调用JSON,它将返回一个JSON字符串,该字符串不包含值,只包含键:

[["accounts-user","8f0bb6e41592690d701225e263807a5e"],["accounts-user","2687cf9444013c45ba2637e9f6d3d3ad"],["accounts-user","3507e2e1f3d2818fdd276214d594c8e"],["accounts-user","fd186b0293ab7eb737f8b66e353fe4a6"],["accounts-user","bf6ce6bca0f642abfe74f2e2281e676c"],["accounts-user","b58d356551a8df6d3bbaf65e577f4b12"],["accounts-user","8126d599d259fd43f701c90787096049"],["accounts-user","33b9ae3befb23b7030b609158bb762d"],["accounts-user","770a897d5ce8c8e118ae121fc01f4c80"],["accounts-user","edae605390c35256b5df055f5574734d"],["accounts-user","ef19ad34a2be4ab8de111d1590a8768b"],["accounts-user","89a9f29ac937595038d37169f9ba7c8"],["accounts-user","85be26f43f7bb74eefa7683dcc74c555"]]

那么我在这里俯瞰着什么呢?我需要调用某种域转换器吗?还是MapReduce查询本身有问题?

编辑

如果有帮助的话,下面是我如何在Riak:中存储IRiakObjects

def store(o: User) = bucket.store(o).withConverter(new UserConverter(bucketName)).execute()

注释中提到的第一个问题是REDUCE_IDENTITY返回了一个bucket/密钥对列表,这不是您想要的。第二个问题是。。。如果您存储的不是JSON,则不能直接从mapreduce转到POJO。第三个(排序)问题是。。。Riak mapreduce实际上不是为二进制值而设计的。

以下使用了食谱中示例中的KryoPersonConverterPerson类,该示例演示了如何在自定义转换器中使用Kryo。

我将中断代码样本并内联我的评论:

public class App 
{
    public static void main( String[] args ) throws RiakException
    {
        List<Person> personList = new ArrayList<Person>();
        Person p = new Person("Bob","111 Elm Street","555-1212");
        personList.add(p);
        p = new Person("Jenny","122 Spruce Lane","867-5309");
        personList.add(p);
        p = new Person("Steve","333 Oak place","555-1111");
        personList.add(p);

        IRiakClient client = RiakFactory.pbcClient();
        Bucket b = client.fetchBucket("people").execute();
        KryoPersonConverter converter = new KryoPersonConverter("people");
        for (Person p2 : personList)
        {
            b.store(p2).withConverter(converter).execute();
        }
        p = new Person();
        p.setName("Jenny");
        p = b.fetch(p).withConverter(converter).execute();
        assert(p.getPhone().equals("867-5309")); // I got your number

一切都到现在了吗?好吧!在使用Kryo对其进行序列化后,我们在Riak中存储了一个POJO,并对其进行了检索

        MapReduceResult result = client.mapReduce("people")
                                       .addMapPhase(NamedErlangFunction.MAP_OBJECT_VALUE)
                                       .execute();
        System.out.println(result.getResultRaw());

这里我们看到了问题,因为println()的输出是:

["u0001u0001u000e111 Elm Streetu0001u0003Bobu0001b555-1212","u0001u0001r333 Oak placeu0001u0005Steveu0001b555-1111","u0001u0001u000f122 Spruce Laneu0001u0005Jennyu0001b867-5309"]

不幸的是,在谈到存储数据时,Riak中的mapreduce实际上是用于JSON数据(或者只是纯字符串)。我们有一个JSON数组,其中包含我们存储的字节的JSON字符串。

要处理此问题,您必须将String s作为Collection,然后使用Kryo转换字节。

        Collection<String> objects = result.getResult(String.class);

        Kryo kryo = new Kryo();
        kryo.register(Person.class);
        ObjectBuffer buffer = new ObjectBuffer(kryo);
        for (String s : objects)
        {
            Person p3 = buffer.readObject(s.getBytes(), Person.class);
            System.out.println(p3.getName() + " " + p3.getPhone());
        }
        client.shutdown();     
    }
}

你会得到输出:

Bob 555-1212
Steve 555-1111
Jenny 867-5309

相关内容

  • 没有找到相关文章

最新更新