From 47c378bfecb7baef8908f3d8aa0278b1d44cc0a4 Mon Sep 17 00:00:00 2001 From: "bparodi@lezzo.org" Date: Wed, 6 Nov 2024 18:43:54 +0100 Subject: [PATCH] more --- .gitignore | 1 + src/Cron.fs | 59 +++++++++++++++++++++++++++++++++++----------- src/Database.fs | 4 +--- src/Datatypes.fs | 20 ++++++++-------- src/Grains.fs | 10 +++++--- src/Library.fs | 41 ++++++++++++++++++++++---------- src/Which.fs | 17 ++++++++----- src/src.fsproj | 1 + tests/UnitTest1.fs | 13 ++++++---- 9 files changed, 113 insertions(+), 53 deletions(-) diff --git a/.gitignore b/.gitignore index d1984a3..b3e478e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.secret +*.jpeg ## Ignore Visual Studio temporary files, build results, and ## files generated by popular Visual Studio add-ons. diff --git a/src/Cron.fs b/src/Cron.fs index e149a8f..8be832b 100644 --- a/src/Cron.fs +++ b/src/Cron.fs @@ -2,7 +2,6 @@ module Bidello.Cron open NodaTime open Cronos -open System.Collections.Immutable open Pentole.Path open Pentole @@ -12,9 +11,40 @@ open Pentole.String open Pentole.Map open Pentole -type User = string + + + +type PatternType = After of string + + +type WhenExpr = | Cron of Instant | Pattern of PatternType + + +type CronJobDefinition = { + job_name: string + user: string + when_: WhenExpr + executable: Path + args: string list + environment: (string * string) list + workdir: Path + hostname: string +} + +let private convert def = { + job_name = def.job_name + hostname = def.hostname + user = def.user + executable = def.executable + args = def.args + environment = def.environment + workdir = def.workdir +} + + let local_tz = DateTimeZoneProviders.Tzdb.GetSystemDefault () + let local_tz_net = System.TimeZoneInfo.Local let private parse_expr (now: Instant) text = @@ -45,6 +75,7 @@ let private parse now (db: Requirements) = let when_ = parse_expr now db.``when`` let env = db.environment + |> fun s -> if isNull s then "" else s |> String.split "|" |> ResultList.collect (fun env_item -> let splitted = env_item |> String.split "=" @@ -56,11 +87,6 @@ let private parse now (db: Requirements) = let executable = Which.which db.executable let workdir = Path.of_string db.workdir |> Result.bind FileSystem.resolve - let last_completed = - db.done_at - |> Option.map Instant.FromDateTimeUtc - |> Option.defaultValue Instant.MinValue - Result.zip when_ env |> Result.zip (Result.zip executable workdir) |> Result.map (fun ((executable, workdir), (when_, env)) -> @@ -71,15 +97,14 @@ let private parse now (db: Requirements) = args = db.args |> List.ofArray environment = List.ofSeq env workdir = workdir - hostname = db.hostname - last_completed_at = last_completed }) - + hostname = db.hostname }) + type JobKey = { j: string; h: string } -let sort_cron_jobs (now: Instant) (db_crons: Requirements seq) = +let sort_jobs (now: Instant) (db_crons: Requirements seq) = - let build_deps (lst: CronJob list) = + let build_deps (lst: CronJobDefinition list) = let standalone, deps = List.partition (function | {when_=Cron _} -> true | _ -> false) lst @@ -89,7 +114,7 @@ let sort_cron_jobs (now: Instant) (db_crons: Requirements seq) = |> Map.ofSeq in index, deps in - let rec build_dependencies acc (all_jobs: Map) = function + let rec build_dependencies acc (all_jobs: Map) = function | [] -> Ok acc | {when_=Cron _}::_ -> invalidOp "The jobs should have been partitioned" | {when_=Pattern (After jb)} as x::xs -> @@ -126,4 +151,10 @@ let sort_cron_jobs (now: Instant) (db_crons: Requirements seq) = jobs_without_deps |> Map.values |> List.map List.singleton - |> List.append (Map.values jobs_with_deps |> List.map List.rev))) + |> List.append (Map.values jobs_with_deps |> List.map List.rev) + |> List.map (function + | {when_=Cron instant} as hd::tail -> + let hd' = convert hd + let tail' = List.map convert tail + {scheduled_at=instant; head=hd'; rest=tail'} + | x -> invalidOp $"List is malformed? {x}"))) diff --git a/src/Database.fs b/src/Database.fs index d4c6c44..a39b376 100644 --- a/src/Database.fs +++ b/src/Database.fs @@ -63,11 +63,9 @@ let wait_notification (ct: CancellationToken) (db: t) = let gather_requirements (hostname: string) (ct: CancellationToken) (db: t) = let query = """select c.job_name, c."when", c.executable, c.user, c.workdir, c.args, h.hostname, - STRING_AGG(e.variable || '=' || e.value, '|') AS environment_variables, - max(b.done_at) as done_at + STRING_AGG(e.variable || '=' || e.value, '|') AS environment_variables from cron c left join environment e on c.job_name = e.job_name -left join backlog b on b.job_name = c.job_name and b.hostname = @hostname join hosts h on h.job_name = c.job_name where h.hostname = @hostname group by c.job_name, h.hostname """ diff --git a/src/Datatypes.fs b/src/Datatypes.fs index a870d54..29d32ba 100644 --- a/src/Datatypes.fs +++ b/src/Datatypes.fs @@ -8,24 +8,24 @@ open Pentole.Path [] type Notification = | Time | Database -[] -type PatternType = After of string - -[] -type WhenExpr = | Cron of Instant | Pattern of PatternType - -[] -[] type CronJob = { job_name: string user: string - when_: WhenExpr executable: Path args: string list environment: (string * string) list workdir: Path hostname: string - last_completed_at: Instant + // last_completed_at: Instant +} + + +[] +[] +type ChainOfJobs = { + scheduled_at: Instant + head: CronJob + rest: CronJob list } [] diff --git a/src/Grains.fs b/src/Grains.fs index cfd42b3..342701e 100644 --- a/src/Grains.fs +++ b/src/Grains.fs @@ -10,13 +10,17 @@ open Bidello.Datatypes type IShellGrain = inherit IGrainWithIntegerKey - abstract schedule: CancellationToken -> CronJob -> Task + abstract schedule: CancellationToken -> ChainOfJobs -> Task type ShellGrain() = inherit Orleans.Grain () interface IShellGrain with - member _.schedule (ct) (job: CronJob) = task { - printfn "Grain view: %A" job + member _.schedule (ct) (jobs: ChainOfJobs) = task { + printfn "Grain view: --------" + printfn "HEAD= %A" jobs.head + jobs.rest |> List.iter (printfn "%A") + printfn "--------" + } diff --git a/src/Library.fs b/src/Library.fs index 5f7e70d..c06a7e4 100644 --- a/src/Library.fs +++ b/src/Library.fs @@ -1,6 +1,7 @@ module Bidello.Main open System.Threading +open System.Threading.Tasks open System open NodaTime @@ -29,9 +30,7 @@ let logger = LoggingHelpers.from_config logger_config (* Features: - per user cron jobs - environment variables -- output management (email, syslog) -- special time specs: @weekly -- randomized execution times +- special time specs: @weekly -> NOPE - conditional cron jobs: check something, then run - concurrency management - job dependency @@ -46,9 +45,9 @@ type Bidello(client: IClusterClient) = let rnd = new Random (2) let db = Database.make logger - let schedule_jobs (job: CronJob) = + let schedule_jobs (jobs: ChainOfJobs) = let runner = rnd.Next () |> client.GetGrain - runner.schedule ct job |> ignore + runner.schedule ct jobs |> ignore task { while not ct.IsCancellationRequested do @@ -59,15 +58,33 @@ type Bidello(client: IClusterClient) = Database.gather_requirements hostname ct db let now = SystemClock.Instance.GetCurrentInstant () + let one_min_ago = Duration.FromMinutes 1L |> now.Minus + + (* the granularity is one minute *) + let todo_list = + Cron.sort_jobs one_min_ago requirements + |> Result.map (List.filter (fun cj -> cj.scheduled_at <= now)) - // let cronjobs = - // Cron.build_sorted_jobs_table logger hostname now requirements + let next_job_at = + Cron.sort_jobs now requirements (* in the future *) + |> Result.toOption + |> Option.bind (function | [] -> None | xs -> Some xs) + |> Option.map (fun xs -> List.minBy (_.scheduled_at) xs |> _.scheduled_at) + + match todo_list with + | Error e -> logger.Error $"Can't schedule cronjobs. Reason: {e}" + | Ok cronjobs -> + cronjobs + |> Seq.iter schedule_jobs - // cronjobs |> Seq.iter schedule_jobs - printfn "%A" requirements - - - let! _wake_up = db |> Database.wait_notification ct + let! _wake_up = + match next_job_at with + | None -> db |> Database.wait_notification ct + | Some time -> + let by_ = time - now |> _.TotalMilliseconds |> floor |> int32 + let db_change = db |> Database.wait_notification ct + let timer = Task.Delay (by_, ct) + Task.WhenAny [timer; db_change] () } diff --git a/src/Which.fs b/src/Which.fs index 273c97c..3975db5 100644 --- a/src/Which.fs +++ b/src/Which.fs @@ -18,18 +18,23 @@ let which (executable: string) = let shell = List.tryFind (fun sh -> sh |> Path.of_string |> Result.isOk) possible_shells if shell |> Option.isSome then - try + let rc = Builder .UseShell(shell.Value) .UseExecutable("which") .WithArgument(executable) + .UseNoThrow() .ExecuteAsync () |> Async.AwaitTask |> Async.RunSynchronously - |> _.StandardOutput - |> _.Trim() - |> Path.of_string - |> Result.bind FileSystem.resolve - with exn -> exn.Message |> Error + if rc.ExitCode = 0 then + rc.StandardOutput + |> _.Trim() + |> Path.of_string + elif rc.ExitCode = 1 then + Error $"Can't find executable path for \"{executable}\"." + else + rc.StandardError + |> Error else Error "Can't find the system shell. Check your system $PATH." diff --git a/src/src.fsproj b/src/src.fsproj index a2d0bed..081c662 100644 --- a/src/src.fsproj +++ b/src/src.fsproj @@ -33,6 +33,7 @@ + diff --git a/tests/UnitTest1.fs b/tests/UnitTest1.fs index 5f6ba79..4aef5f1 100644 --- a/tests/UnitTest1.fs +++ b/tests/UnitTest1.fs @@ -16,7 +16,7 @@ let string_prefix_active_pattern () = | _ -> Assert.Pass () match "@after job " with - | Prefix "@after" j -> Assert.Pass () + | Prefix "@after" _ -> Assert.Pass () | _ -> Assert.Pass () let bj = @@ -29,10 +29,13 @@ let bj = done_at = None } let run_function x = - let reduce (cj: CronJob) = (cj.hostname, cj.job_name) + let reduce (cjs: ChainOfJobs) = + let hd = cjs.head |> fun cj -> (cj.hostname, cj.job_name) + let tail = cjs.rest |> List.map (fun cj -> (cj.hostname, cj.job_name)) + hd::tail - Cron.sort_cron_jobs now x - |> Result.map (List.map (List.map reduce)) + Cron.sort_jobs now x + |> Result.map (List.map reduce) |> Pentole.Result.get [] @@ -92,6 +95,6 @@ let should_fail_no_host () = ] - Cron.sort_cron_jobs now requirements + Cron.sort_jobs now requirements |> Result.isError |> Assert.is_true