diff --git a/pam/bin/datatypes.ml b/pam/bin/datatypes.ml index 6d37181..86f4dc0 100644 --- a/pam/bin/datatypes.ml +++ b/pam/bin/datatypes.ml @@ -9,7 +9,7 @@ end type reminder = { (* from an issue in forgejo get a reminder *) url: string; title: string; - due_date: string ; + due_date: string option; body: string; matrix_target: MatrixRoom.t } @@ -17,7 +17,6 @@ type reminder = { (* from an issue in forgejo get a reminder *) type Riot.Message.t += | RegisterClient of (client_id * Riot.Pid.t) | LookupClient of client_id - | ReceivedFromMq of string | ListIssues | ForgejoIssues of reminder list | ForgejoError of string diff --git a/pam/bin/dune b/pam/bin/dune index 03c26c4..13f54ac 100644 --- a/pam/bin/dune +++ b/pam/bin/dune @@ -1,7 +1,7 @@ (executable (public_name pam) (name main) - (libraries riot amqp-client-lwt pyml yojson batteries otoml ) + (libraries riot amqp-client-lwt pyml yojson batteries otoml syslog ptime) (preprocess (pps lwt_ppx ppx_string )) ) diff --git a/pam/bin/httpclient.ml b/pam/bin/httpclient.ml index b945932..e7c6ade 100644 --- a/pam/bin/httpclient.ml +++ b/pam/bin/httpclient.ml @@ -5,30 +5,35 @@ open Yojson.Safe open Config open Utils -let forgejo_url repo_id = [%string - "https://salsa.lezzo.org/api/v1/repos/%{repo_id}/issues?state=open&type=issues"] - -let issue_of_json (m_room: Datatypes.MatrixRoom.t) (json): Datatypes.reminder option = +let issue_of_json (m_room: Datatypes.MatrixRoom.t) (json): Datatypes.reminder = let open Yojson.Safe.Util in let due_date = json |> member "due_date" |> to_option to_string in - match due_date with - | None -> None - | Some due_date -> - let record: Datatypes.reminder = { - url = json |> member "url" |> to_string; - title = json |> member "title" |> to_string; - body = json |> member "body" |> to_string; - matrix_target = m_room; - due_date = due_date - } - in Some record + let record: Datatypes.reminder = { + url = json |> member "url" |> to_string; + title = json |> member "title" |> to_string; + body = json |> member "body" |> to_string; + matrix_target = m_room; + due_date = due_date + } + in record let issues_of_json matrix_room json_str = let open Yojson.Safe.Util in - try json_str |> from_string |> to_list |> List.map (issue_of_json matrix_room) |> List.filter_map identity |> Result.ok - with | Yojson.Json_error msg -> Error [%string "JSON parsing error: %{msg}"] + try + json_str |> from_string |> to_list |> List.map (issue_of_json matrix_room) |> Result.ok + with + | Yojson.Json_error msg -> Error [%string "JSON parsing error: %{msg}"] -type repo_pytuple = {url: pyobject; headers: pyobject} +module ForgejoUrl = struct + type t = {url: string; page: int} + let from_id repo_id = + { url = [%string "https://salsa.lezzo.org/api/v1/repos/%{repo_id}/issues?state=open&type=issues&limit=50"]; + page = 0 } + let next_page t = {t with page=t.page+1} + let to_string t = [%string "%{t.url}&page=%{t.page#Int}"] +end + +type repo_pytuple = {url: ForgejoUrl.t; headers: pyobject} type http_actor = {requests: pyobject; repos: repo_pytuple StringMap.t} let make_headers base64_password = @@ -45,25 +50,56 @@ let init (repos: Config.repo_data list) = let urls = repos |> List.map (fun {forgejo_id; base64_password; matrix_room} -> - (matrix_room, {url=forgejo_url forgejo_id |> Py.String.of_string; headers=make_headers base64_password})) + (matrix_room, {url=ForgejoUrl.from_id forgejo_id; headers=make_headers base64_password})) |> StringMap.of_list in {requests=requests; repos=urls} +let pyprint () = + let builtins = Py.Eval.get_builtins () in + let p = Py.Dict.find_string builtins "print" in + Py.Callable.to_function p + +let extract_pagination resp = + let key = Py.String.of_string "X-Total-Count" in + resp + .@$("headers") + .&("get") [|key|] (* not really a pydict, so need to use `get` *) + |> Py.Object.to_string + |> int_of_string + let make_get_request {requests; repos} = let get = Py.Module.get_function_with_keywords requests "get" in - let fold_fn = (fun m_room_string {url; headers} acc -> - let resp = get [|url|] [("headers", headers)] in + + + let rec fold_fn (accum: (Datatypes.reminder list)) = function + | [] -> Ok accum + | (m_room_string, {url; headers})::rest -> + let pyurl = url |> ForgejoUrl.to_string |> Py.String.of_string in + let resp = get [|pyurl|] [("headers", headers)] in let jsontext = resp.@$("text") |> Py.String.to_string in - let _ = resp.@$("headers") |> Py.Dict.to_bindings_string |> List.iter (fun e -> e |> Batteries.dump |> print_endline) - in + let _ = print_endline "linea 62" in + let target_items_total = extract_pagination resp in + let _ = print_endline (url |> ForgejoUrl.to_string) in + let _ = print_endline "linea 64" in let matrix_room = Datatypes.MatrixRoom.make m_room_string in - let value = - issues_of_json matrix_room jsontext - |> Result.map Datatypes.forgejo_issues - |> Result.map_error Datatypes.forgejo_error (*TODO: maybe not really a forgejo error, more like internal *) - in value::acc) - in StringMap.fold fold_fn repos [] + let issues = issues_of_json matrix_room jsontext in + match issues with + | Error err -> + Error err + | Ok target_issues -> + let n_received = List.length target_issues in + let accum' = target_issues@accum in + let urls = + if n_received <> target_items_total then + (m_room_string, {url=ForgejoUrl.next_page url; headers=headers})::rest + else + rest + in fold_fn accum' urls + + in + StringMap.to_list repos + |> fold_fn [] diff --git a/pam/bin/main.ml b/pam/bin/main.ml index 254fb93..49eb260 100644 --- a/pam/bin/main.ml +++ b/pam/bin/main.ml @@ -1,4 +1,3 @@ -open Riot open Datatypes open Batteries open Utils @@ -6,6 +5,22 @@ open Utils let _MQ_CLIENT = "mq_client" let _HTTP_CLIENT = "http_client" + +let generate_alert = function + (* + * if the deadline has passed: alert every day at noon + * else parse the alert + **) + | {due_date=None; _} -> None + | {due_date=Some date; body; _} -> + let now = Ptime_clock.now () in + let today = now |> Ptime.to_date in + Ptime.of_rfc3339 date + |> Ptime.rfc3339_string_error + |> Result.map (fun (timestamp, _, _) -> Ptime.to_date timestamp = today) + |> Option.some + + let http_client (repos: Config.repo_data list) = let http_actor = Httpclient.init repos in let _ = print_endline "Initialized http client" in @@ -15,23 +30,27 @@ let http_client (repos: Config.repo_data list) = match Riot.receive () with | ListIssues -> Httpclient.make_get_request http_actor - |> List.map (Result.fold ~ok:identity ~error:identity) - |> List.iter (Riot.send_by_name ~name:_MQ_CLIENT) + |> Result.map Datatypes.forgejo_issues + |> Result.map_error Datatypes.forgejo_error + |> Result.fold ~ok:identity ~error:identity + |> Riot.send_by_name ~name:_MQ_CLIENT + + | m -> unhandled m in loop () in loop () -let mq_client (mq_url, mq_user, mq_password) consumer = +let mq_client (mq_url, mq_user, mq_password) = let pprint rem = [%string "%{rem.title}|%{rem.matrix_target}"] in - let call_consumer pid { Amqp_client_lwt.Message.message = (_content, body); _ } = - Riot.send pid (Datatypes.ReceivedFromMq ("GOT RABBITs: "^body)) (*TODO: log*) + let call_consumer { Amqp_client_lwt.Message.message = (_content, body); _ } = + Pamlog.error [%string "Received msg from rabbitmq: %{body}. PAM will ignore."] in - let%lwt mq = Mq.init (mq_url, mq_user, mq_password) consumer call_consumer in + let%lwt mq = Mq.init (mq_url, mq_user, mq_password) call_consumer in let rec loop () = let%lwt _ = Lwt_unix.sleep 1.0 in @@ -39,8 +58,7 @@ let mq_client (mq_url, mq_user, mq_password) consumer = try%lwt match Riot.receive ~after:one_second () with | ForgejoError err -> - let _ = print_endline [%string "Got error from Forgejo: %{err}"] in - (* Lwt.return (Mq.mq_publish mq err ) *) + let _ = Pamlog.error [%string "Got error from Forgejo: %{err}"] in Mq.mq_publish mq err | ForgejoIssues reminders -> let _ = [%string "Got reminders: %{Batteries.dump reminders}"] |> print_endline in @@ -54,8 +72,9 @@ let mq_client (mq_url, mq_user, mq_password) consumer = loop () let main (config: Config.config) = + let open Riot in let http_client_pid = spawn (fun () -> http_client config.repos) in - let mq_client_pid = spawn (fun () -> Lwt_main.run (mq_client (config.mq_url, config.mq_user, config.mq_password) http_client_pid)) in + let mq_client_pid = spawn (fun () -> Lwt_main.run (mq_client (config.mq_url, config.mq_user, config.mq_password))) in let _ = Riot.register _HTTP_CLIENT http_client_pid in let _ = Riot.register _MQ_CLIENT mq_client_pid in diff --git a/pam/bin/mq.ml b/pam/bin/mq.ml index 69fdb57..c4791ae 100644 --- a/pam/bin/mq.ml +++ b/pam/bin/mq.ml @@ -7,16 +7,16 @@ let no_routing_key = `Queue "" type rabbit_actor = {queue: Queue.t; channel: Channel.no_confirm Channel.t; connection: Connection.t} -let init (mq_url, mq_user, mq_password) riot_consumer consume_fn = +let init (mq_url, mq_user, mq_password) consume_fn = (* Init rabbitmq client *) let%lwt connection = Amqp.Connection.connect ~credentials:(mq_user, mq_password) ~id:"pam" mq_url in let%lwt channel = Amqp.Connection.open_channel ~id:"pam" Amqp.Channel.no_confirm connection in let%lwt queue = Amqp.Queue.declare channel "pam" in - let _ = print_endline "Initialized rabbitmq client" in + let _ = Pamlog.info "Initialized rabbitmq client" in (* Establish consumer *) let%lwt (_ampq_consumer, reader) = Amqp.Queue.consume ~no_ack:true ~id:"" channel queue in - let _ = Thread.spawn ~exn_handler:handle_consumer_exn (Thread.Pipe.iter_without_pushback reader ~f:(consume_fn riot_consumer)) in + let _ = Thread.spawn ~exn_handler:handle_consumer_exn (Thread.Pipe.iter_without_pushback reader ~f:consume_fn) in Lwt.return {queue=queue; channel=channel; connection=connection} let shutdown {queue=_queue; channel; connection} = @@ -25,10 +25,11 @@ let shutdown {queue=_queue; channel; connection} = let%lwt _ = Lwt.catch (fun () -> Amqp.Connection.close connection) ignore_exn in Lwt.return_unit -let mq_publish {queue=_; channel; connection=_} msg = - let%lwt _ = - Message.make msg - |> Exchange.publish channel Exchange.default ~routing_key:"lanonna" +let mq_publish {queue=_; channel=_; connection=_} msg = + let _ = Pamlog.info [%string "GOT MSG FOR MQ:: %{msg}"] + (* let%lwt _ = *) + (* Message.make msg *) + (* |> Exchange.publish channel Exchange.default ~routing_key:"lanonna" *) in Lwt.return_unit let rec mq_publish_all mq = function