This commit is contained in:
Benedetta 2024-03-25 14:17:34 +01:00
parent 1d96f80ee6
commit 07c60e79bf
7 changed files with 61 additions and 62 deletions

View file

@ -2,8 +2,7 @@ open Yojson.Safe
let url = "https://bugs.lezzo.org/api/v1/repos/bparodi/Documenti/issues?state=open&type=issues"
let headers =
let base64password = Datatypes.forgejo in
let headers base64password =
[("accept", "application/json");
("authorization", [%string "Basic %{base64password}"])]

View file

@ -1,3 +1,8 @@
let configuration () =
let conf = Otoml.Parser.from_file "/etc/lanonna.toml" in
Otoml.find conf Otoml.get_string ["lanonna"; "mq_url"]
let mq_url = Otoml.find conf Otoml.get_string ["pam"; "mq_url"] in
let mq_user = Otoml.find conf Otoml.get_string ["pam"; "mq_user"] in
let mq_password = Otoml.find conf Otoml.get_string ["pam"; "mq_password"] in
let base64_creds = Otoml.find conf Otoml.get_string ["pam"; "forgejo_base64"] in
(* config_file {mq_url = mq_url; mq_user = mq_user; mq_password = mq_password; base64_creds = base64_creds} *)
(mq_url, mq_user, mq_password, base64_creds)

View file

@ -1,19 +1,19 @@
type reminder = { (* from an issue in forgejo get a reminder *)
url: string;
title: string;
due_date: string ;
}
type client_id = | MqClient | HttpClient
type Riot.Message.t +=
| RegisterClient of (client_id * Riot.Pid.t)
| LookupClient of client_id
| ReceivedFromMq of string
| WebReq
| ListIssues
| ForgejoIssues of reminder list
| ForgejoError of string
| InternalFailure of string
let forgejo_issues lst = ForgejoIssues lst
let forgejo_error reason = ForgejoError reason
let of_reminder r =
[%string "%{r.title}"]

View file

@ -8,12 +8,12 @@ let reminder_of_issue : Api.issue -> Datatypes.reminder option = function
| {due_date=None; _} -> None
| {url=url; title=title; due_date=Some due_date; body=_} -> Some {url=url; title=title; due_date=due_date}
let init () =
let init base64password =
let _ = Py.initialize () in
let requests = Py.import "requests" in
let url = Py.String.of_string Api.url in
let headers =
Api.headers
Api.headers base64password
|> List.map (fun (k, v) -> (k, Py.String.of_string v))
|> Py.Dict.of_bindings_string in

View file

@ -1,77 +1,75 @@
open Riot
open Datatypes
open Batteries
open Util
open Utils
let http mq_pid =
let http_actor = Httpclient.init () in
let _MQ_CLIENT = "mq_client"
let _HTTP_CLIENT = "http_client"
let http_client base64password =
let http_actor = Httpclient.init base64password in
let _ = print_endline "Initialized http client" in
let rec loop () =
let _ =
match Riot.receive () with
| WebReq ->
let _ = print_endline "Got webreq" in
| ListIssues ->
let reminders = Httpclient.make_get_request http_actor in
let msg =
reminders
|> Result.map (fun f -> Batteries.dump f |> print_endline; f)
|> result_unpack
let msg = result_unpack reminders
in
Riot.send mq_pid msg
| _ -> failwith "Unknown msg"
Riot.send_by_name ~name:_MQ_CLIENT msg
| m -> unhandled m
in
loop ()
in loop ()
let rabbit consumer =
let%lwt mq = Mq.init consumer in
let mq_client (mq_url, mq_user, mq_password) consumer =
let pprint rem =
[%string "%{r.title}"]
in
let call_consumer pid { Amqp_client_lwt.Message.message = (_content, body); _ } =
Riot.send pid (Datatypes.ReceivedFromMq ("GOT RABBITs: "^body)) (*TODO: log*)
in
let%lwt mq = Mq.init (mq_url, mq_user, mq_password) consumer call_consumer in
let rec loop () =
let%lwt _ = Lwt_unix.sleep 1.0 in
let _ =
try%lwt
match Riot.receive ~after:1000L () with (* TODO: Somehow the 1000L doesn't work. Report it.*)
match Riot.receive ~after:one_second () with
| ForgejoError err ->
let _ = print_endline [%string "Got error from Forgejo: %{err}"] in
Lwt.return (Mq.mq_publish mq err )
(* Lwt.return (Mq.mq_publish mq err ) *)
Mq.mq_publish mq err
| ForgejoIssues reminders ->
let _ = [%string "Got reminders: %{Batteries.dump reminders}"] |> print_endline in
let rems = List.map of_reminder reminders in
Lwt.return (Mq.mq_publish_all mq rems)
| _ ->
failwith "Unhandled msg"
with | Riot.Receive_timeout -> failwith "dunno"
let rems = List.map pprint reminders in
Mq.mq_publish_all mq rems
| m -> unhandled m
with | Riot.Receive_timeout -> Lwt.return_unit
in
loop ()
in
loop ()
let loop http_pid =
let main (mq_url, mq_user, mq_password, base64_creds) =
let http_client_pid = spawn (fun () -> http_client base64_creds) in
let mq_client_pid = spawn (fun () -> Lwt_main.run (mq_client (mq_url, mq_user, mq_password) http_client_pid)) in
let _ = Riot.register _HTTP_CLIENT http_client_pid in
let _ = Riot.register _MQ_CLIENT mq_client_pid
in
let timeout = 6.0 in
let rec loop_ () =
let _ = send http_pid WebReq in
let _ = sleep 60.0 in
loop_ ()
let _ = send http_client_pid ListIssues in
sleep timeout |> loop_
in
loop_()
let main () =
let _own = self () in
let http = spawn (fun () -> http _own) in (*TODO: nope*)
let _mq = spawn (fun () -> Lwt_main.run (rabbit http)) in
sleep 2.0
;
send _mq (Datatypes.ForgejoIssues []);
let _ = spawn (fun () -> loop http) in
let _ = print_endline "Now looping"
in
let rec forever () = (* TODO: is there something better? Like monitoring processes ?*)
sleep 22.2; forever ()
in forever ()
sleep timeout |> loop_
let () =
Config.configuration () |> print_endline;
Riot.run main
let (mq_url, mq_user, mq_password, base64_creds) = Config.configuration () in
Riot.run (fun () -> main (mq_url, mq_user, mq_password, base64_creds))

View file

@ -1,8 +1,5 @@
open Amqp_client_lwt
let handler pid { Message.message = (_content, body); _ } =
Riot.send pid (Datatypes.ReceivedFromMq ("GOT RABBITs: "^body)) (*TODO: log*)
let handle_consumer_exn _exn =
Lwt.return_unit (* TODO *)
@ -10,20 +7,16 @@ let no_routing_key = `Queue ""
type rabbit_actor = {queue: Queue.t; channel: Channel.no_confirm Channel.t; connection: Connection.t}
let init riot_consumer =
let init (mq_url, mq_user, mq_password) riot_consumer consume_fn =
(* Init rabbitmq client *)
let%lwt connection = Amqp.Connection.connect ~credentials:Datatypes.CREDS ~id:"Pam" MQURL in (*TODO: Creds*)
let%lwt channel = Amqp.Connection.open_channel ~id:"Pam" Amqp.Channel.no_confirm connection in
let%lwt exchange = Amqp.Exchange.declare channel Exchange.direct_t "pam" in
(* let%lwt lanonna_exchange = Amqp.Exchange.declare channel Exchange.direct_t "lanonna" in *)
let%lwt connection = Amqp.Connection.connect ~credentials:(mq_user, mq_password) ~id:"pam" mq_url in
let%lwt channel = Amqp.Connection.open_channel ~id:"pam" Amqp.Channel.no_confirm connection in
let%lwt queue = Amqp.Queue.declare channel "pam" in
let _ = Amqp.Queue.bind channel queue exchange no_routing_key in
(* Amqp.Queue.publish channel queue (Amqp.Message.make "My Message Payload") >>= function `Ok -> *)
let _ = print_endline "Initialized rabbitmq client" in
(* Establish consumer *)
let%lwt (_ampq_consumer, reader) = Amqp.Queue.consume ~no_ack:true ~id:"" channel queue in
let _ = Thread.spawn ~exn_handler:handle_consumer_exn (Thread.Pipe.iter_without_pushback reader ~f:(handler riot_consumer)) in
let _ = Thread.spawn ~exn_handler:handle_consumer_exn (Thread.Pipe.iter_without_pushback reader ~f:(consume_fn riot_consumer)) in
Lwt.return {queue=queue; channel=channel; connection=connection}
let shutdown {queue=_queue; channel; connection} =

View file

@ -14,3 +14,7 @@ let now () =
year month day hour minute second
let result_unpack = function | Ok o -> o | Error e -> e
let unhandled m = [%string "Runtime failure: unhandled %{Batteries.dump m}"] |> print_endline; exit 2
let one_second = 1_000_000L