337 lines
7.3 KiB
Go
337 lines
7.3 KiB
Go
package main
|
|
|
|
import (
|
|
// External
|
|
werr "git.gibonuddevalla.se/go/wrappederror"
|
|
"github.com/jmoiron/sqlx"
|
|
|
|
// Standard
|
|
"bytes"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os/exec"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
const ENV_NAME = 0
|
|
const SCRIPT_NAME = 1
|
|
|
|
type ScriptScheduler struct {
|
|
EventQueue chan string
|
|
}
|
|
|
|
type ScriptExecution struct {
|
|
ID int
|
|
TimeStart sql.NullTime `db:"time_start"`
|
|
TimeEnd sql.NullTime `db:"time_end"`
|
|
ScriptName string `db:"script_name"`
|
|
Source string
|
|
Data string
|
|
SSH string
|
|
Env string
|
|
OutputStdout sql.NullString `db:"output_stdout"`
|
|
OutputStderr sql.NullString `db:"output_stderr"`
|
|
ExitCode sql.NullInt16
|
|
}
|
|
|
|
type ScriptExecutionBrief struct {
|
|
ID int
|
|
TimeStart sql.NullTime `db:"time_start"`
|
|
TimeEnd sql.NullTime `db:"time_end"`
|
|
ScriptName string `db:"script_name"`
|
|
SSH string
|
|
ExitCode sql.NullInt16
|
|
HasSource bool `db:"has_source"`
|
|
HasData bool `db:"has_data"`
|
|
HasEnv bool `db:"has_env"`
|
|
HasOutputStdout bool `db:"has_output_stdout"`
|
|
HasOutputStderr bool `db:"has_output_stderr"`
|
|
}
|
|
|
|
func NewScriptScheduler() (sched ScriptScheduler) {
|
|
sched.EventQueue = make(chan string, 64)
|
|
return
|
|
}
|
|
|
|
func (self ScriptScheduler) Loop() { // {{{
|
|
|
|
// Lets check for somehow missed executions every minute.
|
|
// An event SHOULD be received for each new created, but let's be sure.
|
|
tick := time.NewTicker(time.Second * 60)
|
|
|
|
var event string
|
|
for {
|
|
select {
|
|
case <-tick.C:
|
|
self.HandleNextExecution()
|
|
|
|
case event = <-self.EventQueue:
|
|
if event == "SCRIPT_SCHEDULED" {
|
|
self.HandleNextExecution()
|
|
}
|
|
}
|
|
}
|
|
} // }}}
|
|
func (self ScriptScheduler) HandleNextExecution() { // {{{
|
|
se, err := self.GetNextExecution()
|
|
if err != nil {
|
|
logger.Error("script_scheduler", "error", err)
|
|
return
|
|
}
|
|
|
|
if se.ID == 0 {
|
|
return
|
|
}
|
|
|
|
// Setting the time_start value on the database row makes sure it doesn't get handled again.
|
|
se.TimeStart.Time = time.Now()
|
|
se.TimeStart.Valid = true
|
|
se.Update()
|
|
|
|
logger.Info("script_scheduler", "op", "execute", "id", se.ID)
|
|
|
|
var fnames []string
|
|
fnames, err = se.UploadScript()
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
logger.Error("script_execution", "op", "upload_script", "id", se.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
err = se.UploadEnv(fnames[ENV_NAME], fnames[SCRIPT_NAME])
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
logger.Error("script_execution", "op", "upload_env", "id", se.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
err = se.RunScript(fnames[ENV_NAME])
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
logger.Error("script_execution", "op", "run_script", "id", se.ID, "error", err)
|
|
return
|
|
}
|
|
|
|
se.SSHCommand([]byte{}, false, fmt.Sprintf("rm %s %s", fnames[ENV_NAME], fnames[SCRIPT_NAME]))
|
|
|
|
logger.Info("script_scheduler", "op", "handled", "script", fnames[SCRIPT_NAME])
|
|
} // }}}
|
|
func (self ScriptScheduler) GetNextExecution() (e ScriptExecution, err error) { // {{{
|
|
row := db.QueryRowx(`
|
|
SELECT
|
|
e.id,
|
|
time_start,
|
|
time_end,
|
|
data,
|
|
ssh,
|
|
env,
|
|
output_stdout,
|
|
output_stderr,
|
|
exitcode,
|
|
sl.source
|
|
FROM execution e
|
|
INNER JOIN script_log sl ON e.script_log_id = sl.id
|
|
WHERE
|
|
time_start IS NULL
|
|
ORDER BY
|
|
id ASC
|
|
`)
|
|
err = row.StructScan(&e)
|
|
|
|
// Returned ScriptExecution is having an ID of 0 if none was returned
|
|
if err == sql.ErrNoRows {
|
|
err = nil
|
|
return
|
|
}
|
|
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
return
|
|
}
|
|
|
|
return
|
|
} // }}}
|
|
|
|
func (se *ScriptExecution) Update() (err error) { // {{{
|
|
_, err = db.Exec(`
|
|
UPDATE public.execution
|
|
SET
|
|
time_start = $2,
|
|
time_end = $3,
|
|
output_stdout = $4,
|
|
output_stderr = $5,
|
|
exitcode = $6
|
|
|
|
WHERE
|
|
id=$1`,
|
|
se.ID,
|
|
se.TimeStart,
|
|
se.TimeEnd,
|
|
se.OutputStdout,
|
|
se.OutputStderr,
|
|
se.ExitCode,
|
|
)
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
logger.Error("script_execution", "op", "execute", "id", se.ID, "error", err)
|
|
return
|
|
}
|
|
return
|
|
} // }}}
|
|
func (se *ScriptExecution) SSHCommand(stdin []byte, log bool, args ...string) (stdoutString string, err error) { // {{{
|
|
params := []string{se.SSH}
|
|
params = append(params, args...)
|
|
cmd := exec.Command("ssh", params...)
|
|
|
|
stdout := new(bytes.Buffer)
|
|
stderr := new(bytes.Buffer)
|
|
cmd.Stdin = bytes.NewReader(stdin)
|
|
cmd.Stdout = stdout
|
|
cmd.Stderr = stderr
|
|
err = cmd.Run()
|
|
|
|
// A cleanup command is run after the script. This shouldn't overwrite the output from the actual script.
|
|
if log {
|
|
se.OutputStdout.String = stdout.String()
|
|
se.OutputStderr.String = stderr.String()
|
|
se.ExitCode.Int16 = int16(cmd.ProcessState.ExitCode())
|
|
se.OutputStdout.Valid = true
|
|
se.OutputStderr.Valid = true
|
|
se.ExitCode.Valid = true
|
|
}
|
|
|
|
se.TimeEnd.Time = time.Now()
|
|
se.TimeEnd.Valid = true
|
|
se.Update()
|
|
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
return
|
|
}
|
|
|
|
return stdout.String(), nil
|
|
} // }}}
|
|
func (se *ScriptExecution) UploadScript() (fnames []string, err error) { // {{{
|
|
var filenames string
|
|
filenames, err = se.SSHCommand(
|
|
[]byte(se.Source),
|
|
true,
|
|
`sh -c 'RUNENV=$(mktemp -t datagraph.XXXXXX) && SCRIPT=$(mktemp -t datagraph.XXXXXX) && touch $RUNENV $SCRIPT && chmod 700 $RUNENV $SCRIPT && cat >$SCRIPT && echo $RUNENV $SCRIPT'`,
|
|
)
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
}
|
|
|
|
fnames = strings.Split(strings.TrimSpace(filenames), " ")
|
|
|
|
if len(fnames) != 2 {
|
|
err = werr.New("Invalid temp filename count: %d", len(fnames))
|
|
return
|
|
}
|
|
|
|
return fnames[:2], nil
|
|
} // }}}
|
|
func (se *ScriptExecution) UploadEnv(envFname, scriptFname string) (err error) { // {{{
|
|
env := make(map[string]string)
|
|
err = json.Unmarshal([]byte(se.Env), &env)
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
return
|
|
}
|
|
|
|
var script = "#!/bin/sh\n\n"
|
|
for key, val := range env {
|
|
script = script + fmt.Sprintf("export %s=\"%s\"\n", key, strings.ReplaceAll(val, `"`, `\"`))
|
|
}
|
|
script = script + "\n" + scriptFname + "\n"
|
|
|
|
_, err = se.SSHCommand(
|
|
[]byte(script),
|
|
true,
|
|
fmt.Sprintf(`sh -c 'cat >%s'`, envFname),
|
|
)
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
}
|
|
|
|
return
|
|
} // }}}
|
|
func (se *ScriptExecution) RunScript(fname string) (err error) { // {{{
|
|
_, err = se.SSHCommand([]byte(se.Data), true, fname)
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
}
|
|
return
|
|
} // }}}
|
|
|
|
func GetScriptExecutions() (executions []ScriptExecutionBrief, err error) { // {{{
|
|
executions = []ScriptExecutionBrief{}
|
|
|
|
var rows *sqlx.Rows
|
|
rows, err = db.Queryx(`
|
|
SELECT
|
|
e.id,
|
|
time_start,
|
|
time_end,
|
|
ssh,
|
|
sl.name AS script_name,
|
|
exitcode,
|
|
LENGTH(source) > 0 AS has_source,
|
|
LENGTH(data::varchar) > 0 AS has_data,
|
|
LENGTH(env::varchar) > 0 AS has_env,
|
|
LENGTH(output_stdout) > 0 AS has_output_stdout,
|
|
LENGTH(output_stderr) > 0 AS has_output_stderr
|
|
FROM execution e
|
|
INNER JOIN script_log sl ON e.script_log_id = sl.id
|
|
ORDER BY
|
|
id DESC
|
|
LIMIT 100
|
|
`)
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var execution ScriptExecutionBrief
|
|
err = rows.StructScan(&execution)
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
return
|
|
}
|
|
executions = append(executions, execution)
|
|
}
|
|
|
|
return
|
|
} // }}}
|
|
func GetScriptExecution(id int) (e ScriptExecution, err error) {
|
|
row := db.QueryRowx(`
|
|
SELECT
|
|
e.id,
|
|
time_start,
|
|
time_end,
|
|
ssh,
|
|
sl.name AS script_name,
|
|
sl.source,
|
|
exitcode,
|
|
data,
|
|
env,
|
|
output_stdout,
|
|
output_stderr
|
|
FROM execution e
|
|
INNER JOIN script_log sl ON e.script_log_id = sl.id
|
|
WHERE
|
|
e.id = $1`,
|
|
id,
|
|
)
|
|
|
|
err = row.StructScan(&e)
|
|
if err != nil {
|
|
err = werr.Wrap(err)
|
|
return
|
|
}
|
|
return
|
|
}
|