burrodiarachidi
This commit is contained in:
parent
a5cccaca27
commit
dfb49cbf50
4 changed files with 87 additions and 113 deletions
169
src/Cron.fs
169
src/Cron.fs
|
@ -8,15 +8,13 @@ open Pentole
|
|||
open Datatypes
|
||||
|
||||
open Pentole.String
|
||||
open Pentole.Path
|
||||
|
||||
type User = string
|
||||
|
||||
let rnd = new System.Random 2
|
||||
let local_tz = DateTimeZoneProviders.Tzdb.GetSystemDefault ()
|
||||
let local_tz_net = System.TimeZoneInfo.Local
|
||||
|
||||
let private rand_int incl_ excl_ = rnd.NextInt64 (incl_, excl_)
|
||||
|
||||
let private parse_expr (now: Instant) text =
|
||||
let to_cron text =
|
||||
let mutable c: CronExpression = Unchecked.defaultof<CronExpression>
|
||||
|
@ -27,7 +25,6 @@ let private parse_expr (now: Instant) text =
|
|||
let to_pattern text =
|
||||
match Pentole.String.split " " text |> List.head with
|
||||
| Prefix "@after" job -> After job |> Ok
|
||||
| Prefix "@before" job -> Before job |> Ok
|
||||
| _ -> Error $"Can't parse as pattern: {text}"
|
||||
|
||||
match to_cron text, to_pattern text with
|
||||
|
@ -40,122 +37,74 @@ let private parse_expr (now: Instant) text =
|
|||
|> Result.bind (fun i ->
|
||||
if not i.HasValue then Error $"invalid cron expression: {text}"
|
||||
else Instant.FromDateTimeOffset i.Value |> Cron |> Ok)
|
||||
(*
|
||||
let private schedule (pt: PatternType) (now: Instant) (done_at: Instant option) =
|
||||
let now' = now.InZone local_tz
|
||||
if done_at.IsNone then
|
||||
let last_possible_moment =
|
||||
match pt with
|
||||
| After _ | Before _ ->
|
||||
"CronJobs with patterns should be in a different partition"
|
||||
|> System.InvalidOperationException |> raise
|
||||
| Hourly ->
|
||||
now'.Minute + (60 - now'.Minute) |> int64 |> Duration.FromMinutes
|
||||
| Daily ->
|
||||
now'.Hour + (24 - now'.Hour) |> Duration.FromHours
|
||||
| Weekly ->
|
||||
let sunday = now'.LocalDateTime.With(DateAdjusters.NextOrSame(IsoDayOfWeek.Sunday))
|
||||
sunday
|
||||
| Monthly ->
|
||||
let end_of_the_month = now'.LocalDateTime.With DateAdjusters.EndOfMonth
|
||||
end_of_the_month
|
||||
let ts = dur.TotalSeconds |> int
|
||||
rand_int 0 ts
|
||||
|
||||
else failwith ""
|
||||
|
||||
|
||||
let make (now: Instant) (env: Database.EnvVarsEntry list) (src: Database.CronTableEntry) =
|
||||
let resolve_ path =
|
||||
Path.of_string path
|
||||
|> Result.bind (FileSystem.resolve)
|
||||
|> Result.mapError (fun e -> $"{e} for path {path}")
|
||||
let private parse now (db: Database.Requirements) =
|
||||
let when_ = parse_expr now db.``when``
|
||||
let env =
|
||||
db.environment
|
||||
|> String.split "|"
|
||||
|> ResultList.collect (fun env_item ->
|
||||
let splitted = env_item |> String.split "="
|
||||
if splitted.Length <> 2 then
|
||||
Error $"Invalid format for env variables: {env_item}"
|
||||
else
|
||||
Ok (splitted.[0], splitted.[1]))
|
||||
|
||||
let get_env (job_name: string) =
|
||||
env
|
||||
|> List.filter (fun e -> e.job_name = job_name)
|
||||
|> List.map (fun e -> (e.value, e.variable))
|
||||
let executable = Path.of_string db.executable |> Result.bind FileSystem.resolve
|
||||
let workdir = Path.of_string db.workdir |> Result.bind FileSystem.resolve
|
||||
|
||||
let last_completed =
|
||||
db.done_at
|
||||
|> Option.map Instant.FromDateTimeUtc
|
||||
|> Option.defaultValue Instant.MinValue
|
||||
|
||||
(src.executable, src.workdir)
|
||||
|> Result.pairwise_map resolve_
|
||||
|> Result.bind (fun (what, where) ->
|
||||
parse_expr now src.``when`` |> Result.map (fun when_ -> (what, where, when_)))
|
||||
|> Result.map (fun (what, where, when_) -> {
|
||||
job_name = src.job_name
|
||||
user = src.user
|
||||
Result.zip when_ env
|
||||
|> Result.zip (Result.zip executable workdir)
|
||||
|> Result.map (fun ((executable, workdir), (when_, env)) ->
|
||||
{ job_name = db.job_name
|
||||
user = db.user
|
||||
when_ = when_
|
||||
executable = what
|
||||
args = src.args |> List.ofArray
|
||||
environment = get_env src.job_name
|
||||
workdir = where
|
||||
hostname = src.hostname })
|
||||
|> Result.mapError (fun e -> (src.job_name, e))
|
||||
executable = executable
|
||||
args = db.args |> List.ofArray
|
||||
environment = List.ofSeq env
|
||||
workdir = workdir
|
||||
hostname = db.hostname
|
||||
last_completed_at = last_completed })
|
||||
|
||||
type JobKey = {
|
||||
j: string; h: string
|
||||
}
|
||||
let sort_cron_jobs (now: Instant) (db_crons: Database.Requirements seq) =
|
||||
|
||||
|
||||
let build_sorted_jobs_table (logger: Serilog.ILogger) (hostname: string) (now: Instant)
|
||||
(requirements: Database.Requirements) =
|
||||
let cronjobs =
|
||||
requirements.cron
|
||||
|> List.map (Cron.make now requirements.environment)
|
||||
|> List.choose (function
|
||||
| Ok cj when cj.hostname <> hostname -> None
|
||||
| Ok cronjob -> Some cronjob
|
||||
| Error (j, e) ->
|
||||
logger.Warning $"Invalid job definition {j}, reason: {e}"
|
||||
None)
|
||||
let all_jobs =
|
||||
db_crons
|
||||
|> ResultList.collect (parse now)
|
||||
|
||||
let has_deps = function
|
||||
| {when_=Pattern (After _ | Before _)} -> true
|
||||
| {when_=Cron _} -> false
|
||||
| {when_=Pattern (Hourly | Daily | Weekly | Monthly)} -> false
|
||||
let build_deps (lst: CronJob list) =
|
||||
let standalone, deps =
|
||||
List.partition (function | {when_=Cron _} -> true | _ -> false) lst
|
||||
|
||||
let resolve_ts = function
|
||||
| {when_=Pattern (After _ | Before _)} ->
|
||||
"CronJobs with patterns should be in a different partition"
|
||||
|> System.InvalidOperationException |> raise
|
||||
|
||||
| {when_=Cron x} as cj -> (x, cj)
|
||||
| {when_=Pattern pt} as cj ->
|
||||
cj.job_name
|
||||
|> requirements.backlog.TryFind
|
||||
|> schedule pt now
|
||||
|
||||
|
||||
|
||||
|
||||
let standalone, with_deps =
|
||||
cronjobs
|
||||
|> List.partition has_deps
|
||||
|
||||
let standalone =
|
||||
let index =
|
||||
standalone
|
||||
|> List.map resolve_ts
|
||||
|
||||
let children =
|
||||
with_deps
|
||||
|> List.choose (function
|
||||
| {when_=Pattern (Before _child)} as parent -> Some parent
|
||||
| _ -> None)
|
||||
|> List.groupBy (function | {when_=Pattern (After child)} -> child)
|
||||
|> Map.ofList
|
||||
|
||||
let parents =
|
||||
with_deps
|
||||
|> List.choose (function
|
||||
| {when_=Pattern (After _parent)} as child -> Some child
|
||||
| _ -> None)
|
||||
|> List.groupBy (function | {when_=Pattern (After parent)} -> parent)
|
||||
|> Map.ofList
|
||||
(*
|
||||
let rec sort acc : CronJob list -> CronJob list = function
|
||||
|> Seq.map (fun x -> ({j=x.job_name; h=x.hostname}, x))
|
||||
|> Map.ofSeq
|
||||
in index, deps
|
||||
in
|
||||
let rec build_job_table acc (index: Map<JobKey, CronJob>) = function
|
||||
| [] -> acc
|
||||
| x::rest ->
|
||||
let parents_of_x = parents |> Map.tryFind x.hostname |> Option.defaultValue []
|
||||
let children_of_x = children |> Map.tryFind x.hostname |> Option.defaultValue []
|
||||
let current = parents_of_x @ [x] @ children_of_x
|
||||
| {when_=Pattern (After jb)} as x::xs ->
|
||||
let father = {j=jb; h=x.hostname}
|
||||
let previous =
|
||||
match Map.tryFind father acc with
|
||||
// TODO: error handling
|
||||
| None -> Map.find father index |> List.singleton
|
||||
| Some p -> p
|
||||
let acc' = Map.add father (x::previous) acc
|
||||
build_job_table acc' index xs
|
||||
| {when_=Cron _}::_ -> failwith "TODO" // TODO
|
||||
|
||||
let acc' = System.Diagnostics.Trace.Assert
|
||||
*)
|
||||
parents
|
||||
*)
|
||||
all_jobs
|
||||
|> Result.map build_deps
|
||||
|> Result.map (fun (standalone, deps) -> build_job_table Map.empty standalone deps)
|
||||
|
|
|
@ -190,3 +190,10 @@ type BacklogDefaultTS () =
|
|||
override x.Up() =
|
||||
"""ALTER TABLE backlog ALTER COLUMN done_at SET DEFAULT current_timestamp;"""
|
||||
|> x.Execute.Sql |> ignore
|
||||
|
||||
[<Migration(20241024_0002L)>]
|
||||
type BacklogAddStarted () =
|
||||
inherit OnlyUp ()
|
||||
override x.Up() =
|
||||
x.Alter.Table("backlog").AddColumn("started_at").AsCustom "timestamptz"
|
||||
|> ignore
|
||||
|
|
|
@ -9,7 +9,7 @@ open Pentole.Path
|
|||
type Notification = | Time | Database
|
||||
|
||||
[<GenerateSerializer>]
|
||||
type PatternType = | Hourly | Daily | Weekly | Monthly | After of string | Before of string
|
||||
type PatternType = After of string
|
||||
|
||||
[<GenerateSerializer>]
|
||||
type WhenExpr = | Cron of Instant | Pattern of PatternType
|
||||
|
@ -25,4 +25,5 @@ type CronJob = {
|
|||
environment: (string * string) list
|
||||
workdir: Path
|
||||
hostname: string
|
||||
last_completed_at: Instant
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
namespace Pentole
|
||||
|
||||
module Result =
|
||||
|
||||
let inline protect ([<InlineIfLambda>]f) x =
|
||||
try
|
||||
Ok (f x)
|
||||
|
@ -14,12 +15,28 @@ module Result =
|
|||
|
||||
let of_option = function | Some s -> Ok s | None -> Error ()
|
||||
|
||||
let zip a b =
|
||||
match (a, b) with
|
||||
| Ok a, Ok b -> Ok (a, b)
|
||||
| Error e, _ -> Error e
|
||||
| _, Error e -> Error e
|
||||
|
||||
|
||||
type ToStringWrapper(toString) =
|
||||
override this.ToString() = toString ()
|
||||
override _.ToString() = toString ()
|
||||
|
||||
let Result l = ToStringWrapper(fun _ ->
|
||||
match l with
|
||||
| Ok o -> sprintf "Ok %O" o
|
||||
| _ -> failwith "")
|
||||
|
||||
module ResultList =
|
||||
let collect (lambda: 'a -> Result<'ok, 'err>) (seq_: 'a seq) =
|
||||
let rec iter_ acc seq_ =
|
||||
match Seq.tryHead seq_ with
|
||||
| None -> Ok acc
|
||||
| Some x ->
|
||||
match lambda x with
|
||||
| Error e -> Error e
|
||||
| Ok o -> iter_ (o::acc) (Seq.tail seq_)
|
||||
iter_ [] seq_
|
||||
|
|
Loading…
Reference in a new issue