This commit is contained in:
bparodi@lezzo.org 2024-12-03 11:01:29 +01:00
parent 4564c1624c
commit 96e5825406
8 changed files with 196 additions and 108 deletions

View file

@ -92,63 +92,84 @@ let private parse now (db: Requirements) =
environment = List.ofSeq env
workdir = workdir
hostname = db.hostname })
type JobKey = {
j: string; h: string
}
let sort_jobs (now: Instant) (db_crons: Requirements seq) =
let build_deps (lst: CronJobDefinition list) =
let standalone, deps =
List.partition (function | {when_=Cron _} -> true | _ -> false) lst
let index =
standalone
|> Seq.map (fun x -> ({j=x.job_name; h=x.hostname}, x))
|> Map.ofSeq
in index, deps
in
let rec build_dependencies acc (all_jobs: Map<JobKey, CronJobDefinition>) = function
| [] -> Ok acc
| {when_=Cron _}::_ -> invalidOp "The jobs should have been partitioned"
| {when_=Pattern (After jb)} as x::xs ->
let key = {j=jb; h=x.hostname}
(* Have I seen this job-hostname already? *)
match Map.tryFind key acc, Map.tryFind key all_jobs with
| Some p, _ ->
(* Yes. We have [initial_job; job_after_this; job_after_this; ...] *)
let acc' = (x::p, acc) ||> Map.add key
build_dependencies acc' all_jobs xs
type Key = {h: string; j: string;}
let sort_jobs (now: Instant) (db_crons: Requirements seq): Result<ChainOfJobs list, string> =
(* We are only interested in ``when``, hostname and job_name *)
let sort (job_list: CronJobDefinition list) =
let timetable =
job_list
|> List.choose (function
|{when_=Cron instant; hostname=h; job_name=j} ->
Some ({h=h; j=j}, instant)
| _ -> None)
|> Map.ofList
let job_with_parents =
job_list
|> List.map (function
| {when_=Cron _} as x ->
({h = x.hostname; j = x.job_name}, (None, convert x))
| {when_=Pattern (After after)} as x ->
let parent = Some {h=x.hostname; j=after}
({h=x.hostname; j=x.job_name}, (parent, convert x)))
|> Map.ofList
| None, Some f ->
let acc' = (x::f::[], acc) ||> Map.add key
build_dependencies acc' all_jobs xs
let rec n_parents acc = function
| (None, _) -> Ok acc
| (Some parent, _cronjob) ->
let parent = job_with_parents |> Map.find parent
match parent with
| Error e -> Error $"Is list malformed? {e}"
| Ok parent -> n_parents (1 + acc) parent
let scored =
job_with_parents
|> Map.map (fun _key (parent, job) ->
n_parents 0 (parent, job)
|> Result.map (fun score ->
(score, (parent, job))))
| None, None ->
$"Invalid job definition. No such job_name {jb} in host {x.hostname}"
|> Error
let sorted =
scored
|> Map.toList
|> ResultList.collect snd
|> Result.map (List.sortBy (fun (_key, (score, _)) -> score))
let rec build acc : list<int * (option<Key> * CronJob)> -> (int * CronJob list) list = function
| [] -> acc
| (score, (_, target))::rest ->
let hostname, jname = target.hostname, target.job_name
let children =
job_with_parents
|> Map.values
|> List.choose (function
| (Some {h=h; j=j}, cj) when j=jname && h=hostname -> Some cj
| _ -> None)
|> List.collect (fun child -> build [] [(0, (None, child))] |> List.collect snd)
let acc' = (score, target::children)::acc
build acc' rest
match sorted with
| Error e -> Error e
| Ok sorted ->
build [] sorted
|> List.rev
|> List.takeWhile (function (score, _) -> score = 0)
|> List.map snd
|> List.map (function
| [] -> invalidOp "Didn't expect an empty list"
| (hd::_) as jobs ->
let k = {h=hd.hostname; j=hd.job_name}
let fail _ =
$"Can't compute scheduled time {k}" |> invalidOp
let at = timetable |> Map.find k |> Result.defaultWith fail
{scheduled_at=at; jobs=jobs})
|> Ok
db_crons
|> ResultList.collect (parse now)
|> Result.bind (fun job_list ->
let standalone, deps = build_deps job_list
// printfn "all=%A with_deps=%A" standalone deps
let jobs_with_deps = build_dependencies Map.empty standalone deps
jobs_with_deps
|> Result.map (fun jobs_with_deps ->
let _, jobs_without_deps =
Map.partition (fun k _ -> Map.containsKey k jobs_with_deps) standalone
jobs_without_deps
|> Map.values
|> List.map List.singleton
|> List.append (Map.values jobs_with_deps |> List.map List.rev)
|> List.map (function
| {when_=Cron instant} as hd::tail ->
let hd' = convert hd
let tail' = List.map convert tail
{scheduled_at=instant; head=hd'; rest=tail'}
| x -> invalidOp $"List is malformed? {x}")))
|> Result.bind sort

View file

@ -197,3 +197,11 @@ type BacklogAddStarted () =
override x.Up() =
x.Alter.Table("backlog").AddColumn("started_at").AsCustom "timestamptz"
|> ignore
[<Migration(20241203_0000L)>]
type RemoveUselessTables () =
inherit OnlyUp ()
override x.Up() =
x.Delete.Table "current_jobs"
|> ignore

View file

@ -18,15 +18,26 @@ type CronJob = {
workdir: Path
hostname: string
// last_completed_at: Instant
}
} with
member x.info_string () =
let cwd = x.workdir.ToString ()
let vars =
x.environment
|> List.map (fun (k, v) -> $"{k}='{v}' ")
|> String.concat ""
let args =
x.args
|> String.concat " "
let cmd = sprintf "%A %A" x.executable args
sprintf "User: %s\nCwd: %s\nEnvironment: %s\nCommand: %s" x.user cwd vars cmd
[<Immutable>]
[<GenerateSerializer>]
type ChainOfJobs = {
scheduled_at: Instant
head: CronJob
rest: CronJob list
jobs: CronJob list
}
[<CLIMutable>]
@ -41,3 +52,8 @@ type Requirements = {
environment: string
done_at: System.DateTime option
}
type RunResult =
| Success of string | Failure of (int * string)
| Unknown of string | NoShell of string
| NoPermissionOnFolder | NoPrivilegeToUser

View file

@ -1,7 +1,6 @@
module Bidello.Grains
open Orleans
open System.Collections.Concurrent
open System.Threading
open System.Threading.Tasks
@ -11,34 +10,63 @@ open Bidello.Shell
type IShellGrain =
inherit IGrainWithIntegerKey
abstract schedule: CancellationToken -> ChainOfJobs -> Task
abstract schedule: CancellationToken -> ChainOfJobs -> ValueTask
type ShellGrain() =
inherit Orleans.Grain ()
interface IShellGrain with
member _.schedule (ct) (jobs: ChainOfJobs) = task {
let! rc =
jobs.head
|> run_job
(*
let rec run = function
| [] -> ()
| x::xs ->
let! rc = run_job x
match rc with
| NoShell reason | Unknown reason ->
member _.schedule (ct) (jobs: ChainOfJobs) =
let log (job: CronJob) = function
| Success _stdout ->
$"Action: {job.info_string()}, returned code = 0"
|> Logging.logger.Information
| Failure (rc, _stderr) ->
$"Action: {job.info_string()}, returned code = {rc}"
|> Logging.logger.Error
| NoShell reason ->
$"Can't call shell: {reason}"
|> Logging.logger.Fatal
| Unknown reason ->
$"Unknown exception in job runner: {reason}"
|> Logging.logger.Error
| NoPermissionOnFolder ->
$"Action: {job.info_string()}, failed because of insufficient permission on folder"
|> Logging.logger.Fatal
| NoPrivilegeToUser ->
$"Action: {job.info_string()}, failed because insufficient permissions to run command as user"
|> Logging.logger.Fatal
let rec run_ (hd: CronJob) (tl: CronJob list) = async {
let! rc = run_job ct hd |> Async.AwaitTask
log hd rc
match rc, tl with
| (Success stdout, hd'::tl')->
printfn "rc = Ok %A" stdout
return! run_ hd' tl'
| (Success stdout, []) ->
printfn "rc = Ok %A" stdout
return ()
| (NoShell reason | Unknown reason), _ ->
printfn "Greve: %A" reason
| Success stdout -> printfn "rc = Ok %A" stdout
| NoPermissionOnFolder -> printfn "NO perms on folder"
| NoPrivilegeToUser -> printfn "NO privilege to user"
| Failure (_rc, stderr) -> printfn "rc ERror = = stderr %A" stderr
run xs
run (jobs.head::jobs.rest)
*)
failwith "todoo"
}
return ()
| (NoPermissionOnFolder, _) ->
printfn "NO perms on folder"
return ()
| (NoPrivilegeToUser, _) ->
printfn "NO privilege to user"
return ()
| (Failure (_rc, stderr), _) ->
printfn "rc ERror = = stderr %A" stderr
return ()
}
jobs.jobs
|> function | [] -> None | hd::tl -> Some (hd, tl)
|> Option.map (fun jobs ->
let tsk = jobs ||> run_
Async.StartAsTask (tsk, TaskCreationOptions.LongRunning, ct)
|> ValueTask)
|> Option.defaultValue (ValueTask ())

View file

@ -17,15 +17,7 @@ open Pentole
open Bidello.Datatypes
open Bidello.Environment
let logger_config: LoggingHelpers.Configuration = {
files = []
template = LoggingHelpers.Default.debug_template
theme = LoggingHelpers.Default.theme
overrides = LoggingHelpers.Default.overrides
}
let logger = LoggingHelpers.from_config logger_config
open Bidello.Logging
type Bidello(client: IClusterClient) =
inherit BackgroundService()
@ -36,11 +28,6 @@ type Bidello(client: IClusterClient) =
let schedule_jobs (jobs: ChainOfJobs) =
let runner = rnd.Next () |> client.GetGrain<IShellGrain>
// let rc =
// Shell.run_job jobs.head
// |> Async.AwaitTask
// |> Async.RunSynchronously
// printfn "JOB= %A" rc
runner.schedule ct jobs |> ignore
task {

View file

@ -3,6 +3,7 @@ module Bidello.Shell
open FSharp.Control.LazyExtensions
open System.Collections.Generic
open System.Threading
open Sheller
open Pentole.Path
@ -80,12 +81,8 @@ let which (executable: string) =
shell.Value
|> Result.bind run
type RunResult =
| Success of string | Failure of (int * string)
| Unknown of string | NoShell of string
| NoPermissionOnFolder | NoPrivilegeToUser
let run_job (cj: CronJob) = task {
let run_job (ct: CancellationToken) (cj: CronJob) = task {
let workdir = cj.workdir |> function Absolute a -> a + "/"
let executable = cj.executable |> function Absolute a -> a
@ -107,6 +104,7 @@ let run_job (cj: CronJob) = task {
si.UserName <- user
)
.UseNoThrow()
.WithCancellationToken(ct)
.ExecuteAsync ()
if rc.ExitCode = 0 then

View file

@ -14,6 +14,7 @@
<Compile Include="Result.fs" />
<Compile Include="String.fs" />
<Compile Include="Datatypes.fs" />
<Compile Include="Logging.fs" />
<Compile Include="Shell.fs" />
<Compile Include="Grains.fs" />
<Compile Include="DatabaseMigrations.fs" />

View file

@ -30,9 +30,7 @@ let bj =
let run_function x =
let reduce (cjs: ChainOfJobs) =
let hd = cjs.head |> fun cj -> (cj.hostname, cj.job_name)
let tail = cjs.rest |> List.map (fun cj -> (cj.hostname, cj.job_name))
hd::tail
cjs.jobs |> List.map (fun cj -> (cj.hostname, cj.job_name))
Cron.sort_jobs now x
|> Result.map (List.map reduce)
@ -100,17 +98,48 @@ let should_fail_no_host () =
|> Assert.is_true
[<Test>]
let job_deps_chain () =
let job_deps_chain0 () =
let requirements = [
{bj with job_name = "j1"}
{bj with job_name = "j2_after_j1"; ``when``="@after j2"}
{bj with job_name = "j2_after_j1"; ``when``="@after j1"}
{bj with job_name = "j3_after_j2"; ``when``="@after j2_after_j1"}
{bj with job_name = "j1"; hostname="h2"}
]
let cjs = run_function requirements
printfn "GOT: %A" cjs
let expected = [[("h1", "j1"); ("h1", "j2_after_j1"); ("h1", "j3_after_j2")];
[("h2", "j1")]]
let expected = [[("h1", "j1"); ("h1", "j1_after")];
[("h1", "j2"); ("h1", "j2_after")];
[("h2", "j1")]]
Assert.are_seq_equal expected cjs
[<Test>]
let job_deps_chain1 () =
let requirements = [
{bj with job_name = "j1"}
{bj with job_name = "j2_after_j1"; ``when``="@after j1"}
{bj with job_name = "j3_after_j2"; ``when``="@after j2_after_j1"}
{bj with job_name = "j2'_after_j1"; ``when``="@after j1"}
{bj with job_name = "j3'_after_j2'"; ``when``="@after j2'_after_j1"}
{bj with job_name = "j1"; hostname="h2"}
]
let cjs = run_function requirements
printfn "GOT: %A" cjs
let expected = [[("h1", "j1"); ("h1", "j2'_after_j1"); ("h1", "j3'_after_j2'");
("h1", "j2_after_j1"); ("h1", "j3_after_j2")];
[("h2", "j1")]]
Assert.are_seq_equal expected cjs
[<Test>]
let job_deps_chain_failure () =
let requirements = [
{bj with job_name = "j1"}
{bj with job_name = "should_fail"; ``when``="@after j3"}
]
Cron.sort_jobs now requirements
|> Result.isError
|> Assert.is_true