open Amqp_client_lwt

(** Exception handler given to [Thread.spawn] for the background consumer
    loop started by [init].

    The exception is written to the log and the handler then resolves
    successfully, so a failing consumer leaves a trace instead of being
    dropped silently.
    TODO: decide whether the consumer should be restarted from here. *)
let handle_consumer_exn exn =
  let _ =
    Pamlog.info ("RabbitMQ consumer failed: " ^ Printexc.to_string exn)
  in
  Lwt.return_unit

(** Queue tag carrying an empty name. Not referenced by the other
    definitions in this file; presumably used by callers elsewhere. *)
let no_routing_key = `Queue ""

(** Handles that make up a live RabbitMQ session: the declared queue, the
    channel (opened without publisher confirms) and the connection. *)
type rabbit_actor = {
  queue : Queue.t;
  channel : Channel.no_confirm Channel.t;
  connection : Connection.t;
}

(** [init (mq_url, mq_user, mq_password) consume_fn] connects to the broker
    at [mq_url] with the given credentials, opens a channel, declares the
    queue named [pam] and starts a background consumer that applies
    [consume_fn] to every item read from that queue.

    Resolves to the {!rabbit_actor} holding the queue, channel and
    connection. Exceptions raised by the background consumer are routed to
    {!handle_consumer_exn}; failures while connecting or declaring reject
    the returned promise. *)
let init (mq_url, mq_user, mq_password) consume_fn =
  (* Init rabbitmq client *)
  let%lwt connection =
    Amqp.Connection.connect
      ~credentials:(mq_user, mq_password)
      ~id:"pam" mq_url
  in
  let%lwt channel =
    Amqp.Connection.open_channel ~id:"pam" Amqp.Channel.no_confirm connection
  in
  let%lwt queue = Amqp.Queue.declare channel "pam" in
  let _ = Pamlog.info "Initialized rabbitmq client" in
  (* Establish consumer; messages are consumed with no_ack, so nothing is
     acknowledged back to the broker from here. *)
  let%lwt (_consumer, reader) =
    Amqp.Queue.consume ~no_ack:true ~id:"" channel queue
  in
  (* The consumer loop runs detached from the returned promise. *)
  let _ =
    Thread.spawn ~exn_handler:handle_consumer_exn
      (Thread.Pipe.iter_without_pushback reader ~f:consume_fn)
  in
  Lwt.return {queue; channel; connection}

(** [shutdown actor] closes the channel and then the connection of [actor].

    Shutdown is best-effort: an exception raised by either close is logged
    and otherwise ignored, so the returned promise always resolves and the
    connection close is attempted even if the channel close failed. *)
let shutdown {queue = _; channel; connection} =
  let log_and_ignore what exn =
    let _ =
      Pamlog.info
        ("Ignoring error while closing rabbitmq " ^ what ^ ": "
       ^ Printexc.to_string exn)
    in
    Lwt.return_unit
  in
  let%lwt () =
    Lwt.catch
      (fun () -> Amqp.Channel.close channel)
      (log_and_ignore "channel")
  in
  let%lwt () =
    Lwt.catch
      (fun () -> Amqp.Connection.close connection)
      (log_and_ignore "connection")
  in
  Lwt.return_unit

(** [mq_publish actor msg] logs [msg] and publishes it on the default
    exchange of the actor channel with routing key [lanonna]. The result
    reported by the publish call is discarded. *)
let mq_publish {channel; _} msg =
  let _ = Pamlog.info [%string "GOT MSG FOR MQ:: %{msg}"] in
  let%lwt _ =
    Message.make msg
    |> Exchange.publish channel Exchange.default ~routing_key:"lanonna"
  in
  Lwt.return_unit

(** [error_msg_to_json_format room_id error] serialises an error
    notification for room [room_id] as a JSON object string with the fields
    [content], [source_message_id] (always null), [room_id], [as_reply]
    (false) and [as_markdown] (true). *)
let error_msg_to_json_format room_id error =
  (* The accented letter used to be stored as mis-decoded bytes; it is now
     the intended Italian character.
     NOTE(review): the literal ends with a pipe and a space after the
     interpolated error; kept as-is, confirm whether that is intended. *)
  let content = [%string {|Pam è entrata in errore: %{error}| |}] in
  Yojson.Basic.to_string
    (`Assoc
      [
        ("content", `String content);
        ("source_message_id", `Null);
        ("room_id", `String room_id);
        ("as_reply", `Bool false);
        ("as_markdown", `Bool true);
      ])