burro di arachidi
This commit is contained in:
parent
f25852d903
commit
f8933d8c07
|
@ -9,7 +9,7 @@ end
|
|||
type reminder = { (* from an issue in forgejo get a reminder *)
|
||||
url: string;
|
||||
title: string;
|
||||
due_date: string ;
|
||||
due_date: string option;
|
||||
body: string;
|
||||
matrix_target: MatrixRoom.t
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ type reminder = { (* from an issue in forgejo get a reminder *)
|
|||
type Riot.Message.t +=
|
||||
| RegisterClient of (client_id * Riot.Pid.t)
|
||||
| LookupClient of client_id
|
||||
| ReceivedFromMq of string
|
||||
| ListIssues
|
||||
| ForgejoIssues of reminder list
|
||||
| ForgejoError of string
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
(executable
|
||||
(public_name pam)
|
||||
(name main)
|
||||
(libraries riot amqp-client-lwt pyml yojson batteries otoml )
|
||||
(libraries riot amqp-client-lwt pyml yojson batteries otoml syslog ptime)
|
||||
(preprocess
|
||||
(pps lwt_ppx ppx_string ))
|
||||
)
|
||||
|
|
|
@ -5,30 +5,35 @@ open Yojson.Safe
|
|||
open Config
|
||||
open Utils
|
||||
|
||||
let forgejo_url repo_id = [%string
|
||||
"https://salsa.lezzo.org/api/v1/repos/%{repo_id}/issues?state=open&type=issues"]
|
||||
|
||||
let issue_of_json (m_room: Datatypes.MatrixRoom.t) (json): Datatypes.reminder option =
|
||||
let issue_of_json (m_room: Datatypes.MatrixRoom.t) (json): Datatypes.reminder =
|
||||
let open Yojson.Safe.Util in
|
||||
let due_date = json |> member "due_date" |> to_option to_string in
|
||||
match due_date with
|
||||
| None -> None
|
||||
| Some due_date ->
|
||||
let record: Datatypes.reminder = {
|
||||
url = json |> member "url" |> to_string;
|
||||
title = json |> member "title" |> to_string;
|
||||
body = json |> member "body" |> to_string;
|
||||
matrix_target = m_room;
|
||||
due_date = due_date
|
||||
}
|
||||
in Some record
|
||||
let record: Datatypes.reminder = {
|
||||
url = json |> member "url" |> to_string;
|
||||
title = json |> member "title" |> to_string;
|
||||
body = json |> member "body" |> to_string;
|
||||
matrix_target = m_room;
|
||||
due_date = due_date
|
||||
}
|
||||
in record
|
||||
|
||||
let issues_of_json matrix_room json_str =
|
||||
let open Yojson.Safe.Util in
|
||||
try json_str |> from_string |> to_list |> List.map (issue_of_json matrix_room) |> List.filter_map identity |> Result.ok
|
||||
with | Yojson.Json_error msg -> Error [%string "JSON parsing error: %{msg}"]
|
||||
try
|
||||
json_str |> from_string |> to_list |> List.map (issue_of_json matrix_room) |> Result.ok
|
||||
with
|
||||
| Yojson.Json_error msg -> Error [%string "JSON parsing error: %{msg}"]
|
||||
|
||||
type repo_pytuple = {url: pyobject; headers: pyobject}
|
||||
module ForgejoUrl = struct
|
||||
type t = {url: string; page: int}
|
||||
let from_id repo_id =
|
||||
{ url = [%string "https://salsa.lezzo.org/api/v1/repos/%{repo_id}/issues?state=open&type=issues&limit=50"];
|
||||
page = 0 }
|
||||
let next_page t = {t with page=t.page+1}
|
||||
let to_string t = [%string "%{t.url}&page=%{t.page#Int}"]
|
||||
end
|
||||
|
||||
type repo_pytuple = {url: ForgejoUrl.t; headers: pyobject}
|
||||
type http_actor = {requests: pyobject; repos: repo_pytuple StringMap.t}
|
||||
|
||||
let make_headers base64_password =
|
||||
|
@ -45,25 +50,56 @@ let init (repos: Config.repo_data list) =
|
|||
let urls =
|
||||
repos
|
||||
|> List.map (fun {forgejo_id; base64_password; matrix_room} ->
|
||||
(matrix_room, {url=forgejo_url forgejo_id |> Py.String.of_string; headers=make_headers base64_password}))
|
||||
(matrix_room, {url=ForgejoUrl.from_id forgejo_id; headers=make_headers base64_password}))
|
||||
|> StringMap.of_list
|
||||
in
|
||||
{requests=requests; repos=urls}
|
||||
|
||||
let pyprint () =
|
||||
let builtins = Py.Eval.get_builtins () in
|
||||
let p = Py.Dict.find_string builtins "print" in
|
||||
Py.Callable.to_function p
|
||||
|
||||
let extract_pagination resp =
|
||||
let key = Py.String.of_string "X-Total-Count" in
|
||||
resp
|
||||
.@$("headers")
|
||||
.&("get") [|key|] (* not really a pydict, so need to use `get` *)
|
||||
|> Py.Object.to_string
|
||||
|> int_of_string
|
||||
|
||||
let make_get_request {requests; repos} =
|
||||
let get = Py.Module.get_function_with_keywords requests "get" in
|
||||
let fold_fn = (fun m_room_string {url; headers} acc ->
|
||||
let resp = get [|url|] [("headers", headers)] in
|
||||
|
||||
|
||||
let rec fold_fn (accum: (Datatypes.reminder list)) = function
|
||||
| [] -> Ok accum
|
||||
| (m_room_string, {url; headers})::rest ->
|
||||
let pyurl = url |> ForgejoUrl.to_string |> Py.String.of_string in
|
||||
let resp = get [|pyurl|] [("headers", headers)] in
|
||||
let jsontext =
|
||||
resp.@$("text")
|
||||
|> Py.String.to_string
|
||||
in
|
||||
let _ = resp.@$("headers") |> Py.Dict.to_bindings_string |> List.iter (fun e -> e |> Batteries.dump |> print_endline)
|
||||
in
|
||||
let _ = print_endline "linea 62" in
|
||||
let target_items_total = extract_pagination resp in
|
||||
let _ = print_endline (url |> ForgejoUrl.to_string) in
|
||||
let _ = print_endline "linea 64" in
|
||||
let matrix_room = Datatypes.MatrixRoom.make m_room_string in
|
||||
let value =
|
||||
issues_of_json matrix_room jsontext
|
||||
|> Result.map Datatypes.forgejo_issues
|
||||
|> Result.map_error Datatypes.forgejo_error (*TODO: maybe not really a forgejo error, more like internal *)
|
||||
in value::acc)
|
||||
in StringMap.fold fold_fn repos []
|
||||
let issues = issues_of_json matrix_room jsontext in
|
||||
match issues with
|
||||
| Error err ->
|
||||
Error err
|
||||
| Ok target_issues ->
|
||||
let n_received = List.length target_issues in
|
||||
let accum' = target_issues@accum in
|
||||
let urls =
|
||||
if n_received <> target_items_total then
|
||||
(m_room_string, {url=ForgejoUrl.next_page url; headers=headers})::rest
|
||||
else
|
||||
rest
|
||||
in fold_fn accum' urls
|
||||
|
||||
in
|
||||
StringMap.to_list repos
|
||||
|> fold_fn []
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
open Riot
|
||||
open Datatypes
|
||||
open Batteries
|
||||
open Utils
|
||||
|
@ -6,6 +5,22 @@ open Utils
|
|||
let _MQ_CLIENT = "mq_client"
|
||||
let _HTTP_CLIENT = "http_client"
|
||||
|
||||
|
||||
let generate_alert = function
|
||||
(*
|
||||
* if the deadline has passed: alert every day at noon
|
||||
* else parse the alert
|
||||
**)
|
||||
| {due_date=None; _} -> None
|
||||
| {due_date=Some date; body; _} ->
|
||||
let now = Ptime_clock.now () in
|
||||
let today = now |> Ptime.to_date in
|
||||
Ptime.of_rfc3339 date
|
||||
|> Ptime.rfc3339_string_error
|
||||
|> Result.map (fun (timestamp, _, _) -> Ptime.to_date timestamp = today)
|
||||
|> Option.some
|
||||
|
||||
|
||||
let http_client (repos: Config.repo_data list) =
|
||||
let http_actor = Httpclient.init repos in
|
||||
let _ = print_endline "Initialized http client" in
|
||||
|
@ -15,23 +30,27 @@ let http_client (repos: Config.repo_data list) =
|
|||
match Riot.receive () with
|
||||
| ListIssues ->
|
||||
Httpclient.make_get_request http_actor
|
||||
|> List.map (Result.fold ~ok:identity ~error:identity)
|
||||
|> List.iter (Riot.send_by_name ~name:_MQ_CLIENT)
|
||||
|> Result.map Datatypes.forgejo_issues
|
||||
|> Result.map_error Datatypes.forgejo_error
|
||||
|> Result.fold ~ok:identity ~error:identity
|
||||
|> Riot.send_by_name ~name:_MQ_CLIENT
|
||||
|
||||
|
||||
| m -> unhandled m
|
||||
in
|
||||
loop ()
|
||||
in loop ()
|
||||
|
||||
let mq_client (mq_url, mq_user, mq_password) consumer =
|
||||
let mq_client (mq_url, mq_user, mq_password) =
|
||||
let pprint rem =
|
||||
[%string "%{rem.title}|%{rem.matrix_target}"]
|
||||
in
|
||||
|
||||
let call_consumer pid { Amqp_client_lwt.Message.message = (_content, body); _ } =
|
||||
Riot.send pid (Datatypes.ReceivedFromMq ("GOT RABBITs: "^body)) (*TODO: log*)
|
||||
let call_consumer { Amqp_client_lwt.Message.message = (_content, body); _ } =
|
||||
Pamlog.error [%string "Received msg from rabbitmq: %{body}. PAM will ignore."]
|
||||
in
|
||||
|
||||
let%lwt mq = Mq.init (mq_url, mq_user, mq_password) consumer call_consumer in
|
||||
let%lwt mq = Mq.init (mq_url, mq_user, mq_password) call_consumer in
|
||||
|
||||
let rec loop () =
|
||||
let%lwt _ = Lwt_unix.sleep 1.0 in
|
||||
|
@ -39,8 +58,7 @@ let mq_client (mq_url, mq_user, mq_password) consumer =
|
|||
try%lwt
|
||||
match Riot.receive ~after:one_second () with
|
||||
| ForgejoError err ->
|
||||
let _ = print_endline [%string "Got error from Forgejo: %{err}"] in
|
||||
(* Lwt.return (Mq.mq_publish mq err ) *)
|
||||
let _ = Pamlog.error [%string "Got error from Forgejo: %{err}"] in
|
||||
Mq.mq_publish mq err
|
||||
| ForgejoIssues reminders ->
|
||||
let _ = [%string "Got reminders: %{Batteries.dump reminders}"] |> print_endline in
|
||||
|
@ -54,8 +72,9 @@ let mq_client (mq_url, mq_user, mq_password) consumer =
|
|||
loop ()
|
||||
|
||||
let main (config: Config.config) =
|
||||
let open Riot in
|
||||
let http_client_pid = spawn (fun () -> http_client config.repos) in
|
||||
let mq_client_pid = spawn (fun () -> Lwt_main.run (mq_client (config.mq_url, config.mq_user, config.mq_password) http_client_pid)) in
|
||||
let mq_client_pid = spawn (fun () -> Lwt_main.run (mq_client (config.mq_url, config.mq_user, config.mq_password))) in
|
||||
let _ = Riot.register _HTTP_CLIENT http_client_pid in
|
||||
let _ = Riot.register _MQ_CLIENT mq_client_pid
|
||||
in
|
||||
|
|
|
@ -7,16 +7,16 @@ let no_routing_key = `Queue ""
|
|||
|
||||
type rabbit_actor = {queue: Queue.t; channel: Channel.no_confirm Channel.t; connection: Connection.t}
|
||||
|
||||
let init (mq_url, mq_user, mq_password) riot_consumer consume_fn =
|
||||
let init (mq_url, mq_user, mq_password) consume_fn =
|
||||
(* Init rabbitmq client *)
|
||||
let%lwt connection = Amqp.Connection.connect ~credentials:(mq_user, mq_password) ~id:"pam" mq_url in
|
||||
let%lwt channel = Amqp.Connection.open_channel ~id:"pam" Amqp.Channel.no_confirm connection in
|
||||
let%lwt queue = Amqp.Queue.declare channel "pam" in
|
||||
let _ = print_endline "Initialized rabbitmq client" in
|
||||
let _ = Pamlog.info "Initialized rabbitmq client" in
|
||||
|
||||
(* Establish consumer *)
|
||||
let%lwt (_ampq_consumer, reader) = Amqp.Queue.consume ~no_ack:true ~id:"" channel queue in
|
||||
let _ = Thread.spawn ~exn_handler:handle_consumer_exn (Thread.Pipe.iter_without_pushback reader ~f:(consume_fn riot_consumer)) in
|
||||
let _ = Thread.spawn ~exn_handler:handle_consumer_exn (Thread.Pipe.iter_without_pushback reader ~f:consume_fn) in
|
||||
Lwt.return {queue=queue; channel=channel; connection=connection}
|
||||
|
||||
let shutdown {queue=_queue; channel; connection} =
|
||||
|
@ -25,10 +25,11 @@ let shutdown {queue=_queue; channel; connection} =
|
|||
let%lwt _ = Lwt.catch (fun () -> Amqp.Connection.close connection) ignore_exn in
|
||||
Lwt.return_unit
|
||||
|
||||
let mq_publish {queue=_; channel; connection=_} msg =
|
||||
let%lwt _ =
|
||||
Message.make msg
|
||||
|> Exchange.publish channel Exchange.default ~routing_key:"lanonna"
|
||||
let mq_publish {queue=_; channel=_; connection=_} msg =
|
||||
let _ = Pamlog.info [%string "GOT MSG FOR MQ:: %{msg}"]
|
||||
(* let%lwt _ = *)
|
||||
(* Message.make msg *)
|
||||
(* |> Exchange.publish channel Exchange.default ~routing_key:"lanonna" *)
|
||||
in Lwt.return_unit
|
||||
|
||||
let rec mq_publish_all mq = function
|
||||
|
|
Loading…
Reference in a new issue