Run scheduled scripts

This commit is contained in:
Magnus Åhall 2025-08-08 09:52:32 +02:00
parent 6d05152ab2
commit ef0a20ffe0
7 changed files with 294 additions and 28 deletions

200
script_scheduler.go Normal file
View file

@ -0,0 +1,200 @@
package main
import (
// External
werr "git.gibonuddevalla.se/go/wrappederror"
// Standard
"bytes"
"database/sql"
"fmt"
"os/exec"
"strings"
"time"
)
type ScriptScheduler struct {
EventQueue chan string
}
type ScriptExecution struct {
ID int
TimeStart sql.NullTime `db:"time_start"`
TimeEnd sql.NullTime `db:"time_end"`
Source []byte
Data string
SSH string
OutputStdout sql.NullString `db:"output_stdout"`
OutputStderr sql.NullString `db:"output_stderr"`
ExitCode sql.NullInt16
}
func NewScriptScheduler() (sched ScriptScheduler) {
sched.EventQueue = make(chan string, 64)
return
}
func (self ScriptScheduler) Loop() { // {{{
// Lets check for somehow missed executions every minute.
// An event SHOULD be received for each new created, but let's be sure.
tick := time.NewTicker(time.Second * 60)
var event string
for {
select {
case <-tick.C:
self.HandleNextExecution()
case event = <-self.EventQueue:
if event == "SCRIPT_SCHEDULED" {
self.HandleNextExecution()
}
}
}
} // }}}
func (self ScriptScheduler) HandleNextExecution() { // {{{
se, err := self.GetNextExecution()
if err != nil {
logger.Error("script_scheduler", "error", err)
return
}
if se.ID == 0 {
return
}
// Setting the time_start value on the database row makes sure it doesn't get handled again.
se.TimeStart.Time = time.Now()
se.TimeStart.Valid = true
se.Update()
logger.Info("script_scheduler", "op", "execute", "id", se.ID)
fname, err := se.GetScriptTempFilename()
if err != nil {
err = werr.Wrap(err)
logger.Error("script_execution", "op", "get_script_temp_filename", "id", se.ID, "error", err)
return
}
err = se.UploadScript(fname)
if err != nil {
err = werr.Wrap(err)
logger.Error("script_execution", "op", "upload_script", "id", se.ID, "error", err)
return
}
se.SSHCommand([]byte{}, false, fmt.Sprintf("rm %s", fname))
logger.Info("script_scheduler", "op", "handled", "script", fname)
} // }}}
func (self ScriptScheduler) GetNextExecution() (e ScriptExecution, err error) { // {{{
row := db.QueryRowx(`
SELECT
e.id,
time_start,
time_end,
data,
ssh,
output_stdout,
output_stderr,
exitcode,
sl.source
FROM execution e
INNER JOIN script_log sl ON e.script_log_id = sl.id
WHERE
time_start IS NULL
ORDER BY
id ASC
`)
err = row.StructScan(&e)
// Returned ScriptExecution is having an ID of 0 if none was returned
if err == sql.ErrNoRows {
err = nil
return
}
if err != nil {
err = werr.Wrap(err)
return
}
return
} // }}}
func (se *ScriptExecution) Update() (err error) { // {{{
_, err = db.Exec(`
UPDATE public.execution
SET
time_start = $2,
time_end = $3,
output_stdout = $4,
output_stderr = $5,
exitcode = $6
WHERE
id=$1`,
se.ID,
se.TimeStart,
se.TimeEnd,
se.OutputStdout,
se.OutputStderr,
se.ExitCode,
)
if err != nil {
err = werr.Wrap(err)
logger.Error("script_execution", "op", "execute", "id", se.ID, "error", err)
return
}
return
} // }}}
func (se *ScriptExecution) SSHCommand(stdin []byte, log bool, args ...string) (stdoutString string, err error) { // {{{
params := []string{se.SSH}
params = append(params, args...)
cmd := exec.Command("ssh", params...)
stdout := new(bytes.Buffer)
stderr := new(bytes.Buffer)
cmd.Stdin = bytes.NewReader(stdin)
cmd.Stdout = stdout
cmd.Stderr = stderr
err = cmd.Run()
// A cleanup command is run after the script. This shouldn't overwrite the output from the actual script.
if log {
se.OutputStdout.String = stdout.String()
se.OutputStderr.String = stderr.String()
se.ExitCode.Int16 = int16(cmd.ProcessState.ExitCode())
se.OutputStdout.Valid = true
se.OutputStderr.Valid = true
se.ExitCode.Valid = true
}
se.TimeEnd.Time = time.Now()
se.TimeEnd.Valid = true
se.Update()
if err != nil {
err = werr.Wrap(err)
return
}
return stdout.String(), nil
} // }}}
func (se *ScriptExecution) GetScriptTempFilename() (fname string, err error) { // {{{
fname, err = se.SSHCommand([]byte{}, true, "mktemp -t datagraph.XXXXXX")
if err != nil {
err = werr.Wrap(err)
return
}
fname = strings.TrimSpace(fname)
return
} // }}}
func (se *ScriptExecution) UploadScript(fname string) (err error) { // {{{
_, err = se.SSHCommand(se.Source, true, fmt.Sprintf("sh -c 'touch %s && chmod 700 %s && cat >%s && %s'", fname, fname, fname, fname))
if err != nil {
err = werr.Wrap(err)
}
return
} // }}}