This commit is contained in: 2024-11-06 18:43:54 +01:00
parent fb3f8d6892
commit 47c378bfec
9 changed files with 113 additions and 53 deletions

.gitignore vendored
View file

@ -1,4 +1,5 @@
## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.

View file

@ -2,7 +2,6 @@ module Bidello.Cron
open NodaTime
open Cronos
open System.Collections.Immutable
open Pentole.Path
open Pentole
@ -12,9 +11,40 @@ open Pentole.String
open Pentole.Map
open Pentole
type User = string
type PatternType = After of string
type WhenExpr = | Cron of Instant | Pattern of PatternType
type CronJobDefinition = {
job_name: string
user: string
when_: WhenExpr
executable: Path
args: string list
environment: (string * string) list
workdir: Path
hostname: string
let private convert def = {
job_name = def.job_name
hostname = def.hostname
user = def.user
executable = def.executable
args = def.args
environment = def.environment
workdir = def.workdir
let local_tz = DateTimeZoneProviders.Tzdb.GetSystemDefault ()
let local_tz_net = System.TimeZoneInfo.Local
let private parse_expr (now: Instant) text =
@ -45,6 +75,7 @@ let private parse now (db: Requirements) =
let when_ = parse_expr now db.``when``
let env =
|> fun s -> if isNull s then "" else s
|> String.split "|"
|> ResultList.collect (fun env_item ->
let splitted = env_item |> String.split "="
@ -56,11 +87,6 @@ let private parse now (db: Requirements) =
let executable = Which.which db.executable
let workdir = Path.of_string db.workdir |> Result.bind FileSystem.resolve
let last_completed =
|> Instant.FromDateTimeUtc
|> Option.defaultValue Instant.MinValue when_ env
|> ( executable workdir)
|> (fun ((executable, workdir), (when_, env)) ->
@ -71,15 +97,14 @@ let private parse now (db: Requirements) =
args = db.args |> List.ofArray
environment = List.ofSeq env
workdir = workdir
hostname = db.hostname
last_completed_at = last_completed })
hostname = db.hostname })
type JobKey = {
j: string; h: string
let sort_cron_jobs (now: Instant) (db_crons: Requirements seq) =
let sort_jobs (now: Instant) (db_crons: Requirements seq) =
let build_deps (lst: CronJob list) =
let build_deps (lst: CronJobDefinition list) =
let standalone, deps =
List.partition (function | {when_=Cron _} -> true | _ -> false) lst
@ -89,7 +114,7 @@ let sort_cron_jobs (now: Instant) (db_crons: Requirements seq) =
|> Map.ofSeq
in index, deps
let rec build_dependencies acc (all_jobs: Map<JobKey, CronJob>) = function
let rec build_dependencies acc (all_jobs: Map<JobKey, CronJobDefinition>) = function
| [] -> Ok acc
| {when_=Cron _}::_ -> invalidOp "The jobs should have been partitioned"
| {when_=Pattern (After jb)} as x::xs ->
@ -126,4 +151,10 @@ let sort_cron_jobs (now: Instant) (db_crons: Requirements seq) =
|> Map.values
|> List.singleton
|> List.append (Map.values jobs_with_deps |> List.rev)))
|> List.append (Map.values jobs_with_deps |> List.rev)
|> (function
| {when_=Cron instant} as hd::tail ->
let hd' = convert hd
let tail' = convert tail
{scheduled_at=instant; head=hd'; rest=tail'}
| x -> invalidOp $"List is malformed? {x}")))

View file

@ -63,11 +63,9 @@ let wait_notification (ct: CancellationToken) (db: t) =
let gather_requirements (hostname: string) (ct: CancellationToken) (db: t) =
let query = """select
c.job_name, c."when", c.executable, c.user, c.workdir, c.args, h.hostname,
STRING_AGG(e.variable || '=' || e.value, '|') AS environment_variables,
max(b.done_at) as done_at
STRING_AGG(e.variable || '=' || e.value, '|') AS environment_variables
from cron c
left join environment e on c.job_name = e.job_name
left join backlog b on b.job_name = c.job_name and b.hostname = @hostname
join hosts h on h.job_name = c.job_name
where h.hostname = @hostname
group by c.job_name, h.hostname """

View file

@ -8,24 +8,24 @@ open Pentole.Path
type Notification = | Time | Database
type PatternType = After of string
type WhenExpr = | Cron of Instant | Pattern of PatternType
type CronJob = {
job_name: string
user: string
when_: WhenExpr
executable: Path
args: string list
environment: (string * string) list
workdir: Path
hostname: string
last_completed_at: Instant
// last_completed_at: Instant
type ChainOfJobs = {
scheduled_at: Instant
head: CronJob
rest: CronJob list

View file

@ -10,13 +10,17 @@ open Bidello.Datatypes
type IShellGrain =
inherit IGrainWithIntegerKey
abstract schedule: CancellationToken -> CronJob -> Task
abstract schedule: CancellationToken -> ChainOfJobs -> Task
type ShellGrain() =
inherit Orleans.Grain ()
interface IShellGrain with
member _.schedule (ct) (job: CronJob) = task {
printfn "Grain view: %A" job
member _.schedule (ct) (jobs: ChainOfJobs) = task {
printfn "Grain view: --------"
printfn "HEAD= %A" jobs.head |> List.iter (printfn "%A")
printfn "--------"

View file

@ -1,6 +1,7 @@
module Bidello.Main
open System.Threading
open System.Threading.Tasks
open System
open NodaTime
@ -29,9 +30,7 @@ let logger = LoggingHelpers.from_config logger_config
(* Features:
- per user cron jobs
- environment variables
- output management (email, syslog)
- special time specs: @weekly
- randomized execution times
- special time specs: @weekly -> NOPE
- conditional cron jobs: check something, then run
- concurrency management
- job dependency
@ -46,9 +45,9 @@ type Bidello(client: IClusterClient) =
let rnd = new Random (2)
let db = Database.make logger
let schedule_jobs (job: CronJob) =
let schedule_jobs (jobs: ChainOfJobs) =
let runner = rnd.Next () |> client.GetGrain<IShellGrain>
runner.schedule ct job |> ignore
runner.schedule ct jobs |> ignore
task {
while not ct.IsCancellationRequested do
@ -59,15 +58,33 @@ type Bidello(client: IClusterClient) =
Database.gather_requirements hostname ct db
let now = SystemClock.Instance.GetCurrentInstant ()
let one_min_ago = Duration.FromMinutes 1L |> now.Minus
(* the granularity is one minute *)
let todo_list =
Cron.sort_jobs one_min_ago requirements
|> (List.filter (fun cj -> cj.scheduled_at <= now))
// let cronjobs =
// Cron.build_sorted_jobs_table logger hostname now requirements
let next_job_at =
Cron.sort_jobs now requirements (* in the future *)
|> Result.toOption
|> Option.bind (function | [] -> None | xs -> Some xs)
|> (fun xs -> List.minBy (_.scheduled_at) xs |> _.scheduled_at)
match todo_list with
| Error e -> logger.Error $"Can't schedule cronjobs. Reason: {e}"
| Ok cronjobs ->
|> Seq.iter schedule_jobs
// cronjobs |> Seq.iter schedule_jobs
printfn "%A" requirements
let! _wake_up = db |> Database.wait_notification ct
let! _wake_up =
match next_job_at with
| None -> db |> Database.wait_notification ct
| Some time ->
let by_ = time - now |> _.TotalMilliseconds |> floor |> int32
let db_change = db |> Database.wait_notification ct
let timer = Task.Delay (by_, ct)
Task.WhenAny [timer; db_change]

View file

@ -18,18 +18,23 @@ let which (executable: string) =
let shell = List.tryFind (fun sh -> sh |> Path.of_string |> Result.isOk) possible_shells
if shell |> Option.isSome then
let rc =
.ExecuteAsync ()
|> Async.AwaitTask
|> Async.RunSynchronously
|> _.StandardOutput
|> _.Trim()
|> Path.of_string
|> Result.bind FileSystem.resolve
with exn -> exn.Message |> Error
if rc.ExitCode = 0 then
|> _.Trim()
|> Path.of_string
elif rc.ExitCode = 1 then
Error $"Can't find executable path for \"{executable}\"."
|> Error
Error "Can't find the system shell. Check your system $PATH."

View file

@ -33,6 +33,7 @@
<PackageReference Include="FluentMigrator.Runner.Postgres" Version="6.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageReference Include="Microsoft.Orleans.Core" Version="8.2.0" />
<PackageReference Include="Microsoft.Orleans.Serialization.FSharp" Version="8.2.0" />
<PackageReference Include="Microsoft.Orleans.Server" Version="8.2.0" />
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.2.0" />
<PackageReference Include="NodaTime" Version="3.2.0" />

View file

@ -16,7 +16,7 @@ let string_prefix_active_pattern () =
| _ -> Assert.Pass ()
match "@after job " with
| Prefix "@after" j -> Assert.Pass ()
| Prefix "@after" _ -> Assert.Pass ()
| _ -> Assert.Pass ()
let bj =
@ -29,10 +29,13 @@ let bj =
done_at = None }
let run_function x =
let reduce (cj: CronJob) = (cj.hostname, cj.job_name)
let reduce (cjs: ChainOfJobs) =
let hd = cjs.head |> fun cj -> (cj.hostname, cj.job_name)
let tail = |> (fun cj -> (cj.hostname, cj.job_name))
Cron.sort_cron_jobs now x
|> ( ( reduce))
Cron.sort_jobs now x
|> ( reduce)
|> Pentole.Result.get
@ -92,6 +95,6 @@ let should_fail_no_host () =
Cron.sort_cron_jobs now requirements
Cron.sort_jobs now requirements
|> Result.isError
|> Assert.is_true