diff --git a/src/Cron.fs b/src/Cron.fs index 22cb974..2f954f4 100644 --- a/src/Cron.fs +++ b/src/Cron.fs @@ -92,63 +92,84 @@ let private parse now (db: Requirements) = environment = List.ofSeq env workdir = workdir hostname = db.hostname }) - -type JobKey = { - j: string; h: string -} -let sort_jobs (now: Instant) (db_crons: Requirements seq) = - let build_deps (lst: CronJobDefinition list) = - let standalone, deps = - List.partition (function | {when_=Cron _} -> true | _ -> false) lst - let index = - standalone - |> Seq.map (fun x -> ({j=x.job_name; h=x.hostname}, x)) - |> Map.ofSeq - in index, deps - in - let rec build_dependencies acc (all_jobs: Map) = function - | [] -> Ok acc - | {when_=Cron _}::_ -> invalidOp "The jobs should have been partitioned" - | {when_=Pattern (After jb)} as x::xs -> - let key = {j=jb; h=x.hostname} - (* Have I seen this job-hostname already? *) - match Map.tryFind key acc, Map.tryFind key all_jobs with - | Some p, _ -> - (* Yes. We have [initial_job; job_after_this; job_after_this; ...] *) - let acc' = (x::p, acc) ||> Map.add key - build_dependencies acc' all_jobs xs +type Key = {h: string; j: string;} +let sort_jobs (now: Instant) (db_crons: Requirements seq): Result = + (* We are only interested in ``when``, hostname and job_name *) + let sort (job_list: CronJobDefinition list) = + let timetable = + job_list + |> List.choose (function + |{when_=Cron instant; hostname=h; job_name=j} -> + Some ({h=h; j=j}, instant) + | _ -> None) + |> Map.ofList + + + let job_with_parents = + job_list + |> List.map (function + | {when_=Cron _} as x -> + ({h = x.hostname; j = x.job_name}, (None, convert x)) + | {when_=Pattern (After after)} as x -> + let parent = Some {h=x.hostname; j=after} + ({h=x.hostname; j=x.job_name}, (parent, convert x))) + |> Map.ofList - | None, Some f -> - let acc' = (x::f::[], acc) ||> Map.add key - build_dependencies acc' all_jobs xs + let rec n_parents acc = function + | (None, _) -> Ok acc + | (Some parent, _cronjob) -> + let parent = job_with_parents |> Map.find parent + match parent with + | Error e -> Error $"Is list malformed? {e}" + | Ok parent -> n_parents (1 + acc) parent + let scored = + job_with_parents + |> Map.map (fun _key (parent, job) -> + n_parents 0 (parent, job) + |> Result.map (fun score -> + (score, (parent, job)))) - | None, None -> - $"Invalid job definition. No such job_name {jb} in host {x.hostname}" - |> Error + let sorted = + scored + |> Map.toList + |> ResultList.collect snd + |> Result.map (List.sortBy (fun (_key, (score, _)) -> score)) + + let rec build acc : list * CronJob)> -> (int * CronJob list) list = function + | [] -> acc + | (score, (_, target))::rest -> + let hostname, jname = target.hostname, target.job_name + let children = + job_with_parents + |> Map.values + |> List.choose (function + | (Some {h=h; j=j}, cj) when j=jname && h=hostname -> Some cj + | _ -> None) + |> List.collect (fun child -> build [] [(0, (None, child))] |> List.collect snd) + + let acc' = (score, target::children)::acc + build acc' rest + + match sorted with + | Error e -> Error e + | Ok sorted -> + build [] sorted + |> List.rev + |> List.takeWhile (function (score, _) -> score = 0) + |> List.map snd + |> List.map (function + | [] -> invalidOp "Didn't expect an empty list" + | (hd::_) as jobs -> + let k = {h=hd.hostname; j=hd.job_name} + let fail _ = + $"Can't compute scheduled time {k}" |> invalidOp + + let at = timetable |> Map.find k |> Result.defaultWith fail + {scheduled_at=at; jobs=jobs}) + |> Ok db_crons |> ResultList.collect (parse now) - |> Result.bind (fun job_list -> - let standalone, deps = build_deps job_list - - // printfn "all=%A with_deps=%A" standalone deps - - let jobs_with_deps = build_dependencies Map.empty standalone deps - - jobs_with_deps - |> Result.map (fun jobs_with_deps -> - let _, jobs_without_deps = - Map.partition (fun k _ -> Map.containsKey k jobs_with_deps) standalone - - jobs_without_deps - |> Map.values - |> List.map List.singleton - |> List.append (Map.values jobs_with_deps |> List.map List.rev) - |> List.map (function - | {when_=Cron instant} as hd::tail -> - let hd' = convert hd - let tail' = List.map convert tail - {scheduled_at=instant; head=hd'; rest=tail'} - | x -> invalidOp $"List is malformed? {x}"))) + |> Result.bind sort diff --git a/src/DatabaseMigrations.fs b/src/DatabaseMigrations.fs index 25ea37d..3c78d1c 100644 --- a/src/DatabaseMigrations.fs +++ b/src/DatabaseMigrations.fs @@ -197,3 +197,11 @@ type BacklogAddStarted () = override x.Up() = x.Alter.Table("backlog").AddColumn("started_at").AsCustom "timestamptz" |> ignore + +[] +type RemoveUselessTables () = + + inherit OnlyUp () + override x.Up() = + x.Delete.Table "current_jobs" + |> ignore diff --git a/src/Datatypes.fs b/src/Datatypes.fs index bfece5b..f8714a3 100644 --- a/src/Datatypes.fs +++ b/src/Datatypes.fs @@ -18,15 +18,26 @@ type CronJob = { workdir: Path hostname: string // last_completed_at: Instant -} +} with + member x.info_string () = + let cwd = x.workdir.ToString () + let vars = + x.environment + |> List.map (fun (k, v) -> $"{k}='{v}' ") + |> String.concat "" + let args = + x.args + |> String.concat " " + let cmd = sprintf "%A %A" x.executable args + + sprintf "User: %s\nCwd: %s\nEnvironment: %s\nCommand: %s" x.user cwd vars cmd [] [] type ChainOfJobs = { scheduled_at: Instant - head: CronJob - rest: CronJob list + jobs: CronJob list } [] @@ -41,3 +52,8 @@ type Requirements = { environment: string done_at: System.DateTime option } + +type RunResult = + | Success of string | Failure of (int * string) + | Unknown of string | NoShell of string + | NoPermissionOnFolder | NoPrivilegeToUser diff --git a/src/Grains.fs b/src/Grains.fs index 9d07e7b..7bc037d 100644 --- a/src/Grains.fs +++ b/src/Grains.fs @@ -1,7 +1,6 @@ module Bidello.Grains open Orleans -open System.Collections.Concurrent open System.Threading open System.Threading.Tasks @@ -11,34 +10,63 @@ open Bidello.Shell type IShellGrain = inherit IGrainWithIntegerKey - abstract schedule: CancellationToken -> ChainOfJobs -> Task + abstract schedule: CancellationToken -> ChainOfJobs -> ValueTask type ShellGrain() = inherit Orleans.Grain () interface IShellGrain with - member _.schedule (ct) (jobs: ChainOfJobs) = task { - let! rc = - jobs.head - |> run_job -(* - let rec run = function - | [] -> () - | x::xs -> - let! rc = run_job x - - match rc with - | NoShell reason | Unknown reason -> + member _.schedule (ct) (jobs: ChainOfJobs) = + + let log (job: CronJob) = function + | Success _stdout -> + $"Action: {job.info_string()}, returned code = 0" + |> Logging.logger.Information + | Failure (rc, _stderr) -> + $"Action: {job.info_string()}, returned code = {rc}" + |> Logging.logger.Error + | NoShell reason -> + $"Can't call shell: {reason}" + |> Logging.logger.Fatal + | Unknown reason -> + $"Unknown exception in job runner: {reason}" + |> Logging.logger.Error + | NoPermissionOnFolder -> + $"Action: {job.info_string()}, failed because of insufficient permission on folder" + |> Logging.logger.Fatal + | NoPrivilegeToUser -> + $"Action: {job.info_string()}, failed because insufficient permissions to run command as user" + |> Logging.logger.Fatal + + let rec run_ (hd: CronJob) (tl: CronJob list) = async { + let! rc = run_job ct hd |> Async.AwaitTask + log hd rc + match rc, tl with + | (Success stdout, hd'::tl')-> + printfn "rc = Ok %A" stdout + return! run_ hd' tl' + | (Success stdout, []) -> + printfn "rc = Ok %A" stdout + return () + | (NoShell reason | Unknown reason), _ -> printfn "Greve: %A" reason - | Success stdout -> printfn "rc = Ok %A" stdout - | NoPermissionOnFolder -> printfn "NO perms on folder" - | NoPrivilegeToUser -> printfn "NO privilege to user" - | Failure (_rc, stderr) -> printfn "rc ERror = = stderr %A" stderr - - run xs - - run (jobs.head::jobs.rest) - *) - failwith "todoo" -} + return () + | (NoPermissionOnFolder, _) -> + printfn "NO perms on folder" + return () + | (NoPrivilegeToUser, _) -> + printfn "NO privilege to user" + return () + | (Failure (_rc, stderr), _) -> + printfn "rc ERror = = stderr %A" stderr + return () + } + + jobs.jobs + |> function | [] -> None | hd::tl -> Some (hd, tl) + |> Option.map (fun jobs -> + let tsk = jobs ||> run_ + Async.StartAsTask (tsk, TaskCreationOptions.LongRunning, ct) + |> ValueTask) + |> Option.defaultValue (ValueTask ()) diff --git a/src/Library.fs b/src/Library.fs index 005a601..8b0a706 100644 --- a/src/Library.fs +++ b/src/Library.fs @@ -17,15 +17,7 @@ open Pentole open Bidello.Datatypes open Bidello.Environment - -let logger_config: LoggingHelpers.Configuration = { - files = [] - template = LoggingHelpers.Default.debug_template - theme = LoggingHelpers.Default.theme - overrides = LoggingHelpers.Default.overrides -} - -let logger = LoggingHelpers.from_config logger_config +open Bidello.Logging type Bidello(client: IClusterClient) = inherit BackgroundService() @@ -36,11 +28,6 @@ type Bidello(client: IClusterClient) = let schedule_jobs (jobs: ChainOfJobs) = let runner = rnd.Next () |> client.GetGrain - // let rc = - // Shell.run_job jobs.head - // |> Async.AwaitTask - // |> Async.RunSynchronously - // printfn "JOB= %A" rc runner.schedule ct jobs |> ignore task { diff --git a/src/Shell.fs b/src/Shell.fs index 67b99b8..be928cc 100644 --- a/src/Shell.fs +++ b/src/Shell.fs @@ -3,6 +3,7 @@ module Bidello.Shell open FSharp.Control.LazyExtensions open System.Collections.Generic +open System.Threading open Sheller open Pentole.Path @@ -80,12 +81,8 @@ let which (executable: string) = shell.Value |> Result.bind run -type RunResult = - | Success of string | Failure of (int * string) - | Unknown of string | NoShell of string - | NoPermissionOnFolder | NoPrivilegeToUser -let run_job (cj: CronJob) = task { +let run_job (ct: CancellationToken) (cj: CronJob) = task { let workdir = cj.workdir |> function Absolute a -> a + "/" let executable = cj.executable |> function Absolute a -> a @@ -107,6 +104,7 @@ let run_job (cj: CronJob) = task { si.UserName <- user ) .UseNoThrow() + .WithCancellationToken(ct) .ExecuteAsync () if rc.ExitCode = 0 then diff --git a/src/src.fsproj b/src/src.fsproj index 804e42b..03f0fd4 100644 --- a/src/src.fsproj +++ b/src/src.fsproj @@ -14,6 +14,7 @@ + diff --git a/tests/UnitTest1.fs b/tests/UnitTest1.fs index 8b7ab5b..e335280 100644 --- a/tests/UnitTest1.fs +++ b/tests/UnitTest1.fs @@ -30,9 +30,7 @@ let bj = let run_function x = let reduce (cjs: ChainOfJobs) = - let hd = cjs.head |> fun cj -> (cj.hostname, cj.job_name) - let tail = cjs.rest |> List.map (fun cj -> (cj.hostname, cj.job_name)) - hd::tail + cjs.jobs |> List.map (fun cj -> (cj.hostname, cj.job_name)) Cron.sort_jobs now x |> Result.map (List.map reduce) @@ -100,17 +98,48 @@ let should_fail_no_host () = |> Assert.is_true [] -let job_deps_chain () = +let job_deps_chain0 () = let requirements = [ {bj with job_name = "j1"} - {bj with job_name = "j2_after_j1"; ``when``="@after j2"} + {bj with job_name = "j2_after_j1"; ``when``="@after j1"} {bj with job_name = "j3_after_j2"; ``when``="@after j2_after_j1"} + {bj with job_name = "j1"; hostname="h2"} ] let cjs = run_function requirements + printfn "GOT: %A" cjs + let expected = [[("h1", "j1"); ("h1", "j2_after_j1"); ("h1", "j3_after_j2")]; + [("h2", "j1")]] - let expected = [[("h1", "j1"); ("h1", "j1_after")]; - [("h1", "j2"); ("h1", "j2_after")]; - [("h2", "j1")]] Assert.are_seq_equal expected cjs + +[] +let job_deps_chain1 () = + let requirements = [ + {bj with job_name = "j1"} + {bj with job_name = "j2_after_j1"; ``when``="@after j1"} + {bj with job_name = "j3_after_j2"; ``when``="@after j2_after_j1"} + {bj with job_name = "j2'_after_j1"; ``when``="@after j1"} + {bj with job_name = "j3'_after_j2'"; ``when``="@after j2'_after_j1"} + {bj with job_name = "j1"; hostname="h2"} + ] + + let cjs = run_function requirements + printfn "GOT: %A" cjs + let expected = [[("h1", "j1"); ("h1", "j2'_after_j1"); ("h1", "j3'_after_j2'"); + ("h1", "j2_after_j1"); ("h1", "j3_after_j2")]; + [("h2", "j1")]] + + Assert.are_seq_equal expected cjs + +[] +let job_deps_chain_failure () = + let requirements = [ + {bj with job_name = "j1"} + {bj with job_name = "should_fail"; ``when``="@after j3"} + ] + + Cron.sort_jobs now requirements + |> Result.isError + |> Assert.is_true