diff --git a/Makefile b/Makefile index 187eb53..ebaf115 100644 --- a/Makefile +++ b/Makefile @@ -10,3 +10,6 @@ test: cd tests && dotnet test restore: dotnet restore && cd src && dotnet build + +clean: + rm src/bin src/obj tests/bin tests/obj entrypoint/bin entrypoint/obj -fr diff --git a/src/Cron.fs b/src/Cron.fs index 2f954f4..b0bea80 100644 --- a/src/Cron.fs +++ b/src/Cron.fs @@ -3,7 +3,6 @@ module Bidello.Cron open NodaTime open Cronos -open Pentole.Path open Pentole open Datatypes @@ -19,10 +18,10 @@ type CronJobDefinition = { job_name: string user: string when_: WhenExpr - executable: Path + executable: IPath args: string list environment: (string * string) list - workdir: Path + workdir: IPath hostname: string } diff --git a/src/Database.fs b/src/Database.fs index c7c2a4b..d2c20eb 100644 --- a/src/Database.fs +++ b/src/Database.fs @@ -2,6 +2,7 @@ module Bidello.Database open System.Reflection open System.Threading +open System.Threading.Tasks open Microsoft.Extensions.DependencyInjection open FluentMigrator.Runner @@ -11,15 +12,41 @@ open Dapper open Datatypes +type OptionHandler<'T>() = + inherit SqlMapper.TypeHandler>() + + override _.SetValue(param, value) = + match value with + | Some x -> param.Value <- box x + | None -> param.Value <- null + + override _.Parse value = + if isNull value //|| value = box DBNull.Value + then None + else Some (value :?> 'T) +do + SqlMapper.AddTypeHandler (OptionHandler()) + SqlMapper.AddTypeHandler (OptionHandler()) + + let private connstring = let c = Environment.Environment() $"Server={c.pg_host};Database={c.pg_dbname};" + $"UserId={c.pg_user};Password={c.pg_password};" + "Tcp Keepalive=true" -type t = { - connection: NpgsqlConnection -} +// type t = { + // connection: NpgsqlConnection +// } + +type t (conn: NpgsqlConnection) = + member _.connection = conn + interface System.IDisposable with + member _.Dispose () = + conn.Close () + interface System.IAsyncDisposable with + member _.DisposeAsync () = + conn.CloseAsync () |> ValueTask let run_migrations (logger: ILogger) = @@ -54,7 +81,13 @@ let make (logger: ILogger) = logger.Information "Successfully connected to the database." - {connection = conn} + new t(conn) + +let make_from_grain (ct: CancellationToken) = task { + let conn = new NpgsqlConnection (connstring) + do! conn.OpenAsync ct + return new t(conn) +} let wait_notification (ms: int) (ct: CancellationToken) (db: t) = @@ -78,3 +111,25 @@ group by c.job_name, h.hostname """ commandTimeout=nl, commandType=cl, flags=CommandFlags.Buffered, cancellationToken=ct) |> db.connection.QueryAsync + +let write_to_backlog (entry: BacklogEntry) (ct: CancellationToken) (db: t) = task { + let query = """INSERT INTO backlog + (job_name, hostname, done_at, started_at, job, + "stdout", stderr, exit_code, failure_msg) + VALUES + (@job_name, @hostname, @done_at, @started_at, @job, + @stdout, @stderr, @exit_code, @failure_msg)""" + + let nl = System.Nullable () + let cl = System.Nullable () + let! res = + new CommandDefinition (query, parameters=entry, transaction=null, + commandTimeout=nl, commandType=cl, + flags=CommandFlags.Buffered, cancellationToken=ct) + |> db.connection.ExecuteAsync + + return + match res with + | 1 -> Ok () + | x -> Error $"Unexpected query return, check the database: {x}|{entry}" +} diff --git a/src/DatabaseMigrations.fs b/src/DatabaseMigrations.fs index 1d6be91..d3de637 100644 --- a/src/DatabaseMigrations.fs +++ b/src/DatabaseMigrations.fs @@ -213,3 +213,15 @@ type RenameCmdInBacklog () = override x.Up() = x.Rename.Column("cmd").OnTable("backlog").To("job") |> ignore + + +[] +type BacklogNoExitCode () = + + inherit OnlyUp () + override x.Up() = + x.Alter.Table("backlog").AlterColumn("exit_code").AsCustom("smallint").Nullable() + |> ignore + + x.Create.Column("failure_msg").OnTable("backlog").AsString().Nullable() + |> ignore diff --git a/src/Datatypes.fs b/src/Datatypes.fs index f8714a3..e7c22ab 100644 --- a/src/Datatypes.fs +++ b/src/Datatypes.fs @@ -3,34 +3,32 @@ module Bidello.Datatypes open Orleans open NodaTime -open Pentole.Path - -[] -type Notification = | Time | Database +open Pentole +[] [] type CronJob = { job_name: string user: string - executable: Path + executable: IPath args: string list environment: (string * string) list - workdir: Path + workdir: IPath hostname: string - // last_completed_at: Instant } with member x.info_string () = - let cwd = x.workdir.ToString () + let cwd = x.workdir.string_value + let executable = x.executable.string_value let vars = - x.environment - |> List.map (fun (k, v) -> $"{k}='{v}' ") - |> String.concat "" + match x.environment with + | [] -> "''" + | env -> env |> List.map (fun (k, v) -> $"{k}='{v}' ") |> String.concat "" let args = x.args |> String.concat " " - let cmd = sprintf "%A %A" x.executable args + let cmd = sprintf "%A %A" executable args - sprintf "User: %s\nCwd: %s\nEnvironment: %s\nCommand: %s" x.user cwd vars cmd + sprintf "User: '%s' cwd: '%s' environment: %s command: %s" x.user cwd vars cmd [] @@ -40,6 +38,14 @@ type ChainOfJobs = { jobs: CronJob list } +[] +[] +type RunResult = + | Success of string | Failure of (int * string) + | Unknown of string | NoShell of string + | NoPermissionOnFolder | NoPrivilegeToUser + +(* Database types *) [] type Requirements = { job_name: string @@ -53,7 +59,15 @@ type Requirements = { done_at: System.DateTime option } -type RunResult = - | Success of string | Failure of (int * string) - | Unknown of string | NoShell of string - | NoPermissionOnFolder | NoPrivilegeToUser +[] +type BacklogEntry = { + job_name: string + hostname: string + done_at: System.DateTime + started_at: System.DateTime + job: string + stdout: string option + stderr: string option + exit_code: int option + failure_msg: string option +} diff --git a/src/Grains.fs b/src/Grains.fs index e72cc85..0f56eb2 100644 --- a/src/Grains.fs +++ b/src/Grains.fs @@ -4,6 +4,7 @@ open Orleans open System.Threading open System.Threading.Tasks open NodaTime +open System open Bidello.Datatypes open Bidello.Shell @@ -15,11 +16,67 @@ type IShellGrain = +type IDbGrain = + inherit IGrainWithGuidKey + abstract save_backlog: CancellationToken -> Instant * Instant -> CronJob -> RunResult -> ValueTask + +type DbGrain () = + inherit Orleans.Grain () + + interface IDbGrain with + override _.save_backlog ct (start_, end_) (job: CronJob) rc = + try + let stdout = match rc with | Success stdout -> Some stdout | _ -> None + + let code, stderr = + match rc with + | Failure (c, s) -> (Some c, Some s) + | Success _ -> (Some 0, None) + | _ -> (None, None) + + let fmsg = + match rc with + | NoPermissionOnFolder -> Some "No permission on folder" + | NoPrivilegeToUser -> Some "No privilege to switch user" + | Unknown u -> Some $"Unknown failure '{u}'" + | _ -> None + + + let entry = { + started_at = start_.ToDateTimeUtc() + done_at = end_.ToDateTimeUtc() + + stdout = stdout + stderr = stderr + exit_code = code + failure_msg = fmsg + + job = job.info_string () + hostname = job.hostname + job_name = job.job_name + } + + let tsk = async { + use! db = Database.make_from_grain ct |> Async.AwaitTask + let! res = Database.write_to_backlog entry ct db |> Async.AwaitTask + + return + match res with + | Ok () -> () + | Error msg -> Logging.logger.Fatal msg + } + tsk |> Async.StartAsTask |> ValueTask + + with exn -> printfn "%A" exn; ValueTask () + + type ShellGrain() = inherit Orleans.Grain () interface IShellGrain with - member _.schedule (ct) (jobs: ChainOfJobs) = + member x.schedule (ct) (jobs: ChainOfJobs) = + + let db_actor = x.GrainFactory.GetGrain(Guid.NewGuid()) let log (job: CronJob) = function | Success _stdout -> @@ -45,33 +102,20 @@ type ShellGrain() = let start_time = SystemClock.Instance.GetCurrentInstant () let! rc = run_job ct hd |> Async.AwaitTask let end_time = SystemClock.Instance.GetCurrentInstant () - + log hd rc - match rc, tl with - | (Success stdout, hd'::tl')-> - printfn "rc = Ok %A" stdout - return! run_ hd' tl' - | (Success stdout, []) -> - printfn "rc = Ok %A" stdout - return () - | (NoShell reason | Unknown reason), _ -> - printfn "Greve: %A" reason - return () - | (NoPermissionOnFolder, _) -> - printfn "NO perms on folder" - return () - | (NoPrivilegeToUser, _) -> - printfn "NO privilege to user" - return () - | (Failure (_rc, stderr), _) -> - printfn "rc ERror = = stderr %A" stderr - return () + db_actor.save_backlog ct (start_time, end_time) hd rc |> ignore + + match tl with + | hd'::tl'-> return! run_ hd' tl' + | [] -> return () } - + jobs.jobs |> function | [] -> None | hd::tl -> Some (hd, tl) |> Option.map (fun jobs -> let tsk = jobs ||> run_ - Async.StartAsTask (tsk, TaskCreationOptions.LongRunning, ct) + Async.StartAsTask (tsk, TaskCreationOptions.None, ct) |> ValueTask) |> Option.defaultValue (ValueTask ()) +(*TODO: try block?*) diff --git a/src/Library.fs b/src/Library.fs index 8b0a706..ed1f9c8 100644 --- a/src/Library.fs +++ b/src/Library.fs @@ -24,7 +24,7 @@ type Bidello(client: IClusterClient) = override _this.ExecuteAsync(ct: CancellationToken) = let rnd = new Random (2) - let db = Database.make logger + let db = Database.make logger (* long lived, don't close *) let schedule_jobs (jobs: ChainOfJobs) = let runner = rnd.Next () |> client.GetGrain diff --git a/src/Shell.fs b/src/Shell.fs index be928cc..0c35192 100644 --- a/src/Shell.fs +++ b/src/Shell.fs @@ -83,9 +83,8 @@ let which (executable: string) = let run_job (ct: CancellationToken) (cj: CronJob) = task { - - let workdir = cj.workdir |> function Absolute a -> a + "/" - let executable = cj.executable |> function Absolute a -> a + let workdir = cj.workdir.string_value + "/" + let executable = cj.executable.string_value let user = cj.user let env = cj.environment diff --git a/src/src.fsproj b/src/src.fsproj index 03f0fd4..e43cc44 100644 --- a/src/src.fsproj +++ b/src/src.fsproj @@ -16,9 +16,9 @@ - + @@ -38,7 +38,7 @@ - + diff --git a/tests/UnitTest1.fs b/tests/UnitTest1.fs index e335280..66aad8a 100644 --- a/tests/UnitTest1.fs +++ b/tests/UnitTest1.fs @@ -46,7 +46,7 @@ let job_deps_simple () = let expected = [[("h1", "j1")]] - Assert.are_seq_equal expected cjs + setEqual expected cjs [] let job_deps () = @@ -61,7 +61,7 @@ let job_deps () = let cjs = run_function requirements let expected = [[("h1", "j1"); ("h1", "j1_after")]; [("h2", "j1")]; [("h1", "j2")]] - Assert.are_seq_equal expected cjs + setEqual expected cjs [] let job_deps2 () = @@ -79,7 +79,7 @@ let job_deps2 () = let expected = [[("h1", "j1"); ("h1", "j1_after")]; [("h1", "j2"); ("h1", "j2_after")]; [("h2", "j1")]] - Assert.are_seq_equal expected cjs + setEqual expected cjs [] let should_fail_no_host () = @@ -95,7 +95,7 @@ let should_fail_no_host () = Cron.sort_jobs now requirements |> Result.isError - |> Assert.is_true + |> isTrue [] let job_deps_chain0 () = @@ -112,7 +112,7 @@ let job_deps_chain0 () = let expected = [[("h1", "j1"); ("h1", "j2_after_j1"); ("h1", "j3_after_j2")]; [("h2", "j1")]] - Assert.are_seq_equal expected cjs + setEqual expected cjs [] let job_deps_chain1 () = @@ -131,7 +131,7 @@ let job_deps_chain1 () = ("h1", "j2_after_j1"); ("h1", "j3_after_j2")]; [("h2", "j1")]] - Assert.are_seq_equal expected cjs + setEqual expected cjs [] let job_deps_chain_failure () = @@ -142,4 +142,4 @@ let job_deps_chain_failure () = Cron.sort_jobs now requirements |> Result.isError - |> Assert.is_true + |> isTrue diff --git a/tests/tests.fsproj b/tests/tests.fsproj index ac00cf8..780d58d 100644 --- a/tests/tests.fsproj +++ b/tests/tests.fsproj @@ -15,11 +15,11 @@ - + - +