NEST for .NET: sending a Dictionary<string, object> through BulkAllRequest BufferToBulk



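I have a helper class around NEST for interacting with Elasticsearch. The method below takes a BlockingCollection of IndexDocument items and uses it to perform a bulk operation.

IndexDocument looks like: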

public class IndexDocument
{
    public string ID { get; set; }        // ID of the record in ElasticSearch
    public IndexFile File { get; set; }   // Underlying file that this record came from
    public string Index { get; set; }     // Index that should be posted to
    public string Document { get; set; }  // JSON of the actual record
}

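A bulk operation is preferred because it lets us target one or more indices. To improve performance, I have merged all data for all indices into a single operation. The problem is that NEST does not like raw JSON. Normally the low-level client is what you would want for that, but rather than reinvent the wheel for this bulk operation, I would like to use BulkAll: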

public static void Bulk(IEnumerable<IndexDocument> documents)
{
    var request = new BulkAllRequest<IndexDocument>(documents);
    Client.BulkAll(documents, func =>
    {
        return func.Index(null).BufferToBulk((descriptor, buffer) =>
            {
                foreach (var document in buffer)
                {
                    descriptor.Index<Dictionary<string, object>>(operation =>
                    {
                        var product = JsonConvert.DeserializeObject<Dictionary<string, object>>(document.Document);
                        return operation.Index(document.Index)
                            .Document(product)
                            .Id(document.ID);
                    });
                }
            })
            .BackOffTime("10s")
            .Size(1)  // if I can't get one to work....
            .RefreshOnCompleted()
            .MaxDegreeOfParallelism(10)
            .BackOffRetries(2);
    }).Wait(TimeSpan.FromMinutes(5), next =>
    {
    });
}
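
For comparison, here is a minimal sketch of what the low-level route mentioned above involves: building the newline-delimited `_bulk` body by hand from raw JSON. `RawBulkBody`, `BuildLines`, and the tuple shape are hypothetical names for illustration, and the sketch uses System.Text.Json so that it runs without extra packages. To my understanding, the resulting lines could then be handed to the low-level client with something like `Client.LowLevel.Bulk<StringResponse>(PostData.MultiJson(lines))`, but treat that call as an assumption to verify against your NEST version.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class RawBulkBody
{
    // Builds the lines of a _bulk request body: for every document,
    // one action line followed by one source line.
    public static List<string> BuildLines(IEnumerable<(string Index, string Id, string Json)> docs)
    {
        var lines = new List<string>();
        foreach (var (index, id, json) in docs)
        {
            // Serialize the action metadata so index names and ids are escaped correctly.
            var action = new Dictionary<string, object>
            {
                ["index"] = new Dictionary<string, string> { ["_index"] = index, ["_id"] = id }
            };
            lines.Add(JsonSerializer.Serialize(action));

            // The document is already JSON, but it must occupy a single line.
            lines.Add(Compact(json));
        }
        return lines;
    }

    // Re-emits the JSON without insignificant whitespace or line breaks.
    private static string Compact(string json)
    {
        using var doc = JsonDocument.Parse(json);
        return JsonSerializer.Serialize(doc.RootElement);
    }
}
```

This shows why staying with BulkAll is attractive: the hand-built body gives you none of the batching, back-off, or retry behaviour configured in the method above.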

The problem I am having is how to convert the raw JSON into an object that NEST will accept. My raw JSON contains:

  • A set of core fields. (Required)
  • A set of common fields. (Optional)
  • A set of attributes. (Optional)
  • A set of experimental fields. (Optional)

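These fields are represented as a Dictionary<string, object>. Today this data is managed outside our development group, so I cannot modify it. I cannot create a concrete class for it because the fields are dynamic. I tried a concrete class built on top of the dictionary, but I received this error message: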

Elasticsearch.Net.UnexpectedElasticsearchClientException
HResult=0x80131500
Message=GenericArguments[0], 'Newtonsoft.Json.Linq.JContainer', on 'Elasticsearch.Net.Utf8Json.Formatters.NonGenericListFormatter`1[T]' violates the constraint of type parameter 'T'.
Source=Nest
StackTrace:
at Nest.BlockingSubscribeExtensions.WaitOnObservable[TObservable,TObserve,TObserver](TObservable observable, TimeSpan maximumRunTime, Func`3 factory)
at Nest.BlockingSubscribeExtensions.Wait[T](BulkAllObservable`1 observable, TimeSpan maximumRunTime, Action`1 onNext)
at Ced.Search.Services.Indexing.Helpers.ElasticHelper.Bulk2(IEnumerable`1 documents) in 
[STACK]
Inner Exception 1:
TypeLoadException: GenericArguments[0], 'Newtonsoft.Json.Linq.JContainer', on 'Elasticsearch.Net.Utf8Json.Formatters.NonGenericListFormatter`1[T]' violates the constraint of type parameter 'T'.

I also tried using Newtonsoft's JObject, but it posted a strange, invalid body:

[Request]
{"index":{"_id":"cpn_0007473CPN46","_index":"trade_1"}}
[[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[]],[[[]]]] // WTF IS THIS!!!!
[Response]
{"took":1,"errors":true,"items":[{"index":{"_index":"trade_1","_type":"_doc","_id":"cpn_0007473CPN46","status":400,"error":{"type":"mapper_parsing_exception","reason":"failed to parse","caused_by":{"type":"not_x_content_exception","reason":"not_x_content_exception: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"}}}}]}

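The exception makes sense: the request body is garbage.

What else can I try?

...I found the answer. I stumbled upon the custom JSON serializer feature described here: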

https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/custom-serialization.html

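The root of the problem is that the built-in serializer does not know how to serialize the generic dictionary, or even a JObject. My original ElasticClient looked like this: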

private static ElasticClient Client = new ElasticClient(ServiceConfiguration.Instance.ElasticSearch.Uri);

It now looks like this:

private static ElasticClient Client = null;
static ElasticHelper()
{
    var pool = new SingleNodeConnectionPool(ServiceConfiguration.Instance.ElasticSearch.Uri);
    var connectionSettings = new ConnectionSettings(pool, sourceSerializer: (builtin, settings) =>
    {
        return new JsonNetSerializer(builtin, settings, () =>
            {
                return new JsonSerializerSettings { };
            },
            resolver => resolver.NamingStrategy = new DefaultNamingStrategy());
    });
    Client = new ElasticClient(connectionSettings);
}

Now my bulk operation serializes as expected. Make sure to add the NEST.JsonNetSerializer NuGet package to your project.

using Nest.JsonNetSerializer;
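
If swapping the source serializer is not an option, one alternative I have not verified against NEST is to avoid Json.NET token types entirely and parse the raw JSON into plain CLR values (dictionaries, lists, strings, numbers, booleans), on the assumption that the built-in serializer handles those. The sketch below does that conversion with System.Text.Json; `PlainJson` and `ToDictionary` are hypothetical names, not part of NEST or Json.NET.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

public static class PlainJson
{
    // Parses a JSON object into nested Dictionary<string, object> / List<object> / primitives.
    public static Dictionary<string, object> ToDictionary(string json)
    {
        using var doc = JsonDocument.Parse(json);
        if (doc.RootElement.ValueKind != JsonValueKind.Object)
            throw new ArgumentException("Expected a JSON object at the root.", nameof(json));
        return (Dictionary<string, object>)ToPlain(doc.RootElement);
    }

    // Recursively copies a JsonElement into plain CLR values.
    private static object ToPlain(JsonElement element)
    {
        switch (element.ValueKind)
        {
            case JsonValueKind.Object:
                // Note: throws if the JSON object repeats a property name.
                return element.EnumerateObject().ToDictionary(p => p.Name, p => ToPlain(p.Value));
            case JsonValueKind.Array:
                return element.EnumerateArray().Select(ToPlain).ToList();
            case JsonValueKind.String:
                return element.GetString();
            case JsonValueKind.Number:
                // Prefer an integer when the value fits, otherwise fall back to double.
                return element.TryGetInt64(out var whole) ? (object)whole : element.GetDouble();
            case JsonValueKind.True:
                return true;
            case JsonValueKind.False:
                return false;
            default:
                return null; // JSON null (or undefined)
        }
    }
}
```

In the Bulk method, the result of `PlainJson.ToDictionary(document.Document)` would take the place of the `JsonConvert.DeserializeObject` call. The custom serializer shown above remains the approach that is known to work here.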
