44 lines
1.9 KiB
OCaml
44 lines
1.9 KiB
OCaml
|
open Amqp_client_lwt
|
||
|
|
||
|
let handler pid { Message.message = (_content, body); _ } =
|
||
|
Riot.send pid (Datatypes.ReceivedFromMq ("GOT RABBITs: "^body)) (*TODO: log*)
|
||
|
|
||
|
let handle_consumer_exn _exn =
|
||
|
Lwt.return_unit (* TODO *)
|
||
|
|
||
|
let no_routing_key = `Queue ""
|
||
|
|
||
|
type rabbit_actor = {queue: Queue.t; channel: Channel.no_confirm Channel.t; connection: Connection.t}
|
||
|
|
||
|
let init riot_consumer =
|
||
|
(* Init rabbitmq client *)
|
||
|
let%lwt connection = Amqp.Connection.connect ~credentials:Datatypes.CREDS ~id:"Pam" MQURL in (*TODO: Creds*)
|
||
|
let%lwt channel = Amqp.Connection.open_channel ~id:"Pam" Amqp.Channel.no_confirm connection in
|
||
|
let%lwt exchange = Amqp.Exchange.declare channel Exchange.direct_t "pam" in
|
||
|
(* let%lwt lanonna_exchange = Amqp.Exchange.declare channel Exchange.direct_t "lanonna" in *)
|
||
|
let%lwt queue = Amqp.Queue.declare channel "pam" in
|
||
|
let _ = Amqp.Queue.bind channel queue exchange no_routing_key in
|
||
|
(* Amqp.Queue.publish channel queue (Amqp.Message.make "My Message Payload") >>= function `Ok -> *)
|
||
|
let _ = print_endline "Initialized rabbitmq client" in
|
||
|
|
||
|
(* Establish consumer *)
|
||
|
let%lwt (_ampq_consumer, reader) = Amqp.Queue.consume ~no_ack:true ~id:"" channel queue in
|
||
|
let _ = Thread.spawn ~exn_handler:handle_consumer_exn (Thread.Pipe.iter_without_pushback reader ~f:(handler riot_consumer)) in
|
||
|
Lwt.return {queue=queue; channel=channel; connection=connection}
|
||
|
|
||
|
let shutdown {queue=_queue; channel; connection} =
|
||
|
let ignore_exn = fun _ -> Lwt.return_unit in
|
||
|
let%lwt _ = Lwt.catch (fun () -> Amqp.Channel.close channel) ignore_exn in
|
||
|
let%lwt _ = Lwt.catch (fun () -> Amqp.Connection.close connection) ignore_exn in
|
||
|
Lwt.return_unit
|
||
|
|
||
|
let mq_publish {queue=_; channel; connection=_} msg =
|
||
|
let%lwt _ =
|
||
|
Message.make msg
|
||
|
|> Exchange.publish channel Exchange.default ~routing_key:"lanonna"
|
||
|
in Lwt.return_unit
|
||
|
|
||
|
let rec mq_publish_all mq = function
|
||
|
| [] -> Lwt.return_unit
|
||
|
| r::rest -> let%lwt _ = mq_publish mq r in mq_publish_all mq rest
|