此表达式的类型为 H2_lwt_unix.Client.t,但期望的表达式类型为 '_weak702 Lwt.t



如何处理一个函数中的Lwt对象?

我的代码是
(* For every connection in [peers], send a RequestVote RPC through
   [call_server] and process the reply.
   NOTE: the callback binds the reply with [let*], so its body has to be an
   Lwt promise; it ends with the bare connection [conn] instead, which is the
   expression the reported type error points at. *)
Array.map (fun conn -> let* resp = (call_server conn 
(RequestVoteArg({
(* The request carries this node's id and current term, plus the index and
   term of the last entry in its log. *)
candidateNumber = myState.myPersistentState.id;
term = myState.myPersistentState.currentTerm;
lastlogIndex = (Array.get myState.myPersistentState.logs ((Array.length myState.myPersistentState.logs) - 1)).index;
lastlogTerm = (Array.get myState.myPersistentState.logs ((Array.length myState.myPersistentState.logs) - 1)).term
}))) in (match resp with
| Error(s) -> Printf.printf "requestVote: connection failed: %s" s
| Ok(repl, s) -> 
(match repl with
| RequestVoteRet(repl) ->
(* Count a granted vote. *)
if repl.voteGranted then current_vote := !current_vote + 1; 
(* -1l is the term [call_server] reports when the response had no message;
   any other term overwrites the locally stored current term. *)
if not (repl.term = (-1l)) then myState.myPersistentState.currentTerm <- repl.term;
Printf.printf "requestVote: status: %s" s
| _ -> failwith "Should not reach here")); conn) peers

但是有一个错误:这个表达式(末尾的 conn)的类型是 H2_lwt_unix.Client.t,但期望的类型是 '_weak702 Lwt.t。

peers是一个连接数组(类型:H2_lwt_unix.Client.t)。

call_server的定义是:
(** [call_server conn arg] sends the request [arg] over the connection [conn].
    The returned promise resolves to [Ok (reply, status)] or to
    [Error status]. *)
val call_server: H2_lwt_unix.Client.t -> protobufArg -> (Types.protobufRet * Grpc.Status.t, Grpc.Status.t) result Lwt.t

(** [build_connection addr port] resolves [addr]:[port] to IPv4 addresses,
    connects a TCP socket to the first address returned, and creates an HTTP/2
    client connection on that socket.

    The result is the promise returned by
    [H2_lwt_unix.Client.create_connection]. Connection-level errors reported
    through the error handler are only signalled by printing ["error"].

    The promise is rejected with [Failure] when the lookup yields no address,
    and with the exception raised by [Lwt_unix.connect] when connecting fails;
    in the latter case the socket is closed before the error is propagated. *)
let build_connection addr port =
  let* addrs =
    Lwt_unix.getaddrinfo addr (string_of_int port)
      [ Unix.(AI_FAMILY PF_INET) ]
  in
  match addrs with
  | [] ->
    (* [getaddrinfo] returns an empty list when nothing matches; report that
       explicitly instead of failing inside [List.hd]. *)
    Lwt.fail_with
      (Printf.sprintf "build_connection: no address found for %s:%d" addr port)
  | first :: _ ->
    let socket = Lwt_unix.socket Unix.PF_INET Unix.SOCK_STREAM 0 in
    let* () =
      Lwt.catch
        (fun () -> Lwt_unix.connect socket first.Unix.ai_addr)
        (fun exn ->
          (* Do not leak the file descriptor when the connection attempt
             fails; re-raise the original error afterwards. *)
          let* () = Lwt_unix.close socket in
          Lwt.fail exn)
    in
    let error_handler _ = print_endline "error" in
    H2_lwt_unix.Client.create_connection ~error_handler socket
(** [call_server connection req] performs one unary gRPC call against the
    ["raft.Proto"] service over [connection].

    [req] selects the RPC: a [RequestVoteArg] is sent to ["RequestVote"] and an
    [AppendEntriesArg] to ["AppendEntries"]. The argument is protobuf-encoded
    before sending. When the response carries a message it is decoded into the
    matching [RequestVoteRet] / [AppendEntriesRet]; when it carries none, a
    placeholder reply with [term = -1l] and a [false] flag is used instead.

    The result is whatever [Client.call] returns for that handler. *)
let call_server connection req =
  let encoder = Pbrt.Encoder.create () in
  (* Plumbing shared by both RPCs: send the bytes currently held by [encoder]
     to [rpc], then turn the optional response payload into a reply with
     [decode], or use [fallback] when the payload is absent. *)
  let invoke ~rpc ~decode ~fallback =
    Client.call ~service:"raft.Proto" ~rpc
      ~do_request:(H2_lwt_unix.Client.request connection ~error_handler:ignore)
      ~handler:
        (Client.Rpc.unary (Pbrt.Encoder.to_string encoder) ~f:(fun response ->
             let+ payload = response in
             match payload with
             | Some raw -> decode (Pbrt.Decoder.of_string raw)
             | None -> fallback))
      ()
  in
  match req with
  | RequestVoteArg vote ->
    let message =
      Proto.Proto_types.default_request_vote_arg
        ~candidate_number:vote.candidateNumber ~term:vote.term
        ~lastlog_term:vote.lastlogTerm ~lastlog_index:vote.lastlogIndex ()
    in
    (* The message must be written into [encoder] before [invoke] reads it. *)
    Proto.Proto_pb.encode_request_vote_arg message encoder;
    invoke ~rpc:"RequestVote"
      ~decode:(fun input ->
        let reply = Proto.Proto_pb.decode_request_vote_reply input in
        RequestVoteRet { term = reply.term; voteGranted = reply.vote_granted })
      ~fallback:(RequestVoteRet { term = (-1l); voteGranted = false })
  | AppendEntriesArg append ->
    let message =
      Proto.Proto_types.default_append_entries_arg ~term:append.term
        ~leader_id:append.leaderID ~next_log_index:append.nextLogIndex
        ~next_log_term:append.nextLogTerm
        ~entries:
          (List.map
             (fun lg ->
               Proto.Proto_types.default_log ~command:lg.command ~term:lg.term
                 ~index:lg.index ())
             (Array.to_list append.entries))
        ()
    in
    Proto.Proto_pb.encode_append_entries_arg message encoder;
    invoke ~rpc:"AppendEntries"
      ~decode:(fun input ->
        let reply = Proto.Proto_pb.decode_append_entries_reply input in
        AppendEntriesRet { term = reply.term; success = reply.success })
      ~fallback:(AppendEntriesRet { term = (-1l); success = false })

您的代码可以简化为:

(* Reduced form of the failing code: the body of [let*] must be an Lwt
   promise, but it ends with [conn], a plain connection. *)
let f (conn:H2_lwt_unix.Client.t) =
let* resp = ... in
...; conn

这段代码无法通过类型检查:let* 的主体必须返回一个 Lwt 承诺(promise),而 conn 只是一个连接,不是一个返回连接的承诺。

最简单的修复方法是用 Lwt.return 把该值包装在 Lwt 承诺中,或者把 let* 换成 let+(即 map),如下面两种写法所示:
(* Keeps [let*] (bind) and wraps the final value in a promise with
   [Lwt.return], so the body has the promise type that [let*] requires. *)
let f (conn:H2_lwt_unix.Client.t) =
let* resp = ... in
...; Lwt.return conn

(* Uses [let+] (map) instead of [let*]: the body is a plain value, and the
   whole expression is still a promise of that value. *)
let f (conn:H2_lwt_unix.Client.t) =
let+ resp = ... in
...; conn

最新更新