burrodiarachidi
This commit is contained in:
parent
68b01dd287
commit
275c985e48
3 changed files with 34 additions and 32 deletions
|
@ -67,7 +67,7 @@ let extract_pagination resp =
|
||||||
.@$("headers")
|
.@$("headers")
|
||||||
.&("get") [|key|] (* not really a pydict, so need to use `get` *)
|
.&("get") [|key|] (* not really a pydict, so need to use `get` *)
|
||||||
|> Py.Object.to_string
|
|> Py.Object.to_string
|
||||||
|> int_of_string
|
|> int_of_string_opt
|
||||||
|
|
||||||
let make_get_request {requests; repos} =
|
let make_get_request {requests; repos} =
|
||||||
let get = Py.Module.get_function_with_keywords requests "get" in
|
let get = Py.Module.get_function_with_keywords requests "get" in
|
||||||
|
@ -82,16 +82,14 @@ let make_get_request {requests; repos} =
|
||||||
resp.@$("text")
|
resp.@$("text")
|
||||||
|> Py.String.to_string
|
|> Py.String.to_string
|
||||||
in
|
in
|
||||||
let _ = print_endline "linea 62" in
|
|
||||||
let target_items_total = extract_pagination resp in
|
let target_items_total = extract_pagination resp in
|
||||||
let _ = print_endline (url |> ForgejoUrl.to_string) in
|
|
||||||
let _ = print_endline "linea 64" in
|
|
||||||
let matrix_room = Datatypes.MatrixRoom.make m_room_string in
|
let matrix_room = Datatypes.MatrixRoom.make m_room_string in
|
||||||
let issues = issues_of_json matrix_room jsontext in
|
let issues = issues_of_json matrix_room jsontext in
|
||||||
match issues with
|
match target_items_total, issues with
|
||||||
| Error err ->
|
| None, _ -> Error "Can't extract pagination"
|
||||||
|
| _, Error err ->
|
||||||
Error err
|
Error err
|
||||||
| Ok target_issues ->
|
| Some target_items_total, Ok target_issues ->
|
||||||
let n_received = List.length target_issues in
|
let n_received = List.length target_issues in
|
||||||
let accum' = target_issues@accum in
|
let accum' = target_issues@accum in
|
||||||
let urls =
|
let urls =
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
open Pam.Datatypes
|
open Pam.Datatypes
|
||||||
open Pam.Issue_parser
|
open Pam
|
||||||
open Batteries
|
open Batteries
|
||||||
open Utils
|
open Utils
|
||||||
|
|
||||||
|
@ -13,34 +13,41 @@ let _HTTP_CLIENT = "http_client"
|
||||||
let http_client (repos: Config.repo_data list) =
|
let http_client (repos: Config.repo_data list) =
|
||||||
let http_actor = Httpclient.init repos in
|
let http_actor = Httpclient.init repos in
|
||||||
|
|
||||||
let now = Ptime_clock.now () in
|
let _ = Pamlog.info "Initialized http client" in
|
||||||
|
|
||||||
let _ = print_endline "Initialized http client" in
|
|
||||||
|
|
||||||
|
let check_alert_time now issues =
|
||||||
|
let rec aux acc = function
|
||||||
|
| [] -> Ok acc
|
||||||
|
| issue :: rest ->
|
||||||
|
match Issuelib.to_datetime issue with
|
||||||
|
| Error e -> Error e
|
||||||
|
| Ok None -> aux acc rest
|
||||||
|
| Ok (Some alert_times) ->
|
||||||
|
let acc' =
|
||||||
|
if Issuelib.should_alert now alert_times then
|
||||||
|
issue::acc
|
||||||
|
else
|
||||||
|
acc
|
||||||
|
in aux acc' rest
|
||||||
|
in aux [] issues
|
||||||
in
|
in
|
||||||
let rec loop () =
|
let rec loop () =
|
||||||
let _ =
|
let _ =
|
||||||
match Riot.receive_any () with
|
match Riot.receive_any () with
|
||||||
| ListIssues ->
|
| ListIssues ->
|
||||||
|
let now = Ptime_clock.now () in
|
||||||
let issues = Httpclient.make_get_request http_actor
|
let issues = Httpclient.make_get_request http_actor
|
||||||
in
|
in
|
||||||
(* Result.bind issues pair_to_alert_time *) TODO: usa Pam.Issue_parser.should_alert
|
Result.bind issues (check_alert_time now)
|
||||||
|> Result.map (List.filter_map filter_issue_by_time)
|
|
||||||
|> Result.map_error internal_failure
|
|> Result.map_error internal_failure
|
||||||
|> Result.fold ~ok:forgejo_issues ~error:List.singleton
|
|> Result.fold ~ok:forgejo_issues ~error:List.singleton
|
||||||
|> List.map (Riot.send_by_name ~name:_MQ_CLIENT)
|
|> List.map (Riot.send_by_name ~name:_MQ_CLIENT)
|
||||||
|
|
||||||
|
|
||||||
| m -> unhandled m
|
| m -> unhandled m
|
||||||
in
|
in
|
||||||
loop ()
|
loop ()
|
||||||
in loop ()
|
in loop ()
|
||||||
|
|
||||||
let mq_client (mq_url, mq_user, mq_password) =
|
let mq_client (mq_url, mq_user, mq_password) =
|
||||||
let pprint rem =
|
|
||||||
[%string "%{rem.title}|%{rem.matrix_target}"]
|
|
||||||
in
|
|
||||||
|
|
||||||
let call_consumer { Amqp_client_lwt.Message.message = (_content, body); _ } =
|
let call_consumer { Amqp_client_lwt.Message.message = (_content, body); _ } =
|
||||||
Pamlog.error [%string "Received msg from rabbitmq: %{body}. PAM will ignore."]
|
Pamlog.error [%string "Received msg from rabbitmq: %{body}. PAM will ignore."]
|
||||||
in
|
in
|
||||||
|
@ -56,9 +63,8 @@ let mq_client (mq_url, mq_user, mq_password) =
|
||||||
let _ = Pamlog.error [%string "Got error from Forgejo: %{err}"] in
|
let _ = Pamlog.error [%string "Got error from Forgejo: %{err}"] in
|
||||||
Mq.mq_publish mq err
|
Mq.mq_publish mq err
|
||||||
| Reminder reminder ->
|
| Reminder reminder ->
|
||||||
let _ = [%string "Got reminders: %{Batteries.dump reminder}"] |> print_endline in
|
let formatted = Issuelib.issue_data_to_json reminder in
|
||||||
let rems = pprint reminder in
|
Mq.mq_publish mq formatted
|
||||||
Mq.mq_publish mq rems
|
|
||||||
| m -> unhandled m
|
| m -> unhandled m
|
||||||
with | Riot.Receive_timeout -> Lwt.return_unit
|
with | Riot.Receive_timeout -> Lwt.return_unit
|
||||||
in
|
in
|
||||||
|
@ -73,7 +79,7 @@ let main (config: Config.config) =
|
||||||
let _ = Riot.register _HTTP_CLIENT http_client_pid in
|
let _ = Riot.register _HTTP_CLIENT http_client_pid in
|
||||||
let _ = Riot.register _MQ_CLIENT mq_client_pid
|
let _ = Riot.register _MQ_CLIENT mq_client_pid
|
||||||
in
|
in
|
||||||
let timeout = 6.0 in
|
let timeout = 3.0 in
|
||||||
let rec loop_ () =
|
let rec loop_ () =
|
||||||
let _ = send http_client_pid ListIssues in
|
let _ = send http_client_pid ListIssues in
|
||||||
sleep timeout |> loop_
|
sleep timeout |> loop_
|
||||||
|
@ -83,5 +89,6 @@ let main (config: Config.config) =
|
||||||
|
|
||||||
|
|
||||||
let () =
|
let () =
|
||||||
|
if true then failwith "Testa che manda gli errori su matrix" else 2 |> ignore in
|
||||||
let config = Config.configuration () |> Result.fold ~error:exit2 ~ok:identity in
|
let config = Config.configuration () |> Result.fold ~error:exit2 ~ok:identity in
|
||||||
Riot.run (fun () -> main config)
|
Riot.run (fun () -> main config)
|
||||||
|
|
|
@ -25,13 +25,10 @@ let shutdown {queue=_queue; channel; connection} =
|
||||||
let%lwt _ = Lwt.catch (fun () -> Amqp.Connection.close connection) ignore_exn in
|
let%lwt _ = Lwt.catch (fun () -> Amqp.Connection.close connection) ignore_exn in
|
||||||
Lwt.return_unit
|
Lwt.return_unit
|
||||||
|
|
||||||
let mq_publish {queue=_; channel=_; connection=_} msg =
|
|
||||||
let _ = Pamlog.info [%string "GOT MSG FOR MQ:: %{msg}"]
|
|
||||||
(* let%lwt _ = *)
|
|
||||||
(* Message.make msg *)
|
|
||||||
(* |> Exchange.publish channel Exchange.default ~routing_key:"lanonna" *)
|
|
||||||
in Lwt.return_unit
|
|
||||||
|
|
||||||
let rec mq_publish_all mq = function
|
let mq_publish {queue=_; channel=channel; connection=_} msg =
|
||||||
| [] -> Lwt.return_unit
|
let _ = Pamlog.info [%string "GOT MSG FOR MQ:: %{msg}"] in
|
||||||
| r::rest -> let%lwt _ = mq_publish mq r in mq_publish_all mq rest
|
let%lwt _ =
|
||||||
|
Message.make msg
|
||||||
|
|> Exchange.publish channel Exchange.default ~routing_key:"lanonna"
|
||||||
|
in Lwt.return_unit
|
||||||
|
|
Loading…
Reference in a new issue