93 lines
2.7 KiB
OCaml
93 lines
2.7 KiB
OCaml
open Pam.Datatypes
|
|
open Pam
|
|
open Batteries
|
|
open Utils
|
|
|
|
(* Name under which the message-queue client process is registered in [main]
   and addressed by [Riot.send_by_name] in [http_client]. *)
let _MQ_CLIENT = "mq_client"
|
|
(* Name under which the HTTP client process is registered in [main]. *)
let _HTTP_CLIENT = "http_client"
|
|
|
|
(*
 * If the deadline has passed: alert every day at noon;
 * otherwise, parse the alert.
 *)
|
|
(** [http_client repos] is the body of the HTTP client process.

    It builds an HTTP actor for [repos] with [Httpclient.init] and then serves
    incoming messages forever; it never returns normally.

    On [ListIssues] it fetches issues with [Httpclient.make_get_request],
    keeps the ones for which [Issuelib.should_alert] holds at the current
    time, converts them with [forgejo_issues] and sends every resulting
    message to the process registered as [_MQ_CLIENT]. If fetching or parsing
    fails, the error is converted with [internal_failure] and that single
    message is sent instead. Any other message is handed to [unhandled]. *)
let http_client (repos: Config.repo_data list) =
  let http_actor = Httpclient.init repos in
  let _ = Pamlog.info "Initialized http client" in

  (* Select the issues that should trigger an alert at [now].
     - Stops at the first issue for which [Issuelib.to_datetime] returns an
       error and returns that error.
     - Issues for which it returns [Ok None] are skipped.
     - Selected issues are prepended to the accumulator, so the result is in
       reverse input order. *)
  let check_alert_time now issues =
    let rec aux acc = function
      | [] -> Ok acc
      | issue :: rest ->
        (match Issuelib.to_datetime issue with
         | Error e -> Error e
         | Ok None -> aux acc rest
         | Ok (Some alert_times) ->
           let acc' =
             if Issuelib.should_alert now alert_times then issue :: acc
             else acc
           in
           aux acc' rest)
    in
    aux [] issues
  in

  let rec loop () =
    (* [let () =] rather than [let _ =]: the handler is run purely for its
       effects, and the type checker now enforces that. *)
    let () =
      match Riot.receive_any () with
      | ListIssues ->
        let now = Ptime_clock.now () in
        let issues = Httpclient.make_get_request http_actor in
        Result.bind issues (check_alert_time now)
        |> Result.map_error internal_failure
        |> Result.fold ~ok:forgejo_issues ~error:List.singleton
        (* [List.iter], not [List.map]: sending is a side effect and no
           result list is needed. *)
        |> List.iter (Riot.send_by_name ~name:_MQ_CLIENT)
      | m -> unhandled m
    in
    loop ()
  in
  loop ()
|
|
|
|
(** [mq_client (mq_url, mq_user, mq_password)] is the body of the
    message-queue client, as an Lwt promise that never resolves normally.

    It initialises the queue connection with [Mq.init], installing a consumer
    that only logs incoming RabbitMQ messages. It then loops: sleep one
    second, wait up to [one_second] for a Riot message, and publish it with
    [Mq.mq_publish]:
    - [InternalFailure err] is logged and [err] is published;
    - [Reminder reminder] is published as the output of
      [Issuelib.issue_data_to_json];
    - any other message is handed to [unhandled].

    A [Riot.Receive_timeout] simply starts the next iteration. *)
let mq_client (mq_url, mq_user, mq_password) =
  let call_consumer { Amqp_client_lwt.Message.message = (_content, body); _ } =
    Pamlog.error [%string "Received msg from rabbitmq: %{body}. PAM will ignore."]
  in

  let%lwt mq = Mq.init (mq_url, mq_user, mq_password) call_consumer in

  let rec loop () =
    let%lwt () = Lwt_unix.sleep 1.0 in
    (* Await the handler. It used to be bound with a plain [let _ =], so the
       publish was never waited for and any rejection vanished silently. *)
    let%lwt () =
      try%lwt
        (match Riot.receive_any ~after:one_second () with
         | InternalFailure err ->
           let _ = Pamlog.error [%string "Got error from Forgejo: %{err}"] in
           Mq.mq_publish mq err
         | Reminder reminder ->
           let formatted = Issuelib.issue_data_to_json reminder in
           Mq.mq_publish mq formatted
         | m -> unhandled m)
      with
      | Riot.Receive_timeout -> Lwt.return_unit
      | exn ->
        (* Other failures used to be dropped with the un-awaited promise.
           Keep the loop alive as before, but make the failure visible. *)
        let reason = Printexc.to_string exn in
        let _ = Pamlog.error [%string "Failed to process message for rabbitmq: %{reason}"] in
        Lwt.return_unit
    in
    loop ()
  in
  loop ()
|
|
|
|
(** [main config] starts the two worker processes and then drives them.

    It spawns the HTTP client on [config.repos] and the message-queue client
    (run under [Lwt_main.run]) on the queue settings in [config], and
    registers them as [_HTTP_CLIENT] and [_MQ_CLIENT]. It then loops forever:
    sleep 30 seconds, send [ListIssues] to the HTTP client, repeat. *)
let main (config: Config.config) =
  let open Riot in
  let http_client_pid = spawn (fun () -> http_client config.repos) in
  let mq_client_pid =
    spawn (fun () ->
      (config.mq_url, config.mq_user, config.mq_password)
      |> mq_client
      |> Lwt_main.run)
  in
  let _ = Riot.register _HTTP_CLIENT http_client_pid in
  let _ = Riot.register _MQ_CLIENT mq_client_pid in
  (* Seconds to wait before each [ListIssues] request. *)
  let poll_interval = 30.0 in
  let rec poll_forever () =
    sleep poll_interval;
    let _ = send http_client_pid ListIssues in
    poll_forever ()
  in
  poll_forever ()
|
|
|
|
|
|
|
|
(* Program entry point: obtain the configuration, handing any error to
   [exit2], then run [main] inside the Riot runtime. *)
let () =
  let config =
    match Config.configuration () with
    | Ok loaded -> loaded
    | Error problem -> exit2 problem
  in
  Riot.run (fun () -> main config)
|