first
This commit is contained in:
commit
ef97881270
18 changed files with 1055 additions and 0 deletions
110
.gitignore
vendored
Normal file
110
.gitignore
vendored
Normal file
|
@ -0,0 +1,110 @@
|
|||
*.secret
|
||||
|
||||
## Ignore Visual Studio temporary files, build results, and
|
||||
## files generated by popular Visual Studio add-ons.
|
||||
##
|
||||
## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore
|
||||
|
||||
# User-specific files
|
||||
*.suo
|
||||
*.user
|
||||
*.userosscache
|
||||
*.sln.docstates
|
||||
|
||||
# Build results
|
||||
[Dd]ebug/
|
||||
[Dd]ebugPublic/
|
||||
[Rr]elease/
|
||||
x64/
|
||||
x86/
|
||||
bld/
|
||||
[Bb]in/
|
||||
[Oo]bj/
|
||||
[Ll]og/
|
||||
|
||||
# .NET Core
|
||||
project.lock.json
|
||||
project.fragment.lock.json
|
||||
artifacts/
|
||||
**/Properties/launchSettings.json
|
||||
|
||||
*_i.c
|
||||
*_p.c
|
||||
*_i.h
|
||||
*.ilk
|
||||
*.meta
|
||||
*.obj
|
||||
*.pch
|
||||
*.pdb
|
||||
*.pgc
|
||||
*.pgd
|
||||
*.rsp
|
||||
*.sbr
|
||||
*.tlb
|
||||
*.tli
|
||||
*.tlh
|
||||
*.tmp
|
||||
*.tmp_proj
|
||||
*.log
|
||||
*.vspscc
|
||||
*.vssscc
|
||||
.builds
|
||||
*.pidb
|
||||
*.svclog
|
||||
*.scc
|
||||
|
||||
# Chutzpah Test files
|
||||
_Chutzpah*
|
||||
|
||||
# Visual C++ cache files
|
||||
ipch/
|
||||
*.aps
|
||||
*.ncb
|
||||
*.opendb
|
||||
*.opensdf
|
||||
*.sdf
|
||||
*.cachefile
|
||||
*.VC.db
|
||||
*.VC.VC.opendb
|
||||
|
||||
# Visual Studio profiler
|
||||
*.psess
|
||||
*.vsp
|
||||
*.vspx
|
||||
*.sap
|
||||
|
||||
# TFS 2012 Local Workspace
|
||||
$tf/
|
||||
|
||||
# Guidance Automation Toolkit
|
||||
*.gpState
|
||||
|
||||
# ReSharper is a .NET coding add-in
|
||||
_ReSharper*/
|
||||
*.[Rr]e[Ss]harper
|
||||
*.DotSettings.user
|
||||
|
||||
|
||||
publish/
|
||||
|
||||
*.[Pp]ublish.xml
|
||||
*.azurePubxml
|
||||
# TODO: Comment the next line if you want to checkin your web deploy settings
|
||||
# but database connection strings (with potential passwords) will be unencrypted
|
||||
*.pubxml
|
||||
*.publishproj
|
||||
|
||||
# Microsoft Azure Web App publish settings. Comment the next line if you want to
|
||||
# checkin your Azure Web App publish settings, but sensitive information contained
|
||||
# in these scripts will be unencrypted
|
||||
PublishScripts/
|
||||
|
||||
# NuGet Packages
|
||||
*.nupkg
|
||||
# The packages folder can be ignored because of Package Restore
|
||||
**/packages/*
|
||||
# except build/, which is used as an MSBuild target.
|
||||
!**/packages/build/
|
||||
# Python Tools for Visual Studio (PTVS)
|
||||
__pycache__/
|
||||
*.pyc
|
10
Makefile
Normal file
10
Makefile
Normal file
|
@ -0,0 +1,10 @@
|
|||
USER := $(shell cat user.secret)
|
||||
PASSWORD := $(shell cat password.secret)
|
||||
HOST := $(shell cat host.secret)
|
||||
|
||||
build:
|
||||
cd src && dotnet build
|
||||
run:
|
||||
cd entrypoint && dotnet run -- --user ${USER} --password ${PASSWORD} -H ${HOST}
|
||||
test:
|
||||
dotnet test
|
14
bidello.sln
Normal file
14
bidello.sln
Normal file
|
@ -0,0 +1,14 @@
|
|||
|
||||
Microsoft Visual Studio Solution File, Format Version 12.00
|
||||
# Visual Studio Version 17
|
||||
VisualStudioVersion = 17.0.31903.59
|
||||
MinimumVisualStudioVersion = 10.0.40219.1
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
Release|Any CPU = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
EndGlobalSection
|
||||
EndGlobal
|
39
entrypoint/Program.cs
Normal file
39
entrypoint/Program.cs
Normal file
|
@ -0,0 +1,39 @@
|
|||
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Orleans;
|
||||
using Orleans.Hosting;
|
||||
using Serilog;
|
||||
using Orleans.Serialization;
|
||||
using Orleans.Serialization.Buffers;
|
||||
using Orleans.Serialization.Cloning;
|
||||
using Orleans.Serialization.Serializers;
|
||||
using Orleans.Serialization.WireProtocol;
|
||||
|
||||
using static Bidello.Grains;
|
||||
[assembly: GenerateCodeForDeclaringAssembly(typeof(Bidello.Grains.ShellGrain))]
|
||||
namespace Bidello.Entrypoint;
|
||||
|
||||
// [assembly: KnownType(typeof(Verbale.Grains.LoggingGrain))] [assembly: KnownAssembly(typeof(Verbale.Grains.LoggingGrain))]
|
||||
|
||||
|
||||
class Entrypoint {
|
||||
static void Main(string[] args) {
|
||||
var logger = new LoggerConfiguration()
|
||||
.MinimumLevel.Debug()
|
||||
.WriteTo.Console()
|
||||
.CreateLogger();
|
||||
var host = new HostBuilder()
|
||||
.UseOrleans(builder =>
|
||||
{
|
||||
builder.UseLocalhostClustering()
|
||||
.ConfigureLogging(s => s.AddSerilog().AddConsole())
|
||||
;
|
||||
})
|
||||
.UseConsoleLifetime();
|
||||
|
||||
Bidello.Main.main(args, host);
|
||||
|
||||
}
|
||||
}
|
20
entrypoint/entrypoint.csproj
Normal file
20
entrypoint/entrypoint.csproj
Normal file
|
@ -0,0 +1,20 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\src\src.fsproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Orleans.Core" Version="8.2.0" />
|
||||
<PackageReference Include="Microsoft.Orleans.Server" Version="8.2.0" />
|
||||
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
|
||||
</ItemGroup>
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<ImplicitUsings>enable</ImplicitUsings>
|
||||
<Nullable>enable</Nullable>
|
||||
</PropertyGroup>
|
||||
|
||||
</Project>
|
21
entrypoint/script.gnuplot
Normal file
21
entrypoint/script.gnuplot
Normal file
|
@ -0,0 +1,21 @@
|
|||
# set terminal pngcairo transparent enhanced font "arial,10" fontscale 1.0 size 600, 400
|
||||
# set output 'scatter.5.png'
|
||||
set dgrid3d 10,10 qnorm 16
|
||||
set dummy u, v
|
||||
set key fixed right top vertical Right noreverse enhanced autotitle box lt black linewidth 1.000 dashtype solid
|
||||
set parametric
|
||||
set contour base
|
||||
set style data lines
|
||||
set title "Simple demo of scatter data conversion to grid data"
|
||||
set xlabel "data style lines, dgrid3d qnorm 16, contour"
|
||||
set xrange [ * : * ] noreverse writeback
|
||||
set x2range [ * : * ] noreverse writeback
|
||||
set yrange [ * : * ] noreverse writeback
|
||||
set y2range [ * : * ] noreverse writeback
|
||||
set zrange [ * : * ] noreverse writeback
|
||||
set cbrange [ * : * ] noreverse writeback
|
||||
set rrange [ * : * ] noreverse writeback
|
||||
set colorbox vertical origin screen 0.9, 0.2 size screen 0.05, 0.6 front noinvert bdefault
|
||||
NO_ANIMATION = 1
|
||||
## Last datafile plotted: "hemisphr.dat"
|
||||
splot "hemisphr.dat"
|
161
src/Cron.fs
Normal file
161
src/Cron.fs
Normal file
|
@ -0,0 +1,161 @@
|
|||
module Bidello.Cron
|
||||
|
||||
open NodaTime
|
||||
open Cronos
|
||||
|
||||
open Pentole.Path
|
||||
open Pentole
|
||||
open Datatypes
|
||||
|
||||
open Pentole.String
|
||||
|
||||
type User = string
|
||||
|
||||
let rnd = new System.Random 2
|
||||
let local_tz = DateTimeZoneProviders.Tzdb.GetSystemDefault ()
|
||||
let local_tz_net = System.TimeZoneInfo.Local
|
||||
|
||||
let private rand_int incl_ excl_ = rnd.NextInt64 (incl_, excl_)
|
||||
|
||||
let private parse_expr (now: Instant) text =
|
||||
let to_cron text =
|
||||
let mutable c: CronExpression = Unchecked.defaultof<CronExpression>
|
||||
match CronExpression.TryParse (text, &c) with
|
||||
| true -> Ok c
|
||||
| false -> Error $"Can't parse cron expression: {text}"
|
||||
|
||||
let to_pattern text =
|
||||
match Pentole.String.split " " text |> List.head with
|
||||
| Prefix "@after" job -> After job |> Ok
|
||||
| Prefix "@before" job -> Before job |> Ok
|
||||
| _ -> Error $"Can't parse as pattern: {text}"
|
||||
|
||||
match to_cron text, to_pattern text with
|
||||
| Error e, Error _ -> Error e
|
||||
| _, Ok p -> Ok (Pattern p)
|
||||
| Ok cron_expr, _ ->
|
||||
(now.ToDateTimeOffset(), local_tz_net)
|
||||
|> Result.protect (cron_expr.GetNextOccurrence)
|
||||
|> Result.mapError (_.Message)
|
||||
|> Result.bind (fun i ->
|
||||
if not i.HasValue then Error $"invalid cron expression: {text}"
|
||||
else Instant.FromDateTimeOffset i.Value |> Cron |> Ok)
|
||||
(*
|
||||
let private schedule (pt: PatternType) (now: Instant) (done_at: Instant option) =
|
||||
let now' = now.InZone local_tz
|
||||
if done_at.IsNone then
|
||||
let last_possible_moment =
|
||||
match pt with
|
||||
| After _ | Before _ ->
|
||||
"CronJobs with patterns should be in a different partition"
|
||||
|> System.InvalidOperationException |> raise
|
||||
| Hourly ->
|
||||
now'.Minute + (60 - now'.Minute) |> int64 |> Duration.FromMinutes
|
||||
| Daily ->
|
||||
now'.Hour + (24 - now'.Hour) |> Duration.FromHours
|
||||
| Weekly ->
|
||||
let sunday = now'.LocalDateTime.With(DateAdjusters.NextOrSame(IsoDayOfWeek.Sunday))
|
||||
sunday
|
||||
| Monthly ->
|
||||
let end_of_the_month = now'.LocalDateTime.With DateAdjusters.EndOfMonth
|
||||
end_of_the_month
|
||||
let ts = dur.TotalSeconds |> int
|
||||
rand_int 0 ts
|
||||
|
||||
else failwith ""
|
||||
|
||||
|
||||
let make (now: Instant) (env: Database.EnvVarsEntry list) (src: Database.CronTableEntry) =
|
||||
let resolve_ path =
|
||||
Path.of_string path
|
||||
|> Result.bind (FileSystem.resolve)
|
||||
|> Result.mapError (fun e -> $"{e} for path {path}")
|
||||
|
||||
let get_env (job_name: string) =
|
||||
env
|
||||
|> List.filter (fun e -> e.job_name = job_name)
|
||||
|> List.map (fun e -> (e.value, e.variable))
|
||||
|
||||
|
||||
(src.executable, src.workdir)
|
||||
|> Result.pairwise_map resolve_
|
||||
|> Result.bind (fun (what, where) ->
|
||||
parse_expr now src.``when`` |> Result.map (fun when_ -> (what, where, when_)))
|
||||
|> Result.map (fun (what, where, when_) -> {
|
||||
job_name = src.job_name
|
||||
user = src.user
|
||||
when_ = when_
|
||||
executable = what
|
||||
args = src.args |> List.ofArray
|
||||
environment = get_env src.job_name
|
||||
workdir = where
|
||||
hostname = src.hostname })
|
||||
|> Result.mapError (fun e -> (src.job_name, e))
|
||||
|
||||
|
||||
let build_sorted_jobs_table (logger: Serilog.ILogger) (hostname: string) (now: Instant)
|
||||
(requirements: Database.Requirements) =
|
||||
let cronjobs =
|
||||
requirements.cron
|
||||
|> List.map (Cron.make now requirements.environment)
|
||||
|> List.choose (function
|
||||
| Ok cj when cj.hostname <> hostname -> None
|
||||
| Ok cronjob -> Some cronjob
|
||||
| Error (j, e) ->
|
||||
logger.Warning $"Invalid job definition {j}, reason: {e}"
|
||||
None)
|
||||
|
||||
let has_deps = function
|
||||
| {when_=Pattern (After _ | Before _)} -> true
|
||||
| {when_=Cron _} -> false
|
||||
| {when_=Pattern (Hourly | Daily | Weekly | Monthly)} -> false
|
||||
|
||||
let resolve_ts = function
|
||||
| {when_=Pattern (After _ | Before _)} ->
|
||||
"CronJobs with patterns should be in a different partition"
|
||||
|> System.InvalidOperationException |> raise
|
||||
|
||||
| {when_=Cron x} as cj -> (x, cj)
|
||||
| {when_=Pattern pt} as cj ->
|
||||
cj.job_name
|
||||
|> requirements.backlog.TryFind
|
||||
|> schedule pt now
|
||||
|
||||
|
||||
|
||||
|
||||
let standalone, with_deps =
|
||||
cronjobs
|
||||
|> List.partition has_deps
|
||||
|
||||
let standalone =
|
||||
standalone
|
||||
|> List.map resolve_ts
|
||||
|
||||
let children =
|
||||
with_deps
|
||||
|> List.choose (function
|
||||
| {when_=Pattern (Before _child)} as parent -> Some parent
|
||||
| _ -> None)
|
||||
|> List.groupBy (function | {when_=Pattern (After child)} -> child)
|
||||
|> Map.ofList
|
||||
|
||||
let parents =
|
||||
with_deps
|
||||
|> List.choose (function
|
||||
| {when_=Pattern (After _parent)} as child -> Some child
|
||||
| _ -> None)
|
||||
|> List.groupBy (function | {when_=Pattern (After parent)} -> parent)
|
||||
|> Map.ofList
|
||||
(*
|
||||
let rec sort acc : CronJob list -> CronJob list = function
|
||||
| [] -> acc
|
||||
| x::rest ->
|
||||
let parents_of_x = parents |> Map.tryFind x.hostname |> Option.defaultValue []
|
||||
let children_of_x = children |> Map.tryFind x.hostname |> Option.defaultValue []
|
||||
let current = parents_of_x @ [x] @ children_of_x
|
||||
|
||||
let acc' = System.Diagnostics.Trace.Assert
|
||||
*)
|
||||
parents
|
||||
*)
|
89
src/Database.fs
Normal file
89
src/Database.fs
Normal file
|
@ -0,0 +1,89 @@
|
|||
module Bidello.Database
|
||||
|
||||
open System.Reflection
|
||||
open System.Threading
|
||||
|
||||
open Microsoft.Extensions.DependencyInjection
|
||||
open FluentMigrator.Runner
|
||||
open Serilog
|
||||
open Npgsql
|
||||
open Dapper
|
||||
|
||||
let private connstring =
|
||||
let c = Environment.Environment()
|
||||
$"Server={c.pg_host};Database={c.pg_dbname};" +
|
||||
$"UserId={c.pg_user};Password={c.pg_password};" +
|
||||
"Tcp Keepalive=true"
|
||||
|
||||
type t = {
|
||||
connection: NpgsqlConnection
|
||||
}
|
||||
|
||||
let run_migrations (logger: ILogger) =
|
||||
|
||||
let assembly =
|
||||
Assembly.GetAssembly(typeof<Bidello.DatabaseMigrations.Init>)
|
||||
|
||||
|
||||
let s =
|
||||
ServiceCollection()
|
||||
.AddFluentMigratorCore()
|
||||
.ConfigureRunner(fun c ->
|
||||
c
|
||||
.AddPostgres().WithGlobalConnectionString(connstring)
|
||||
.ScanIn([|assembly|]).For.Migrations() |> ignore)
|
||||
// .Configure<RunnerOptions>(fun (opts: RunnerOptions) -> opts.Tags <- [| tag |])
|
||||
.BuildServiceProvider false
|
||||
let runner = s.GetRequiredService<IMigrationRunner>()
|
||||
logger.Information "Running database migrations"
|
||||
runner.MigrateUp()
|
||||
|
||||
|
||||
let make (logger: ILogger) =
|
||||
let conn = new NpgsqlConnection (connstring)
|
||||
conn.Open()
|
||||
let n: NotificationEventHandler =
|
||||
new NotificationEventHandler(fun (_o: obj) -> ignore)
|
||||
|
||||
conn.Notification.AddHandler(n)
|
||||
|
||||
use listen = new NpgsqlCommand ("LISTEN bidello_database_update;", conn)
|
||||
listen.ExecuteNonQuery () |> ignore
|
||||
|
||||
{connection = conn}
|
||||
|
||||
|
||||
let wait_notification (ct: CancellationToken) (db: t) =
|
||||
db.connection.WaitAsync ct
|
||||
|
||||
type Requirements_ = {
|
||||
job_name: string
|
||||
``when``: string
|
||||
executable: string
|
||||
user: string
|
||||
workdir: string
|
||||
hostname: string
|
||||
args: string array
|
||||
environment: string
|
||||
done_at: System.DateTime option
|
||||
}
|
||||
|
||||
let gather_requirements (hostname: string) (ct: CancellationToken) (db: t) =
|
||||
let query = """select
|
||||
c.job_name, c."when", c.executable, c.user, c.workdir, c.args, h.hostname,
|
||||
STRING_AGG(e.variable || '=' || e.value, '|') AS environment_variables,
|
||||
max(b.done_at) as done_at
|
||||
from cron c
|
||||
left join environment e on c.job_name = e.job_name
|
||||
left join backlog b on b.job_name = c.job_name and b.hostname = @hostname
|
||||
join hosts h on h.job_name = c.job_name
|
||||
where h.hostname = @hostname
|
||||
group by c.job_name, h.hostname """
|
||||
|
||||
let nl = System.Nullable<int> ()
|
||||
let cl = System.Nullable<System.Data.CommandType> ()
|
||||
let param = {|hostname = hostname|}
|
||||
new CommandDefinition (query, parameters=param, transaction=null,
|
||||
commandTimeout=nl, commandType=cl,
|
||||
flags=CommandFlags.Buffered, cancellationToken=ct)
|
||||
|> db.connection.QueryAsync<Requirements_>
|
186
src/DatabaseMigrations.fs
Normal file
186
src/DatabaseMigrations.fs
Normal file
|
@ -0,0 +1,186 @@
|
|||
module Bidello.DatabaseMigrations
|
||||
|
||||
open FluentMigrator
|
||||
|
||||
[<AbstractClass>]
|
||||
type OnlyUp () =
|
||||
inherit Migration ()
|
||||
override _.Down () = failwith "Down is not implemented in this migration"
|
||||
|
||||
[<Migration(20241014_0000L)>]
|
||||
type Init () =
|
||||
inherit OnlyUp ()
|
||||
override x.Up() =
|
||||
let table_notify = """
|
||||
|
||||
-- DROP FUNCTION public.notify_table_update();
|
||||
|
||||
CREATE OR REPLACE FUNCTION public.notify_table_update()
|
||||
RETURNS trigger
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
DECLARE
|
||||
row RECORD;
|
||||
output TEXT;
|
||||
|
||||
BEGIN
|
||||
-- Checking the Operation Type
|
||||
IF (TG_OP = 'DELETE') THEN
|
||||
row = OLD;
|
||||
ELSE
|
||||
row = NEW;
|
||||
END IF;
|
||||
|
||||
-- Forming the Output as notification. You can choose you own notification.
|
||||
output = 'Update on bidello database';
|
||||
|
||||
-- Calling the pg_notify for table_update event with output as payload
|
||||
|
||||
PERFORM pg_notify('bidello_database_update', output);
|
||||
|
||||
-- Returning null because it is an after trigger.
|
||||
RETURN NULL;
|
||||
END;
|
||||
$function$
|
||||
;
|
||||
"""
|
||||
let cron_table = """
|
||||
CREATE TABLE public.cron (
|
||||
job_name varchar(24) NOT NULL primary key,
|
||||
"when" text NOT NULL,
|
||||
executable text NOT NULL,
|
||||
"user" text NOT NULL,
|
||||
workdir text NOT NULL,
|
||||
comment text,
|
||||
args text[]
|
||||
);"""
|
||||
|
||||
let ownership0 = """ALTER TABLE public.cron OWNER TO pico;"""
|
||||
|
||||
let hosts_table = """
|
||||
CREATE TABLE public.hosts (
|
||||
id SERIAL,
|
||||
hostname text NOT NULL,
|
||||
job_name varchar(24) NOT NULL references cron(job_name),
|
||||
comment text
|
||||
);"""
|
||||
let ownership1 = """ALTER TABLE public.hosts OWNER TO pico;"""
|
||||
|
||||
let triggers = [
|
||||
"""CREATE TRIGGER bidello_hosts_notify_delete
|
||||
AFTER DELETE ON public.hosts FOR EACH STATEMENT EXECUTE FUNCTION public.notify_table_update();"""
|
||||
|
||||
"""CREATE TRIGGER bidello_hosts_notify_insert
|
||||
AFTER INSERT ON public.hosts FOR EACH STATEMENT EXECUTE FUNCTION public.notify_table_update();"""
|
||||
|
||||
"""CREATE TRIGGER bidello_hosts_notify_update
|
||||
AFTER UPDATE ON public.hosts FOR EACH STATEMENT EXECUTE FUNCTION public.notify_table_update();"""
|
||||
|
||||
"""CREATE TRIGGER bidello_cron_notify_delete
|
||||
AFTER DELETE ON public.cron FOR EACH STATEMENT EXECUTE FUNCTION public.notify_table_update();"""
|
||||
"""CREATE TRIGGER bidello_cron_notify_insert
|
||||
AFTER INSERT ON public.cron FOR EACH STATEMENT EXECUTE FUNCTION public.notify_table_update();"""
|
||||
|
||||
"""CREATE TRIGGER bidello_cron_notify_update
|
||||
AFTER UPDATE ON public.cron FOR EACH STATEMENT EXECUTE FUNCTION public.notify_table_update();"""
|
||||
|
||||
]
|
||||
|
||||
let env_table = """
|
||||
CREATE TABLE public.environment (
|
||||
id SERIAL,
|
||||
variable text NOT NULL,
|
||||
value text NOT NULL,
|
||||
job_name varchar(24) NOT NULL references cron(job_name),
|
||||
comment text
|
||||
);"""
|
||||
let ownership2 = """ALTER TABLE public.environment OWNER TO pico;"""
|
||||
|
||||
x.Execute.Sql table_notify
|
||||
x.Execute.Sql cron_table
|
||||
x.Execute.Sql ownership0
|
||||
x.Execute.Sql hosts_table
|
||||
x.Execute.Sql ownership1
|
||||
x.Execute.Sql env_table
|
||||
x.Execute.Sql ownership2
|
||||
triggers |> List.iter (fun s -> s |> x.Execute.Sql |> ignore)
|
||||
|
||||
[<Migration(20241014_0001L)>]
|
||||
type JobTable () =
|
||||
inherit OnlyUp ()
|
||||
override x.Up() =
|
||||
|
||||
let job_table = """
|
||||
CREATE TABLE public.current_jobs (
|
||||
job_name varchar(24) NOT NULL references cron(job_name),
|
||||
hostname text NOT NULL,
|
||||
exact_ts timestamptz NOT NULL,
|
||||
);"""
|
||||
let ownership = """ALTER TABLE public.current_jobs OWNER TO pico;"""
|
||||
|
||||
x.Execute.Sql job_table
|
||||
x.Execute.Sql ownership
|
||||
|
||||
|
||||
[<Migration(20241015_0000L)>]
|
||||
type UpdatedAt () =
|
||||
inherit OnlyUp ()
|
||||
override x.Up() =
|
||||
let tables = ["cron"; "current_jobs"; "hosts"; "environment"]
|
||||
tables
|
||||
|> List.iter (fun table ->
|
||||
x.Execute.Sql $"""ALTER TABLE {table}
|
||||
ADD updated_at timestamptz
|
||||
DEFAULT current_timestamp NOT NULL;""")
|
||||
|
||||
[<Migration(20241019_0000L)>]
|
||||
type AddHostnameToEnv () =
|
||||
inherit OnlyUp ()
|
||||
override x.Up() =
|
||||
x.Alter.Table("environment")
|
||||
.AddColumn("hostname").AsString().NotNullable()
|
||||
.SetExistingRowsTo("edi")
|
||||
|> ignore
|
||||
|
||||
[<Migration(20241019_0001L)>]
|
||||
type AddBacklog () =
|
||||
inherit OnlyUp ()
|
||||
override x.Up() =
|
||||
"ALTER TABLE environment ALTER COLUMN hostname SET NOT NULL;"
|
||||
|> x.Execute.Sql
|
||||
|
||||
"""ALTER TABLE current_jobs DROP COLUMN args;"""
|
||||
|> x.Execute.Sql
|
||||
"""ALTER TABLE current_jobs DROP COLUMN env;"""
|
||||
|> x.Execute.Sql
|
||||
|
||||
let backlog =
|
||||
"""CREATE TABLE backlog (
|
||||
job_name varchar(24) NOT NULL references cron(job_name),
|
||||
hostname text NOT NULL ,
|
||||
done_at timestamptz NOT NULL,
|
||||
|
||||
executable text NOT NULL,
|
||||
"user" text NOT NULL,
|
||||
workdir text NOT NULL,
|
||||
args text[],
|
||||
env text -- format: "key=value|key=value"
|
||||
);"""
|
||||
x.Execute.Sql backlog
|
||||
|
||||
[<Migration(20241024_0000L)>]
|
||||
type ChangeBacklogDrop () =
|
||||
inherit OnlyUp ()
|
||||
override x.Up() =
|
||||
x.Delete.Column("executable").FromTable "backlog" |> ignore
|
||||
x.Delete.Column("user").FromTable "backlog" |> ignore
|
||||
x.Delete.Column("workdir").FromTable "backlog" |> ignore
|
||||
x.Delete.Column("args").FromTable "backlog" |> ignore
|
||||
x.Delete.Column("env").FromTable "backlog" |> ignore
|
||||
x.Alter.Table("backlog")
|
||||
.AddColumn("cmd").AsString().NotNullable()
|
||||
.AddColumn("stdout").AsString().Nullable()
|
||||
.AddColumn("stderr").AsString().Nullable()
|
||||
.AddColumn("exit_code").AsCustom("smallint").NotNullable()
|
||||
|> ignore
|
||||
|
28
src/Datatypes.fs
Normal file
28
src/Datatypes.fs
Normal file
|
@ -0,0 +1,28 @@
|
|||
module Bidello.Datatypes
|
||||
|
||||
open Orleans
|
||||
open NodaTime
|
||||
|
||||
open Pentole.Path
|
||||
|
||||
[<GenerateSerializer>]
|
||||
type Notification = | Time | Database
|
||||
|
||||
[<GenerateSerializer>]
|
||||
type PatternType = | Hourly | Daily | Weekly | Monthly | After of string | Before of string
|
||||
|
||||
[<GenerateSerializer>]
|
||||
type WhenExpr = | Cron of Instant | Pattern of PatternType
|
||||
|
||||
[<Immutable>]
|
||||
[<GenerateSerializer>]
|
||||
type CronJob = {
|
||||
job_name: string
|
||||
user: string
|
||||
when_: WhenExpr
|
||||
executable: Path
|
||||
args: string list
|
||||
environment: (string * string) list
|
||||
workdir: Path
|
||||
hostname: string
|
||||
}
|
31
src/Environment.fs
Normal file
31
src/Environment.fs
Normal file
|
@ -0,0 +1,31 @@
|
|||
module Bidello.Environment
|
||||
open CommandLine
|
||||
|
||||
type Environment = {
|
||||
pg_user: string
|
||||
pg_password: string
|
||||
pg_dbname: string
|
||||
pg_host: string
|
||||
}
|
||||
let mutable private _env: Environment option = None
|
||||
|
||||
let Environment () =
|
||||
match _env with
|
||||
| None -> invalidOp "Configuration not initialized"
|
||||
| Some c -> c
|
||||
|
||||
|
||||
type options = {
|
||||
[<Option('u', "user", Required=true, HelpText="pgsql user")>] user: string
|
||||
[<Option('p', "password", Required=true, HelpText="pgsql password")>] pass: string
|
||||
[<Option('H', "hostname", Required=true, HelpText="pgsql host")>] host: string
|
||||
[<Option('d', "database", Required=false, HelpText="database name")>] dbname: string option
|
||||
}
|
||||
|
||||
let parse_cli (parsed: Parsed<options>) =
|
||||
let env =
|
||||
{ pg_user = parsed.Value.user;
|
||||
pg_password = parsed.Value.pass
|
||||
pg_host = parsed.Value.host
|
||||
pg_dbname = parsed.Value.dbname |> Option.defaultValue "bidello" }
|
||||
_env <- Some env
|
22
src/Grains.fs
Normal file
22
src/Grains.fs
Normal file
|
@ -0,0 +1,22 @@
|
|||
module Bidello.Grains
|
||||
|
||||
open Orleans
|
||||
open System.Collections.Concurrent
|
||||
open System.Threading
|
||||
open System.Threading.Tasks
|
||||
|
||||
open Bidello.Datatypes
|
||||
|
||||
|
||||
type IShellGrain =
|
||||
inherit IGrainWithIntegerKey
|
||||
abstract schedule: CancellationToken -> CronJob -> Task
|
||||
|
||||
|
||||
type ShellGrain() =
|
||||
inherit Orleans.Grain ()
|
||||
|
||||
interface IShellGrain with
|
||||
member _.schedule (ct) (job: CronJob) = task {
|
||||
printfn "Grain view: %A" job
|
||||
}
|
5
src/Jobs.fs
Normal file
5
src/Jobs.fs
Normal file
|
@ -0,0 +1,5 @@
|
|||
module Bidello.Jobs
|
||||
|
||||
open Bidello.Datatypes
|
||||
|
||||
open NodaTime
|
127
src/Library.fs
Normal file
127
src/Library.fs
Normal file
|
@ -0,0 +1,127 @@
|
|||
module Bidello.Main
|
||||
|
||||
open System.Threading
|
||||
open System
|
||||
|
||||
open NodaTime
|
||||
open CommandLine
|
||||
open Orleans
|
||||
open Microsoft.Extensions.Hosting
|
||||
open Microsoft.Extensions.DependencyInjection
|
||||
open Serilog
|
||||
|
||||
open Grains
|
||||
|
||||
open Pentole
|
||||
|
||||
open Bidello.Datatypes
|
||||
open Bidello.Environment
|
||||
|
||||
let logger_config: LoggingHelpers.Configuration = {
|
||||
files = []
|
||||
template = LoggingHelpers.Default.debug_template
|
||||
theme = LoggingHelpers.Default.theme
|
||||
overrides = LoggingHelpers.Default.overrides
|
||||
}
|
||||
|
||||
let logger = LoggingHelpers.from_config logger_config
|
||||
|
||||
(* Features:
|
||||
- per user cron jobs
|
||||
- environment variables
|
||||
- output management (email, syslog)
|
||||
- special time specs: @weekly
|
||||
- randomized execution times
|
||||
- conditional cron jobs: check something, then run
|
||||
- concurrency management
|
||||
- job dependency
|
||||
- custom working directory
|
||||
- backlog: MY_VAR='Hello' ANOTHER_VAR='World!!!' say_hello_world.sh
|
||||
*)
|
||||
|
||||
type Bidello(client: IClusterClient) =
|
||||
inherit BackgroundService()
|
||||
|
||||
override _this.ExecuteAsync(ct: CancellationToken) =
|
||||
let rnd = new Random (2)
|
||||
let db = Database.make logger
|
||||
|
||||
let schedule_jobs (job: CronJob) =
|
||||
let runner = rnd.Next () |> client.GetGrain<IShellGrain>
|
||||
runner.schedule ct job |> ignore
|
||||
|
||||
task {
|
||||
while not ct.IsCancellationRequested do
|
||||
|
||||
let hostname = System.Environment.MachineName
|
||||
|
||||
let! requirements =
|
||||
Database.gather_requirements hostname ct db
|
||||
|
||||
let now = SystemClock.Instance.GetCurrentInstant ()
|
||||
|
||||
// let cronjobs =
|
||||
// Cron.build_sorted_jobs_table logger hostname now requirements
|
||||
|
||||
// cronjobs |> Seq.iter schedule_jobs
|
||||
printfn "%A" requirements
|
||||
|
||||
|
||||
let! _wake_up = db |> Database.wait_notification ct
|
||||
()
|
||||
}
|
||||
|
||||
let private graceful() =
|
||||
let x = ref true
|
||||
|
||||
let swap () =
|
||||
lock x (fun () ->
|
||||
let old = x.Value
|
||||
x.Value <- false
|
||||
old)
|
||||
|
||||
swap
|
||||
|
||||
let graceful_shutdown (should_exit: unit -> bool) (host: IHost) (_sender: obj) (e: ConsoleCancelEventArgs) =
|
||||
|
||||
if should_exit () then
|
||||
e.Cancel <- true
|
||||
logger.Warning "Requested SIGINT, shutting down cleanly. You can request an immediate shutdown now."
|
||||
host.StopAsync () |> ignore
|
||||
else
|
||||
e.Cancel <- false
|
||||
logger.Fatal "Exiting immediately"
|
||||
System.Environment.Exit 2
|
||||
|
||||
let private main_ (host_builder: IHostBuilder) =
|
||||
Database.run_migrations logger
|
||||
|
||||
let host =
|
||||
host_builder
|
||||
.ConfigureServices(fun service ->
|
||||
service.AddHostedService<Bidello>() |> ignore)
|
||||
.UseSerilog(logger)
|
||||
.Build()
|
||||
|
||||
let g = graceful()
|
||||
Console.CancelKeyPress.AddHandler (graceful_shutdown g host)
|
||||
host.Run ()
|
||||
|
||||
let main (args: string array, host_builder: IHostBuilder) =
|
||||
|
||||
let r = CommandLine.Parser.Default.ParseArguments<options> args
|
||||
|
||||
match r with
|
||||
| :? Parsed<options> as parsed ->
|
||||
Bidello.Environment.parse_cli parsed
|
||||
main_ host_builder
|
||||
0
|
||||
| :? NotParsed<options> as _ -> 2
|
||||
| _ -> 1
|
||||
|
||||
|
||||
|
||||
(* Man page
|
||||
- spiega cron expressions
|
||||
- spiega due tavole: cron e hosts
|
||||
*)
|
118
src/LoggingHelpers.fs
Normal file
118
src/LoggingHelpers.fs
Normal file
|
@ -0,0 +1,118 @@
|
|||
module Pentole.LoggingHelpers
|
||||
|
||||
open System
|
||||
|
||||
open Serilog
|
||||
open Serilog.Events
|
||||
open Serilog.Sinks.SystemConsole.Themes
|
||||
open Serilog.Sinks.File
|
||||
|
||||
|
||||
type Override =
|
||||
| Verbose of string
|
||||
| Debug of string
|
||||
| Information of string
|
||||
| Warning of string
|
||||
| Error of string
|
||||
| Fatal of string
|
||||
|
||||
type Buffered = Yes | No
|
||||
type Shared = Yes | No
|
||||
type RollOnSizeLimit = Yes | No
|
||||
|
||||
|
||||
type FileConfiguration = {
|
||||
path: string
|
||||
level: LogEventLevel
|
||||
template: string
|
||||
format_provider: System.IFormatProvider
|
||||
file_size_bytes_limit: uint64
|
||||
level_switch: Core.LoggingLevelSwitch option
|
||||
buffered: Buffered
|
||||
shared: Shared
|
||||
flush_interval: TimeSpan option
|
||||
rolling_interval: RollingInterval
|
||||
roll_on_size_limit: RollOnSizeLimit
|
||||
retained_files: uint
|
||||
lifecycle_hooks: FileLifecycleHooks option
|
||||
}
|
||||
|
||||
type Configuration = {
|
||||
theme: SystemConsoleTheme
|
||||
overrides: Override list
|
||||
template: string
|
||||
files: FileConfiguration list
|
||||
}
|
||||
|
||||
let from_config (c: Configuration) =
|
||||
|
||||
let override_ (lc: LoggerConfiguration) = function
|
||||
| Verbose namespace_ ->
|
||||
lc.MinimumLevel.Override (namespace_, LogEventLevel.Verbose)
|
||||
| Debug namespace_ ->
|
||||
lc.MinimumLevel.Override (namespace_, LogEventLevel.Debug)
|
||||
| Information namespace_ ->
|
||||
lc.MinimumLevel.Override (namespace_, LogEventLevel.Information)
|
||||
| Warning namespace_ ->
|
||||
lc.MinimumLevel.Override (namespace_, LogEventLevel.Warning)
|
||||
| Error namespace_ ->
|
||||
lc.MinimumLevel.Override (namespace_, LogEventLevel.Error)
|
||||
| Fatal namespace_ ->
|
||||
lc.MinimumLevel.Override (namespace_, LogEventLevel.Fatal)
|
||||
|
||||
let lc =
|
||||
c.overrides
|
||||
|> List.fold (fun lc target -> override_ lc target) (LoggerConfiguration())
|
||||
|
||||
let null_or = function | Some a -> a | None -> null
|
||||
let buffered = function | Buffered.Yes -> true | Buffered.No -> false
|
||||
let shared = function | Shared.Yes -> true | Shared.No -> false
|
||||
let roll = function | RollOnSizeLimit.Yes -> true | RollOnSizeLimit.No -> false
|
||||
|
||||
let flush_interval = function Some ts -> Nullable ts | None -> Nullable<TimeSpan> ()
|
||||
|
||||
let lc =
|
||||
c.files
|
||||
|> List.fold (fun (lc: LoggerConfiguration) (fc: FileConfiguration) ->
|
||||
lc.WriteTo.File(
|
||||
path = fc.path,
|
||||
restrictedToMinimumLevel = fc.level,
|
||||
outputTemplate = fc.template,
|
||||
formatProvider = fc.format_provider,
|
||||
fileSizeLimitBytes = Nullable (int64 fc.file_size_bytes_limit),
|
||||
levelSwitch = null_or fc.level_switch,
|
||||
buffered = buffered fc.buffered,
|
||||
shared = shared fc.shared,
|
||||
flushToDiskInterval = flush_interval fc.flush_interval,
|
||||
rollingInterval = fc.rolling_interval,
|
||||
rollOnFileSizeLimit = roll fc.roll_on_size_limit,
|
||||
retainedFileCountLimit = Nullable (int fc.retained_files),
|
||||
encoding = System.Text.Encoding.UTF8,
|
||||
hooks = null_or fc.lifecycle_hooks,
|
||||
retainedFileTimeLimit = TimeSpan.Zero))
|
||||
lc
|
||||
|
||||
lc
|
||||
.WriteTo.Console(theme=c.theme, outputTemplate=c.template)
|
||||
.CreateLogger()
|
||||
|
||||
module Default =
|
||||
let retained_files = 31 (* one month *)
|
||||
let file_size_bytes = 1 * 1024 * 1024 * 1024 (* 1GB *)
|
||||
let size_limit = 1 * 1024 * 1024 * 1024 (* 1GB *)
|
||||
let template = "{Timestamp:yyyy-MM-dd HH:mm:ss.fff zzz} [{Level:u3}] {Message:lj}{NewLine}{Exception}";
|
||||
let debug_template = "[{Timestamp:HH:mm:ss} {Level:u3}] |{SourceContext}| {Message:lj}{NewLine}{Exception}"
|
||||
let theme = Serilog.Sinks.SystemConsole.Themes.SystemConsoleTheme.Literate
|
||||
|
||||
let overrides = [
|
||||
Warning "Orleans.Runtime"
|
||||
Warning "Orleans.Hosting"
|
||||
Warning "Microsoft"
|
||||
// Warning "Microsoft.AspNetCore"
|
||||
// Warning "Microsoft.AspNetCore.Hosting"
|
||||
// Warning "Microsoft.AspNetCore.Mvc"
|
||||
// Warning "Microsoft.AspNetCore.Routing"
|
||||
// Warning "Orleans.Runtime.Silo"
|
||||
// Warning "Orleans.Runtime.SiloOptionsLogger"
|
||||
// Warning "Orleans.Runtime.SiloHostedService"
|
||||
]
|
25
src/Result.fs
Normal file
25
src/Result.fs
Normal file
|
@ -0,0 +1,25 @@
|
|||
namespace Pentole
|
||||
|
||||
module Result =
|
||||
let inline protect ([<InlineIfLambda>]f) x =
|
||||
try
|
||||
Ok (f x)
|
||||
with e -> Error e
|
||||
|
||||
let inline pairwise_map fun_ (x: 'a, y: 'a) =
|
||||
match fun_ x with
|
||||
| Error e -> Error e
|
||||
| Ok o ->
|
||||
match fun_ y with | Ok o' -> Ok (o, o') | Error e -> Error e
|
||||
|
||||
let of_option = function | Some s -> Ok s | None -> Error ()
|
||||
|
||||
|
||||
type ToStringWrapper(toString) =
|
||||
override this.ToString() = toString ()
|
||||
|
||||
let Result l = ToStringWrapper(fun _ ->
|
||||
match l with
|
||||
| Ok o -> sprintf "Ok %O" o
|
||||
| _ -> failwith "")
|
||||
|
7
src/String.fs
Normal file
7
src/String.fs
Normal file
|
@ -0,0 +1,7 @@
|
|||
module Pentole.String
|
||||
|
||||
let (|Prefix|_|) (p: string) (s: string) =
|
||||
if s.StartsWith p then
|
||||
s.Substring p.Length |> Some
|
||||
else
|
||||
None
|
42
src/src.fsproj
Normal file
42
src/src.fsproj
Normal file
|
@ -0,0 +1,42 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net8.0</TargetFramework>
|
||||
<GenerateDocumentationFile>true</GenerateDocumentationFile>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Compile Include="Environment.fs" />
|
||||
<Compile Include="LoggingHelpers.fs" />
|
||||
<Compile Include="Result.fs" />
|
||||
<Compile Include="String.fs" />
|
||||
<Compile Include="Datatypes.fs" />
|
||||
<Compile Include="Grains.fs" />
|
||||
<Compile Include="DatabaseMigrations.fs" />
|
||||
<Compile Include="Database.fs" />
|
||||
<Compile Include="Cron.fs" />
|
||||
<Compile Include="Jobs.fs" />
|
||||
<Compile Include="Library.fs" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="CliWrap" Version="3.6.6" />
|
||||
<PackageReference Include="CommandLineParser.FSharp" Version="2.9.1" />
|
||||
<PackageReference Include="Cronos" Version="0.8.4" />
|
||||
<PackageReference Include="Dapper" Version="2.1.35" />
|
||||
<PackageReference Include="FluentMigrator" Version="6.2.0" />
|
||||
<PackageReference Include="FluentMigrator.Runner" Version="6.2.0" />
|
||||
<PackageReference Include="FluentMigrator.Runner.Postgres" Version="6.2.0" />
|
||||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
|
||||
<PackageReference Include="Microsoft.Orleans.Core" Version="8.2.0" />
|
||||
<PackageReference Include="Microsoft.Orleans.Server" Version="8.2.0" />
|
||||
<PackageReference Include="Microsoft.Orleans.Streaming" Version="8.2.0" />
|
||||
<PackageReference Include="NodaTime" Version="3.2.0" />
|
||||
<PackageReference Include="Npgsql" Version="8.0.5" />
|
||||
<PackageReference Include="Pentole" Version="0.0.3" />
|
||||
<PackageReference Include="Serilog.Extensions.Hosting" Version="8.0.0" />
|
||||
<PackageReference Include="Serilog.Sinks.Console" Version="6.0.0" />
|
||||
<PackageReference Include="Serilog.Sinks.File" Version="6.0.0" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
Loading…
Reference in a new issue