bump pentole

This commit is contained in:
bparodi@lezzo.org 2024-12-12 11:35:55 +01:00
parent 631de11088
commit b146b64fcf
11 changed files with 188 additions and 62 deletions

View file

@ -10,3 +10,6 @@ test:
cd tests && dotnet test cd tests && dotnet test
restore: restore:
dotnet restore && cd src && dotnet build dotnet restore && cd src && dotnet build
clean:
rm src/bin src/obj tests/bin tests/obj entrypoint/bin entrypoint/obj -fr

View file

@ -3,7 +3,6 @@ module Bidello.Cron
open NodaTime open NodaTime
open Cronos open Cronos
open Pentole.Path
open Pentole open Pentole
open Datatypes open Datatypes
@ -19,10 +18,10 @@ type CronJobDefinition = {
job_name: string job_name: string
user: string user: string
when_: WhenExpr when_: WhenExpr
executable: Path executable: IPath
args: string list args: string list
environment: (string * string) list environment: (string * string) list
workdir: Path workdir: IPath
hostname: string hostname: string
} }

View file

@ -2,6 +2,7 @@ module Bidello.Database
open System.Reflection open System.Reflection
open System.Threading open System.Threading
open System.Threading.Tasks
open Microsoft.Extensions.DependencyInjection open Microsoft.Extensions.DependencyInjection
open FluentMigrator.Runner open FluentMigrator.Runner
@ -11,15 +12,41 @@ open Dapper
open Datatypes open Datatypes
type OptionHandler<'T>() =
inherit SqlMapper.TypeHandler<option<'T>>()
override _.SetValue(param, value) =
match value with
| Some x -> param.Value <- box x
| None -> param.Value <- null
override _.Parse value =
if isNull value //|| value = box DBNull.Value
then None
else Some (value :?> 'T)
do
SqlMapper.AddTypeHandler (OptionHandler<string>())
SqlMapper.AddTypeHandler (OptionHandler<int>())
let private connstring = let private connstring =
let c = Environment.Environment() let c = Environment.Environment()
$"Server={c.pg_host};Database={c.pg_dbname};" + $"Server={c.pg_host};Database={c.pg_dbname};" +
$"UserId={c.pg_user};Password={c.pg_password};" + $"UserId={c.pg_user};Password={c.pg_password};" +
"Tcp Keepalive=true" "Tcp Keepalive=true"
type t = { // type t = {
connection: NpgsqlConnection // connection: NpgsqlConnection
} // }
type t (conn: NpgsqlConnection) =
member _.connection = conn
interface System.IDisposable with
member _.Dispose () =
conn.Close ()
interface System.IAsyncDisposable with
member _.DisposeAsync () =
conn.CloseAsync () |> ValueTask
let run_migrations (logger: ILogger) = let run_migrations (logger: ILogger) =
@ -54,7 +81,13 @@ let make (logger: ILogger) =
logger.Information "Successfully connected to the database." logger.Information "Successfully connected to the database."
{connection = conn} new t(conn)
let make_from_grain (ct: CancellationToken) = task {
let conn = new NpgsqlConnection (connstring)
do! conn.OpenAsync ct
return new t(conn)
}
let wait_notification (ms: int) (ct: CancellationToken) (db: t) = let wait_notification (ms: int) (ct: CancellationToken) (db: t) =
@ -78,3 +111,25 @@ group by c.job_name, h.hostname """
commandTimeout=nl, commandType=cl, commandTimeout=nl, commandType=cl,
flags=CommandFlags.Buffered, cancellationToken=ct) flags=CommandFlags.Buffered, cancellationToken=ct)
|> db.connection.QueryAsync<Requirements> |> db.connection.QueryAsync<Requirements>
let write_to_backlog (entry: BacklogEntry) (ct: CancellationToken) (db: t) = task {
let query = """INSERT INTO backlog
(job_name, hostname, done_at, started_at, job,
"stdout", stderr, exit_code, failure_msg)
VALUES
(@job_name, @hostname, @done_at, @started_at, @job,
@stdout, @stderr, @exit_code, @failure_msg)"""
let nl = System.Nullable<int> ()
let cl = System.Nullable<System.Data.CommandType> ()
let! res =
new CommandDefinition (query, parameters=entry, transaction=null,
commandTimeout=nl, commandType=cl,
flags=CommandFlags.Buffered, cancellationToken=ct)
|> db.connection.ExecuteAsync
return
match res with
| 1 -> Ok ()
| x -> Error $"Unexpected query return, check the database: {x}|{entry}"
}

View file

@ -213,3 +213,15 @@ type RenameCmdInBacklog () =
override x.Up() = override x.Up() =
x.Rename.Column("cmd").OnTable("backlog").To("job") x.Rename.Column("cmd").OnTable("backlog").To("job")
|> ignore |> ignore
[<Migration(20241203_0002L)>]
type BacklogNoExitCode () =
inherit OnlyUp ()
override x.Up() =
x.Alter.Table("backlog").AlterColumn("exit_code").AsCustom("smallint").Nullable()
|> ignore
x.Create.Column("failure_msg").OnTable("backlog").AsString().Nullable()
|> ignore

View file

@ -3,34 +3,32 @@ module Bidello.Datatypes
open Orleans open Orleans
open NodaTime open NodaTime
open Pentole.Path open Pentole
[<GenerateSerializer>]
type Notification = | Time | Database
[<Immutable>]
[<GenerateSerializer>] [<GenerateSerializer>]
type CronJob = { type CronJob = {
job_name: string job_name: string
user: string user: string
executable: Path executable: IPath
args: string list args: string list
environment: (string * string) list environment: (string * string) list
workdir: Path workdir: IPath
hostname: string hostname: string
// last_completed_at: Instant
} with } with
member x.info_string () = member x.info_string () =
let cwd = x.workdir.ToString () let cwd = x.workdir.string_value
let executable = x.executable.string_value
let vars = let vars =
x.environment match x.environment with
|> List.map (fun (k, v) -> $"{k}='{v}' ") | [] -> "''"
|> String.concat "" | env -> env |> List.map (fun (k, v) -> $"{k}='{v}' ") |> String.concat ""
let args = let args =
x.args x.args
|> String.concat " " |> String.concat " "
let cmd = sprintf "%A %A" x.executable args let cmd = sprintf "%A %A" executable args
sprintf "User: %s\nCwd: %s\nEnvironment: %s\nCommand: %s" x.user cwd vars cmd sprintf "User: '%s' cwd: '%s' environment: %s command: %s" x.user cwd vars cmd
[<Immutable>] [<Immutable>]
@ -40,6 +38,14 @@ type ChainOfJobs = {
jobs: CronJob list jobs: CronJob list
} }
[<Immutable>]
[<GenerateSerializer>]
type RunResult =
| Success of string | Failure of (int * string)
| Unknown of string | NoShell of string
| NoPermissionOnFolder | NoPrivilegeToUser
(* Database types *)
[<CLIMutable>] [<CLIMutable>]
type Requirements = { type Requirements = {
job_name: string job_name: string
@ -53,7 +59,15 @@ type Requirements = {
done_at: System.DateTime option done_at: System.DateTime option
} }
type RunResult = [<CLIMutable>]
| Success of string | Failure of (int * string) type BacklogEntry = {
| Unknown of string | NoShell of string job_name: string
| NoPermissionOnFolder | NoPrivilegeToUser hostname: string
done_at: System.DateTime
started_at: System.DateTime
job: string
stdout: string option
stderr: string option
exit_code: int option
failure_msg: string option
}

View file

@ -4,6 +4,7 @@ open Orleans
open System.Threading open System.Threading
open System.Threading.Tasks open System.Threading.Tasks
open NodaTime open NodaTime
open System
open Bidello.Datatypes open Bidello.Datatypes
open Bidello.Shell open Bidello.Shell
@ -15,11 +16,67 @@ type IShellGrain =
type IDbGrain =
inherit IGrainWithGuidKey
abstract save_backlog: CancellationToken -> Instant * Instant -> CronJob -> RunResult -> ValueTask
type DbGrain () =
inherit Orleans.Grain ()
interface IDbGrain with
override _.save_backlog ct (start_, end_) (job: CronJob) rc =
try
let stdout = match rc with | Success stdout -> Some stdout | _ -> None
let code, stderr =
match rc with
| Failure (c, s) -> (Some c, Some s)
| Success _ -> (Some 0, None)
| _ -> (None, None)
let fmsg =
match rc with
| NoPermissionOnFolder -> Some "No permission on folder"
| NoPrivilegeToUser -> Some "No privilege to switch user"
| Unknown u -> Some $"Unknown failure '{u}'"
| _ -> None
let entry = {
started_at = start_.ToDateTimeUtc()
done_at = end_.ToDateTimeUtc()
stdout = stdout
stderr = stderr
exit_code = code
failure_msg = fmsg
job = job.info_string ()
hostname = job.hostname
job_name = job.job_name
}
let tsk = async {
use! db = Database.make_from_grain ct |> Async.AwaitTask
let! res = Database.write_to_backlog entry ct db |> Async.AwaitTask
return
match res with
| Ok () -> ()
| Error msg -> Logging.logger.Fatal msg
}
tsk |> Async.StartAsTask |> ValueTask
with exn -> printfn "%A" exn; ValueTask ()
type ShellGrain() = type ShellGrain() =
inherit Orleans.Grain () inherit Orleans.Grain ()
interface IShellGrain with interface IShellGrain with
member _.schedule (ct) (jobs: ChainOfJobs) = member x.schedule (ct) (jobs: ChainOfJobs) =
let db_actor = x.GrainFactory.GetGrain<IDbGrain>(Guid.NewGuid())
let log (job: CronJob) = function let log (job: CronJob) = function
| Success _stdout -> | Success _stdout ->
@ -45,33 +102,20 @@ type ShellGrain() =
let start_time = SystemClock.Instance.GetCurrentInstant () let start_time = SystemClock.Instance.GetCurrentInstant ()
let! rc = run_job ct hd |> Async.AwaitTask let! rc = run_job ct hd |> Async.AwaitTask
let end_time = SystemClock.Instance.GetCurrentInstant () let end_time = SystemClock.Instance.GetCurrentInstant ()
log hd rc log hd rc
match rc, tl with db_actor.save_backlog ct (start_time, end_time) hd rc |> ignore
| (Success stdout, hd'::tl')->
printfn "rc = Ok %A" stdout match tl with
return! run_ hd' tl' | hd'::tl'-> return! run_ hd' tl'
| (Success stdout, []) -> | [] -> return ()
printfn "rc = Ok %A" stdout
return ()
| (NoShell reason | Unknown reason), _ ->
printfn "Greve: %A" reason
return ()
| (NoPermissionOnFolder, _) ->
printfn "NO perms on folder"
return ()
| (NoPrivilegeToUser, _) ->
printfn "NO privilege to user"
return ()
| (Failure (_rc, stderr), _) ->
printfn "rc ERror = = stderr %A" stderr
return ()
} }
jobs.jobs jobs.jobs
|> function | [] -> None | hd::tl -> Some (hd, tl) |> function | [] -> None | hd::tl -> Some (hd, tl)
|> Option.map (fun jobs -> |> Option.map (fun jobs ->
let tsk = jobs ||> run_ let tsk = jobs ||> run_
Async.StartAsTask (tsk, TaskCreationOptions.LongRunning, ct) Async.StartAsTask (tsk, TaskCreationOptions.None, ct)
|> ValueTask) |> ValueTask)
|> Option.defaultValue (ValueTask ()) |> Option.defaultValue (ValueTask ())
(*TODO: try block?*)

View file

@ -24,7 +24,7 @@ type Bidello(client: IClusterClient) =
override _this.ExecuteAsync(ct: CancellationToken) = override _this.ExecuteAsync(ct: CancellationToken) =
let rnd = new Random (2) let rnd = new Random (2)
let db = Database.make logger let db = Database.make logger (* long lived, don't close *)
let schedule_jobs (jobs: ChainOfJobs) = let schedule_jobs (jobs: ChainOfJobs) =
let runner = rnd.Next () |> client.GetGrain<IShellGrain> let runner = rnd.Next () |> client.GetGrain<IShellGrain>

View file

@ -83,9 +83,8 @@ let which (executable: string) =
let run_job (ct: CancellationToken) (cj: CronJob) = task { let run_job (ct: CancellationToken) (cj: CronJob) = task {
let workdir = cj.workdir.string_value + "/"
let workdir = cj.workdir |> function Absolute a -> a + "/" let executable = cj.executable.string_value
let executable = cj.executable |> function Absolute a -> a
let user = cj.user let user = cj.user
let env = let env =
cj.environment cj.environment

View file

@ -16,9 +16,9 @@
<Compile Include="Datatypes.fs" /> <Compile Include="Datatypes.fs" />
<Compile Include="Logging.fs" /> <Compile Include="Logging.fs" />
<Compile Include="Shell.fs" /> <Compile Include="Shell.fs" />
<Compile Include="Grains.fs" />
<Compile Include="DatabaseMigrations.fs" /> <Compile Include="DatabaseMigrations.fs" />
<Compile Include="Database.fs" /> <Compile Include="Database.fs" />
<Compile Include="Grains.fs" />
<Compile Include="Cron.fs" /> <Compile Include="Cron.fs" />
<Compile Include="Library.fs" /> <Compile Include="Library.fs" />
</ItemGroup> </ItemGroup>
@ -38,7 +38,7 @@
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.2.0" /> <PackageReference Include="Microsoft.Orleans.Streaming" Version="8.2.0" />
<PackageReference Include="NodaTime" Version="3.2.0" /> <PackageReference Include="NodaTime" Version="3.2.0" />
<PackageReference Include="Npgsql" Version="8.0.5" /> <PackageReference Include="Npgsql" Version="8.0.5" />
<PackageReference Include="Pentole" Version="0.0.3" /> <PackageReference Include="Pentole" Version="0.0.4" />
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" /> <PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" /> <PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
<PackageReference Include="Serilog.Sinks.File" Version="6.0.0" /> <PackageReference Include="Serilog.Sinks.File" Version="6.0.0" />

View file

@ -46,7 +46,7 @@ let job_deps_simple () =
let expected = [[("h1", "j1")]] let expected = [[("h1", "j1")]]
Assert.are_seq_equal expected cjs setEqual expected cjs
[<Test>] [<Test>]
let job_deps () = let job_deps () =
@ -61,7 +61,7 @@ let job_deps () =
let cjs = run_function requirements let cjs = run_function requirements
let expected = [[("h1", "j1"); ("h1", "j1_after")]; [("h2", "j1")]; [("h1", "j2")]] let expected = [[("h1", "j1"); ("h1", "j1_after")]; [("h2", "j1")]; [("h1", "j2")]]
Assert.are_seq_equal expected cjs setEqual expected cjs
[<Test>] [<Test>]
let job_deps2 () = let job_deps2 () =
@ -79,7 +79,7 @@ let job_deps2 () =
let expected = [[("h1", "j1"); ("h1", "j1_after")]; let expected = [[("h1", "j1"); ("h1", "j1_after")];
[("h1", "j2"); ("h1", "j2_after")]; [("h1", "j2"); ("h1", "j2_after")];
[("h2", "j1")]] [("h2", "j1")]]
Assert.are_seq_equal expected cjs setEqual expected cjs
[<Test>] [<Test>]
let should_fail_no_host () = let should_fail_no_host () =
@ -95,7 +95,7 @@ let should_fail_no_host () =
Cron.sort_jobs now requirements Cron.sort_jobs now requirements
|> Result.isError |> Result.isError
|> Assert.is_true |> isTrue
[<Test>] [<Test>]
let job_deps_chain0 () = let job_deps_chain0 () =
@ -112,7 +112,7 @@ let job_deps_chain0 () =
let expected = [[("h1", "j1"); ("h1", "j2_after_j1"); ("h1", "j3_after_j2")]; let expected = [[("h1", "j1"); ("h1", "j2_after_j1"); ("h1", "j3_after_j2")];
[("h2", "j1")]] [("h2", "j1")]]
Assert.are_seq_equal expected cjs setEqual expected cjs
[<Test>] [<Test>]
let job_deps_chain1 () = let job_deps_chain1 () =
@ -131,7 +131,7 @@ let job_deps_chain1 () =
("h1", "j2_after_j1"); ("h1", "j3_after_j2")]; ("h1", "j2_after_j1"); ("h1", "j3_after_j2")];
[("h2", "j1")]] [("h2", "j1")]]
Assert.are_seq_equal expected cjs setEqual expected cjs
[<Test>] [<Test>]
let job_deps_chain_failure () = let job_deps_chain_failure () =
@ -142,4 +142,4 @@ let job_deps_chain_failure () =
Cron.sort_jobs now requirements Cron.sort_jobs now requirements
|> Result.isError |> Result.isError
|> Assert.is_true |> isTrue

View file

@ -15,11 +15,11 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" /> <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
<PackageReference Include="NUnit" Version="4.2.1" /> <PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" /> <PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageReference Include="NUnit.Analyzers" Version="3.6.1" /> <PackageReference Include="NUnit.Analyzers" Version="3.6.1" />
<PackageReference Include="coverlet.collector" Version="6.0.0" /> <PackageReference Include="coverlet.collector" Version="6.0.0" />
<PackageReference Include="Pentole" Version="0.0.3" /> <PackageReference Include="Pentole" Version="0.0.4" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>