diff --git a/src/Cron.fs b/src/Cron.fs index fc31318..96528b4 100644 --- a/src/Cron.fs +++ b/src/Cron.fs @@ -8,15 +8,13 @@ open Pentole open Datatypes open Pentole.String +open Pentole.Path type User = string -let rnd = new System.Random 2 let local_tz = DateTimeZoneProviders.Tzdb.GetSystemDefault () let local_tz_net = System.TimeZoneInfo.Local -let private rand_int incl_ excl_ = rnd.NextInt64 (incl_, excl_) - let private parse_expr (now: Instant) text = let to_cron text = let mutable c: CronExpression = Unchecked.defaultof @@ -27,7 +25,6 @@ let private parse_expr (now: Instant) text = let to_pattern text = match Pentole.String.split " " text |> List.head with | Prefix "@after" job -> After job |> Ok - | Prefix "@before" job -> Before job |> Ok | _ -> Error $"Can't parse as pattern: {text}" match to_cron text, to_pattern text with @@ -40,122 +37,74 @@ let private parse_expr (now: Instant) text = |> Result.bind (fun i -> if not i.HasValue then Error $"invalid cron expression: {text}" else Instant.FromDateTimeOffset i.Value |> Cron |> Ok) -(* -let private schedule (pt: PatternType) (now: Instant) (done_at: Instant option) = - let now' = now.InZone local_tz - if done_at.IsNone then - let last_possible_moment = - match pt with - | After _ | Before _ -> - "CronJobs with patterns should be in a different partition" - |> System.InvalidOperationException |> raise - | Hourly -> - now'.Minute + (60 - now'.Minute) |> int64 |> Duration.FromMinutes - | Daily -> - now'.Hour + (24 - now'.Hour) |> Duration.FromHours - | Weekly -> - let sunday = now'.LocalDateTime.With(DateAdjusters.NextOrSame(IsoDayOfWeek.Sunday)) - sunday - | Monthly -> - let end_of_the_month = now'.LocalDateTime.With DateAdjusters.EndOfMonth - end_of_the_month - let ts = dur.TotalSeconds |> int - rand_int 0 ts - - else failwith "" - - -let make (now: Instant) (env: Database.EnvVarsEntry list) (src: Database.CronTableEntry) = - let resolve_ path = - Path.of_string path - |> Result.bind (FileSystem.resolve) - |> Result.mapError (fun e -> $"{e} for path {path}") - - let get_env (job_name: string) = - env - |> List.filter (fun e -> e.job_name = job_name) - |> List.map (fun e -> (e.value, e.variable)) - (src.executable, src.workdir) - |> Result.pairwise_map resolve_ - |> Result.bind (fun (what, where) -> - parse_expr now src.``when`` |> Result.map (fun when_ -> (what, where, when_))) - |> Result.map (fun (what, where, when_) -> { - job_name = src.job_name - user = src.user - when_ = when_ - executable = what - args = src.args |> List.ofArray - environment = get_env src.job_name - workdir = where - hostname = src.hostname }) - |> Result.mapError (fun e -> (src.job_name, e)) +let private parse now (db: Database.Requirements) = + let when_ = parse_expr now db.``when`` + let env = + db.environment + |> String.split "|" + |> ResultList.collect (fun env_item -> + let splitted = env_item |> String.split "=" + if splitted.Length <> 2 then + Error $"Invalid format for env variables: {env_item}" + else + Ok (splitted.[0], splitted.[1])) + let executable = Path.of_string db.executable |> Result.bind FileSystem.resolve + let workdir = Path.of_string db.workdir |> Result.bind FileSystem.resolve -let build_sorted_jobs_table (logger: Serilog.ILogger) (hostname: string) (now: Instant) - (requirements: Database.Requirements) = - let cronjobs = - requirements.cron - |> List.map (Cron.make now requirements.environment) - |> List.choose (function - | Ok cj when cj.hostname <> hostname -> None - | Ok cronjob -> Some cronjob - | Error (j, e) -> - logger.Warning $"Invalid job definition {j}, reason: {e}" - None) + let last_completed = + db.done_at + |> Option.map Instant.FromDateTimeUtc + |> Option.defaultValue Instant.MinValue - let has_deps = function - | {when_=Pattern (After _ | Before _)} -> true - | {when_=Cron _} -> false - | {when_=Pattern (Hourly | Daily | Weekly | Monthly)} -> false - - let resolve_ts = function - | {when_=Pattern (After _ | Before _)} -> - "CronJobs with patterns should be in a different partition" - |> System.InvalidOperationException |> raise - - | {when_=Cron x} as cj -> (x, cj) - | {when_=Pattern pt} as cj -> - cj.job_name - |> requirements.backlog.TryFind - |> schedule pt now - - + Result.zip when_ env + |> Result.zip (Result.zip executable workdir) + |> Result.map (fun ((executable, workdir), (when_, env)) -> + { job_name = db.job_name + user = db.user + when_ = when_ + executable = executable + args = db.args |> List.ofArray + environment = List.ofSeq env + workdir = workdir + hostname = db.hostname + last_completed_at = last_completed }) +type JobKey = { + j: string; h: string +} +let sort_cron_jobs (now: Instant) (db_crons: Database.Requirements seq) = - let standalone, with_deps = - cronjobs - |> List.partition has_deps - let standalone = - standalone - |> List.map resolve_ts + let all_jobs = + db_crons + |> ResultList.collect (parse now) - let children = - with_deps - |> List.choose (function - | {when_=Pattern (Before _child)} as parent -> Some parent - | _ -> None) - |> List.groupBy (function | {when_=Pattern (After child)} -> child) - |> Map.ofList + let build_deps (lst: CronJob list) = + let standalone, deps = + List.partition (function | {when_=Cron _} -> true | _ -> false) lst - let parents = - with_deps - |> List.choose (function - | {when_=Pattern (After _parent)} as child -> Some child - | _ -> None) - |> List.groupBy (function | {when_=Pattern (After parent)} -> parent) - |> Map.ofList -(* - let rec sort acc : CronJob list -> CronJob list = function + let index = + standalone + |> Seq.map (fun x -> ({j=x.job_name; h=x.hostname}, x)) + |> Map.ofSeq + in index, deps + in + let rec build_job_table acc (index: Map) = function | [] -> acc - | x::rest -> - let parents_of_x = parents |> Map.tryFind x.hostname |> Option.defaultValue [] - let children_of_x = children |> Map.tryFind x.hostname |> Option.defaultValue [] - let current = parents_of_x @ [x] @ children_of_x + | {when_=Pattern (After jb)} as x::xs -> + let father = {j=jb; h=x.hostname} + let previous = + match Map.tryFind father acc with + // TODO: error handling + | None -> Map.find father index |> List.singleton + | Some p -> p + let acc' = Map.add father (x::previous) acc + build_job_table acc' index xs + | {when_=Cron _}::_ -> failwith "TODO" // TODO - let acc' = System.Diagnostics.Trace.Assert -*) - parents -*) + all_jobs + |> Result.map build_deps + |> Result.map (fun (standalone, deps) -> build_job_table Map.empty standalone deps) diff --git a/src/DatabaseMigrations.fs b/src/DatabaseMigrations.fs index 8004b17..25ea37d 100644 --- a/src/DatabaseMigrations.fs +++ b/src/DatabaseMigrations.fs @@ -190,3 +190,10 @@ type BacklogDefaultTS () = override x.Up() = """ALTER TABLE backlog ALTER COLUMN done_at SET DEFAULT current_timestamp;""" |> x.Execute.Sql |> ignore + +[] +type BacklogAddStarted () = + inherit OnlyUp () + override x.Up() = + x.Alter.Table("backlog").AddColumn("started_at").AsCustom "timestamptz" + |> ignore diff --git a/src/Datatypes.fs b/src/Datatypes.fs index 5ff70b1..97ec5eb 100644 --- a/src/Datatypes.fs +++ b/src/Datatypes.fs @@ -9,7 +9,7 @@ open Pentole.Path type Notification = | Time | Database [] -type PatternType = | Hourly | Daily | Weekly | Monthly | After of string | Before of string +type PatternType = After of string [] type WhenExpr = | Cron of Instant | Pattern of PatternType @@ -25,4 +25,5 @@ type CronJob = { environment: (string * string) list workdir: Path hostname: string + last_completed_at: Instant } diff --git a/src/Result.fs b/src/Result.fs index fb45750..a60e265 100644 --- a/src/Result.fs +++ b/src/Result.fs @@ -1,6 +1,7 @@ namespace Pentole module Result = + let inline protect ([]f) x = try Ok (f x) @@ -14,12 +15,28 @@ module Result = let of_option = function | Some s -> Ok s | None -> Error () + let zip a b = + match (a, b) with + | Ok a, Ok b -> Ok (a, b) + | Error e, _ -> Error e + | _, Error e -> Error e + type ToStringWrapper(toString) = - override this.ToString() = toString () + override _.ToString() = toString () let Result l = ToStringWrapper(fun _ -> match l with | Ok o -> sprintf "Ok %O" o | _ -> failwith "") +module ResultList = + let collect (lambda: 'a -> Result<'ok, 'err>) (seq_: 'a seq) = + let rec iter_ acc seq_ = + match Seq.tryHead seq_ with + | None -> Ok acc + | Some x -> + match lambda x with + | Error e -> Error e + | Ok o -> iter_ (o::acc) (Seq.tail seq_) + iter_ [] seq_